RxKotlin 공부하기
옵저버블과 옵저버와 구독자
Observable
Observable
은 그 컨슈머(Observer
)가 소비할 수 있는 값을 산출해 내는 기본 계산 작업을 갖고 있다. 여기서 가장 중요한 것은 컨슈머(Observer
)가 값을 pull
방식을 사용해 접근하지 않는다는 점이다. 오히려 옵저버블은 컨슈머에게 값을 push
하는 역할을 한다. 순차적으로 이해해보면
- 옵저버는 옵저버블을 구독한다.
- 옵저버블이 그 내부의 아이템들을 내보내기 시작한다.
- 옵저버는 옵저버블에서 내보내는 모든 아이템에 반응한다.
Observable 이 동작하는 방법
- onNext : 옵저버블은 모든 아이템을 하나씩 이 메서드에 전달한다.
- onComplete : 모든 아이템이
onNext
메서드를 통과하면 옵저버블은onComplete
메서드를 호출한다. - onError : 옵저버블에서 에러가 발생하면
onError
메서드가 호출돼 정의된 대로 에러를 처리한다.onError
와onComplete
는 터미널 이벤트이다.onError
가 호출됐을 때onComplete
가 호출되지 않으며, 반대의 경우도 마찬가지다.
예제를 보자
[ 예제 ]1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
fun main() { val observer: Observer<Any> = object: Observer<Any> { // 1 override fun onComplete() { // 2 println("All Completed") } override fun onNext(item: Any) {// 3 println("Next $item") } override fun onError(e: Throwable) {// 4 println("Subscribed to $e") } override fun onSubscribe(d: Disposable) {// 5 println("Subscribed to $d") } } val observable: Observable<Any> = listOf("One", 2, "Three", "Four", 4.5, "Five", 6.0f).toObservable() // 6 observable.subscribe(observer)// 7 val observableOnList: Observable<List<Any>> = Observable.just( listOf("One", 2, "Three", "Four", 4.5, "Five", 6.0f), listOf("List with Single Item"), listOf(1,2,3,4,5,6) ) // 8 observableOnList.subscribe(observer)// 9 }
- 1,2 ->
onComplete()
메서드는Observable
이 오류 없이 모든 아이템을 처리하면 호출된다. - 3 ->
onNext(item: Any)
함수를 정의했으며 이 함수는 옵저버블이 내보내는 각 아이템에 대해 호출된다. - 4 -> 옵저버블에 오류가 발생했을 때 호출된다.
- 5 -> 옵저버가 옵저버블을 구독할 때마다 호출된다.
- 6 -> 옵저버블을 생성하고
- 7 ->
observer
가observable
을 구독한다.
Observable.create()
언제든지 Observable.create 메서드로 옵저버블을 직접 생성할 수 있다. 이 메서드는 관찰 대상자를 ObservableEmitter<T>
인터페이스의 인스턴스를 입력받는다.
[ 예제 ]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
fun main() {
val observer: Observer<String> = object: Observer<String> {
override fun onComplete() {
println("All Completed")
}
override fun onNext() {
println("Next $item")
}
override fun onError(e: Throwable) {
println("Error Occured #{e.message}")
}
override fun onSubscribed(d: Disposable) {
println("New Subscribe")
}
} // Observer 생성
val observable: Observable<String> = Observable.create<String> { // 1
it.onNext("Emit 1")
it.onNext("Emit 2")
it.onNext("Emit 3")
it.onNext("Emit 4")
it.onComplete()
}
observable.subscrbie(observer)
val observable2: Observable<String> = Observable.create<String> { // 2
it.onNext("Emit 1")
it.onNext("Emit 2")
it.onNext("Emit 3")
it.onNext("Emit 4")
it.onError(Exception("My Custom Exception"))
}
observable2.subscrbie(observer)
}
- 1 ->
Observable.create
메서드로 옵저버블을 생성했다.onNext
메서드로 4개의 문자열을 내보낸 후 다음onComplete
메서드로 완료되었음을 알려준다. - 2 ->
onComplete
를 호출하는 대신 사용자 정의된Exception
으로onError
를 호출한다.
Observable.create 메서드는 특히 사용자가 지정한 데이터 구조를 사용하거나 내보내는 값을 제어하려고 할 때 유용하다. 다른 스레드에서 옵저버로 값을 내보낼 수도 있다.
Observable.from()
Observable.from
메서드는 Observable.create
메서드에 비해 상대적으로 간단하다.
from
메서드의 도움을 받아 거의 모든 코틀린 구조체로부터 Observable 인스턴스
를 생성할 수 있다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
fun main() {
val observer: Observer<String> = object: Observer<String> {
override fun onComplete() {
println("All Completed")
}
override fun onNext(item: String) {
println("Next $item")
}
override fun onError(e: Throwable) {
println("Error Occured ${e.message}")
}
override fun onSubscribed(d: Disposable) {
println("New Subscription")
}
} // Observer 생성
val list = listOf("String1", "String2", "String3", "String4")
val observableFromIterable: Observable<String> =
Observable.fromIterable(list) //1
observableFromIterable.subscribe(observer)
val observableFromCallable: Observable<String> =
Observable.fromCallable(callable)
observableFromCallable.subscribe(observer)
}
Observable.fromIterable
메서드를 사용해 Iterable 인스턴스
로부터 옵저버블을 생성했다.
Observable.fromCallable
메서드를 호출해 Callable 인스턴스
에서 옵저버블을 생성했다.
toObservable 의 확장 함수의 이해
코틀린의 확장함수
덕분에 List 와 같이 어떠한 Iterable 인스턴스도 Observable로 큰 어려움 없이 변경이 가능하다.
Observalbe.just 메서드 이해
Observable.just
는 넘겨진 인자만을 배출하는 옵저버블을 생성
한다. Iterable 인스턴스를 Observable.just에 단일 인자로 넘기면 전체 목록을 하나의 아이템으로 배출하는데, 이는 Iterable 내부의 각각의 아이템을 Observable로 생성하는 Observable.from 과는 다르다는 점에 유의해야한다.
Observable.just
를 호출하면 다음과 같은 일이 일어난다.
- 인자와 함께 Observable.just 를 호출
- Observable.just 는 옵저버블 생성
- onNext 알림을 통해 각각의 아이템을 내보냄
- 모든 인자의 제출이 완료되면 onComplete 알림을 실행
[ 예제 ]1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
fun main() { val observer: Observer<Any> = object: Observer<Any> { override fun onComplete() { println("All Completed") } override fun onNext(item: String) { println("Next $item") } override fun onError(e: Throwable) { println("Error Occured ${e.message}") } override fun onSubscribed(d: Disposable) { println("New Subscription") } } Observable.just("A String").subscribe(observer) Observable.just(54).subscribe(observer) Observable.just(listOf("String1","String2","String3","String4")).subscribe(observer) Observable.just( mapOf( Pair("Key1", "Value1"), Pair("Key2", "Value2"), Pair("Key3", "Value3"), ) ).subscribe(observer) Observable.just("String1", "String2", "String3").subscrbie(observer) // 1 }
결과를 확인해보면
리스트와 맵도 단일 아이템으로 취급
된다. 하지만 마지막에Observable.just
의 인자로 넘기고 있는 주석1
을 확인해보면Observable.just
는 각자의 인자를 별개의 아이템으로 받아들여 내보내고 있다.
Observable 의 다른 팩토리 메서드
[ 예제 ]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
fun main() {
val observer: Observer<Any> = object: Observer<Any> {
override fun onComplete() {
println("All Completed")
}
override fun onNext(item: String) {
println("Next $item")
}
override fun onError(e: Throwable) {
println("Error Occured ${e.message}")
}
override fun onSubscribed(d: Disposable) {
println("New Subscription")
}
}//Create Observer
Observable.range(1, 10).subscribe(observer) // 1
Observable.empty<String>().subscribe(observer) // 2
runBlocking {
Observable.interval(300, TimeUnit.MILLISECONDS).subscribe(observer) // 3
delay(900)
Observable.timer(400, TimeUnit.MILLISECONDS).subscribe(observer) // 4
delay(450)
}
}
- 1 ->
Observable.range()
팩토리 메서드로 옵저버블을 생성 [ 제공된start
부터 시작해count
만큼의 정수를 내보낸다.] - 2 ->
Observable.empty()
메서드로 옵저버블을 생성 [onNext
로 항목을 내보내지 않고 즉시onComplete()
를 발생시킨다. ] - 3 ->
Observable.interval()
은 지정된 간격만큼의 숫자를 0부터 순차적으로 내보낸다. - 4 ->
Observable.timer()
은 지정된 시간이 경과한 후에 한 번만 실행된다.
구독자 : Observer 인터페이스
앞의 예제 코드에서 볼 수 있듯이 Observer
는 onNext(item: T)
, onError(error: Throwable)
, onComplete()
, onSubscribe(d: Disposable)
네 가지 메서드를 가지는 인터페이스이다. 옵저버블을 옵저버에 연결하면 이 네 가지 메서드가 호출된다. 다음은 네 가지 메서드에 대한 간단한 설명이다.
- onNext : 아이템을 하나씩 넘겨주기 위해서 옵저버블은 옵저버의 이 메서드를 호출한다.
- onComplete : 옵저버블이 onNext를 통한 아이템 전달이 종료됐음을 알리고 싶을 때 옵저버의 onComplete를 호출한다.
- onError : 옵저버블에서 에러가 발생했을 때 옵저버에 정의된 로직이 있다면 onError를 호출하고 그렇지 않다면 예외를 발생시킨다.
- onSubscriber : Observable이 새로운 Observer를 구독할 때마다 호출된다.
구독과 해지
Observable(관찰돼야 하는 대상)
과 Observer(관찰해야 하는 주체)
가 있다. 어떻게 이 둘을 연결할 수 있을까? Observable 과 Observer 는 키보드 또는 마우스 같은 입력 장치와 컴퓨터를 연결할 때처럼 매개체가 필요하다. Observable
을 Observer
에 연결하는 매개체는 Subscribe연산자를 사용한다.
Subscribe 연산자에 대해 1개에서 3개의 메서드 (onNext, onComplete, onError) 를 전달할 수도 있고 Observer 인터페이스의 인스턴스를 전달해 연결할 수도 있다.
[ 예제 ]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
fun main() {
val observable: Observabl<Int> = Observable.range(1,5) // 1
observable.subscribe ({ // 2
//onNext 메서드
println("Next $it")
},{
//onError 메서드
println("Error ${it.message}")
},{
//onComplete 메서드
println("Done")
})
val observer: Observer<Int> = object: Observer<Int> { // 3
override fun onComplete() {
println("All Completed")
}
override fun onNext(item: String) {
println("Next $item")
}
override fun onError(e: Throwable) {
println("Error Occured ${e.message}")
}
override fun onSubscribed(d: Disposable) {
println("New Subscription")
}
}
observable.subscribe(observer)
}
- 1 ->
Observable
인스턴스를 생성하고 두 번에 걸쳐 오버로드된subscribe
연산자를 사용했다. - 2 ->
subscribe
메서드에 인수로 세 가지 메서드를 전달했다. ( 첫번째 매개 변수는 onNext 메서드 , 두번째 매개 변수는 onError 메서드, 마지막 매개 변수는 onComplete )
Observer 인터페이스의 인스턴스를 전달했다.
구독하는 동안 Observer 인스턴스 대신 메서드를 전달하면subscribe
연산자는Disposable
의 인스턴스를 반환하는데,Observer
인스턴스를 전달했다면onSubscribe
메서드의 매개 변수에서Disposable
의 인스턴스를 얻을 수 있다.Disposable
인터페이스의 인스턴스를 사용해 주어진 시간에 배출을 멈출 수 있다.
[ 예제 ]1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
fun main() { runBlocking { val observalbe: Observable<Long> = Observable.interval(100, TimeUnit.MILLISECONDS) // 1 val observer: Observer<Long> = object: Observer<Long> { lateinit var disposable: Disposable // 2 override fun onSubscribe(d: Disposable) { disposable = d // 3 } override fun onNext(item: Long) { println("Received $item") if (item >= 10 && !disposable.isDisposed) { // 4 disposable.dispose() // 5 println("Disposed") } } override fun onError(e: Throwable) { println("Error ${e.message}") } override fun onComplete() { println("Complete") } } observable.subscribe(observer) delay(1500) // 6 } }
- 1 ->
Observable.interval
팩토리 메서드로 옵저버블 생성 ( 100ms 간격마다 정수를 내보낸다. ) - 2 ->
Disposable
타입의 lateinit var disposable을 선언했는데 lateinit 은 나중에 변수가 초기화됨을 의미 - 3 ->
onSubscribe
메서드 내에서 수신된 매개 변수 값을 disposable 변수에 할당한다. - 결과 -> Received 0 ~ 10 까지 출력 후
Disposed
출력
Hot, Cold Observable
Cold Observable
앞선 모든 예제에서 동일한 옵저버블을 여러 번 구독해도 모든 구독의 새로운 배출을 얻을 수 있다.
[ 예제 ]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
fun main() {
val observable: Observable<String> = listOf("String1", "String2", "String3", "String4").toObservable() // 1
observable.subscribe({
// 2
println("Received $it")
}, {
println("Error ${it.message}")
}, {
println("Done")
})
observable.subscribe({
// 3
println("Received $it")
}, {
println("Error ${it.message}")
}, {
println("Done")
})
}
[ 결과 ]
1
2
3
4
5
6
7
8
9
10
Received String 1
Received String 2
Received String 3
Received String 4
Done
Received String 1
Received String 2
Received String 3
Received String 4
Done
1 에서 Observable 을 선언하고 2,3에서 두 번 구독했다. 두 번의 호출의 경우 첫 번째 호출에서 마지막 호출까지 똑같은 결과를 얻었다.
Observable
은 특징적인 기능을 갖고 있는데 각 구독마다 처음부터 아이템을 배출하는 것을 Cold Observable
이라고 한다.
Cold Observable은 구독 시에 실행을 시작하고 subscribe 가 호출되면 아이템을 푸시하기 시작하는데 각 구독에서 아이템의 동일한 순서를 푸시한다.
지금까지 사용한 모든 팩토리 메서드는 Cold Observable
을 반환한다. 콜드 옵저버블은 데이터와 유사하다. 데이터 관련 작업을 할 때, 예를 들어 안드로이드에서 SQLite나 Room 데이터베이스로 작업하는 동안은 Hot Observable
보다는 Cold Observable
에 더욱 많이 의존한다.
Hot Observable
Cold Observable
은 수동적이며 구독이 호출될 때까지 아무것도 내보내지 않는다. Hot Observable
은 이와 반대로 배출을 시작하기 위해 구독할 필요가 없다.
Hot Observable
은 데이터보다는 이벤트와 유사하다. 이벤트에는 데이터가 포함될 수 있지만 시간에 민감한 특징을 가지는데 최근 가입한 Observer가 이전에 내보낸 데이터를 놓칠 수 있기 때문이다.
ConnectableObservable 객체
ConnectableObservable
은 가장 유용한 핫 옵저버블 중 하나로, 핫 옵저버블의 좋은 예시다. ConnectableObservable 은 옵저버블, 심지어 콜드 옵저버블을 핫 옵저버블로 바꿀 수 있다. subscribe 호출로 배출을 시작하는 대신 connect
메서드를 호출한 후에 활성화 된다. connect
를 호출하기 전에 반드시 subscrbie
를 호출해야 한다. connect 를 호출한 후 구독하는 모든 호출을 이전에 생성된 배출을 놓치게 된다.
[ 예제 ]
1
2
3
4
5
6
7
8
9
10
11
fun main() {
val connectableObservable = listOf(
"String 1", "String 2", "String 3", "String 4", "String 5").toObservable().publish() // 1
connectableObservable.subscribe({ println("Subscription 1: $it") }) // 2
connectableObservable.map(String::reversed) // 3
.subscribe({println("Subscription 2 $it")}) // 4
connectableObservable.connect() // 5
connectableObservable.subscribe( {println("Subscription 3: $it")}) // 6 //배출을 받지 못함
}
ConnectableObservable
의 주요 목적은 한 옵저버블에 여러 개의 구독을 연결해 하나의 푸시에 대응할 수 있도록 하는 것이다. 이는 푸시를 반복하고 각 구독마다 따로 푸시를 보내는 콜드 옵저버블과는 상이하다. ConnectableObservable 은 connect 메서드 이전에 호출된 모든 subscription(Observers)를 연결하고 모든 옵저버에 단일 푸시를 전달한다.
예제에서 toObservable()
메서드로 옵저버블을 생성. publish()
로 콜드 옵저버블을 ConnectableObservable로 변환했다.
2주석에서 connectableObservable을 구독. 3에서는 map 연산자를 사용해 String을 뒤집었으며 4에서는 매핑된 connectableObservable을 구독했다.
5에서는 connect()
호출했고 두 옵저버에서 모두 배출이 시작됐다.
[ 결과 ]
1
2
3
4
5
6
7
8
9
10
Subscription 1: String 1
Subscription 2 1 gnirtS
Subscription 1: String 2
Subscription 2 2 gnirtS
Subscription 1: String 3
Subscription 2 3 gnirtS
Subscription 1: String 4
Subscription 2 4 gnirtS
Subscription 1: String 5
Subscription 2 5 gnirtS
옵저버블에서 단 한번의 배출로 모든 Subscriptions
/Observers
에게 배출을 전달하는 매커니즘을 멀티캐스팅
이라고 한다.
ConnectableObservable
이 Hot Observable이기 때문에 주석(6) 의 subscribe() 호출은 어떤 배출도 수신하지 않았으며 connect
이후에 일어난 모든 새로운 구독은 이전에 생성된 배출을 놓치게 된다.
[ 예제 ]
1
2
3
4
5
6
7
8
9
10
11
12
13
fun main() {
val connectableObservable = Observable.interval(100, TimeUnit.MILLISECONDS).publish() // 1
connectableObservable.subscribe( {println("Subscription 1: $it")}) // 2
connectableObservable.subscribe( {println("Subscription 2: $it")}) // 3
connectableObservable.connect() // 4
runBlocking{ delay(500) } // 5
connectableObservable.subscribe( {println("Subscription 3: $it")} ) // 6
runBlocking{ delay(500) } // 7
}
위의 예제는 Observable.interval
메서드로 옵저버블을 생성했다. 이유는 각 배출마다 간격이 생기기 때문에 connect
이후의 구독에 약간의 공간을 줄 수 있기 때문이다.
1
에서 Cold Observable을 ConnectableObservable
로 변환하고 두 번의 구독을 수행한 뒤에 connect()
를 호출했다.(주석 2,3,4)
5
에서 connect()
후 즉시 지연을 호출한 다음 다시 6
에서 구독하고, 7
에서 3번째 구독이 이 일부 데이터를 인쇄할 수 있도록 다시 지연을 호출한다.
위의 결과로 세 번째 구독이 5번째 배출을 받았고 이전의 구독을 모두 놓쳤다는 것을 알 수 있다.(세 번째 구독 이전에 5회의 배출이 있었다. - 500 밀리초 지연 / 100밀리초 간격)