[Kotlin] RxKotlin Study 03

Reactive Prgramming in Kotlin

Posted by JungHoon-Park on February 4, 2021

RxKotlin 공부하기


옵저버블과 옵저버와 구독자

Observable

Observable은 그 컨슈머(Observer)가 소비할 수 있는 값을 산출해 내는 기본 계산 작업을 갖고 있다. 여기서 가장 중요한 것은 컨슈머(Observer)가 값을 pull 방식을 사용해 접근하지 않는다는 점이다. 오히려 옵저버블은 컨슈머에게 값을 push 하는 역할을 한다. 순차적으로 이해해보면

  • 옵저버는 옵저버블을 구독한다.
  • 옵저버블이 그 내부의 아이템들을 내보내기 시작한다.
  • 옵저버는 옵저버블에서 내보내는 모든 아이템에 반응한다.

Observable 이 동작하는 방법

  • onNext : 옵저버블은 모든 아이템을 하나씩 이 메서드에 전달한다.
  • onComplete : 모든 아이템이 onNext 메서드를 통과하면 옵저버블은 onComplete 메서드를 호출한다.
  • onError : 옵저버블에서 에러가 발생하면 onError 메서드가 호출돼 정의된 대로 에러를 처리한다. onErroronComplete는 터미널 이벤트이다. onError가 호출됐을 때 onComplete가 호출되지 않으며, 반대의 경우도 마찬가지다.
    예제를 보자
    [ 예제 ]
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    
    fun main() {
      val observer: Observer<Any> = object: Observer<Any> { // 1
          override fun onComplete() { // 2
              println("All Completed")
          }
    
          override fun onNext(item: Any) {// 3
              println("Next $item")
          }
    
          override fun onError(e: Throwable) {// 4
              println("Subscribed to $e")
          }
    
          override fun onSubscribe(d: Disposable) {// 5
              println("Subscribed to $d")
          }
      }
    
      val observable: Observable<Any> = listOf("One", 2, "Three", "Four", 4.5, "Five", 6.0f).toObservable() // 6
    
      observable.subscribe(observer)// 7
    
      val observableOnList: Observable<List<Any>> = Observable.just(
          listOf("One", 2, "Three", "Four", 4.5, "Five", 6.0f),
          listOf("List with Single Item"),
          listOf(1,2,3,4,5,6) 
      ) // 8
        
      observableOnList.subscribe(observer)// 9
    }
    
  • 1,2 -> onComplete() 메서드는 Observable이 오류 없이 모든 아이템을 처리하면 호출된다.
  • 3 -> onNext(item: Any)함수를 정의했으며 이 함수는 옵저버블이 내보내는 각 아이템에 대해 호출된다.
  • 4 -> 옵저버블에 오류가 발생했을 때 호출된다.
  • 5 -> 옵저버가 옵저버블을 구독할 때마다 호출된다.
  • 6 -> 옵저버블을 생성하고
  • 7 -> observerobservable을 구독한다.

Observable.create()

언제든지 Observable.create 메서드로 옵저버블을 직접 생성할 수 있다. 이 메서드는 관찰 대상자를 ObservableEmitter<T> 인터페이스의 인스턴스를 입력받는다.
[ 예제 ]

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
fun main() {
    val observer: Observer<String> = object: Observer<String> {
        override fun onComplete() {
            println("All Completed")
        }

        override fun onNext() {
            println("Next $item")
        }

        override fun onError(e: Throwable) {
            println("Error Occured #{e.message}")
        }

        override fun onSubscribed(d: Disposable) {
            println("New Subscribe")
        }
    } // Observer 생성

    val observable: Observable<String> = Observable.create<String> { // 1
        it.onNext("Emit 1")
        it.onNext("Emit 2")
        it.onNext("Emit 3")
        it.onNext("Emit 4")
        it.onComplete()
    }

    observable.subscrbie(observer)

    val observable2: Observable<String> = Observable.create<String> { // 2
        it.onNext("Emit 1")
        it.onNext("Emit 2")
        it.onNext("Emit 3")
        it.onNext("Emit 4")
        it.onError(Exception("My Custom Exception"))
    }

    observable2.subscrbie(observer)
}
  • 1 -> Observable.create 메서드로 옵저버블을 생성했다. onNext 메서드로 4개의 문자열을 내보낸 후 다음 onComplete 메서드로 완료되었음을 알려준다.
  • 2 -> onComplete를 호출하는 대신 사용자 정의된 Exception으로 onError를 호출한다.

Observable.create 메서드는 특히 사용자가 지정한 데이터 구조를 사용하거나 내보내는 값을 제어하려고 할 때 유용하다. 다른 스레드에서 옵저버로 값을 내보낼 수도 있다.

Observable.from()

Observable.from 메서드는 Observable.create 메서드에 비해 상대적으로 간단하다. from 메서드의 도움을 받아 거의 모든 코틀린 구조체로부터 Observable 인스턴스를 생성할 수 있다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
fun main() {

    val observer: Observer<String> = object: Observer<String> {

        override fun onComplete() {
            println("All Completed")
        }

        override fun onNext(item: String) {
            println("Next $item")
        }

        override fun onError(e: Throwable) {
            println("Error Occured ${e.message}")
        }

        override fun onSubscribed(d: Disposable) {
            println("New Subscription")
        }
    } // Observer 생성

    val list = listOf("String1", "String2", "String3", "String4")
    val observableFromIterable: Observable<String> = 
    Observable.fromIterable(list) //1
    observableFromIterable.subscribe(observer)

    val observableFromCallable: Observable<String> = 
    Observable.fromCallable(callable)
    observableFromCallable.subscribe(observer)
}

Observable.fromIterable 메서드를 사용해 Iterable 인스턴스 로부터 옵저버블을 생성했다.
Observable.fromCallable 메서드를 호출해 Callable 인스턴스 에서 옵저버블을 생성했다.

toObservable 의 확장 함수의 이해

코틀린의 확장함수 덕분에 List 와 같이 어떠한 Iterable 인스턴스도 Observable로 큰 어려움 없이 변경이 가능하다.

Observalbe.just 메서드 이해

Observable.just는 넘겨진 인자만을 배출하는 옵저버블을 생성한다. Iterable 인스턴스를 Observable.just에 단일 인자로 넘기면 전체 목록을 하나의 아이템으로 배출하는데, 이는 Iterable 내부의 각각의 아이템을 Observable로 생성하는 Observable.from 과는 다르다는 점에 유의해야한다.
Observable.just를 호출하면 다음과 같은 일이 일어난다.

  • 인자와 함께 Observable.just 를 호출
  • Observable.just 는 옵저버블 생성
  • onNext 알림을 통해 각각의 아이템을 내보냄
  • 모든 인자의 제출이 완료되면 onComplete 알림을 실행
    [ 예제 ]
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    
    fun main() {
      val observer: Observer<Any> = object: Observer<Any> {
          override fun onComplete() {
              println("All Completed")
          }
    
          override fun onNext(item: String) {
              println("Next $item")
          }
    
          override fun onError(e: Throwable) {
              println("Error Occured ${e.message}")
          }
    
          override fun onSubscribed(d: Disposable) {
              println("New Subscription")
          }
      }
    
      Observable.just("A String").subscribe(observer)
      Observable.just(54).subscribe(observer)
      Observable.just(listOf("String1","String2","String3","String4")).subscribe(observer)
      Observable.just(
          mapOf(
              Pair("Key1", "Value1"),
              Pair("Key2", "Value2"),
              Pair("Key3", "Value3"),
          )
      ).subscribe(observer)
      Observable.just("String1", "String2", "String3").subscrbie(observer) // 1
    }
    

    결과를 확인해보면 리스트와 맵도 단일 아이템으로 취급된다. 하지만 마지막에 Observable.just의 인자로 넘기고 있는 주석 1을 확인해보면 Observable.just는 각자의 인자를 별개의 아이템으로 받아들여 내보내고 있다.

Observable 의 다른 팩토리 메서드

[ 예제 ]

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
fun main() {
    val observer: Observer<Any> = object: Observer<Any> {
        override fun onComplete() {
            println("All Completed")
        }

        override fun onNext(item: String) {
            println("Next $item")
        }

        override fun onError(e: Throwable) {
            println("Error Occured ${e.message}")
        }

        override fun onSubscribed(d: Disposable) {
            println("New Subscription")
        }
    }//Create Observer

    Observable.range(1, 10).subscribe(observer) // 1
    Observable.empty<String>().subscribe(observer) // 2

    runBlocking {
        Observable.interval(300, TimeUnit.MILLISECONDS).subscribe(observer) // 3
        delay(900)
        Observable.timer(400, TimeUnit.MILLISECONDS).subscribe(observer) // 4
        delay(450)
    }
}
  • 1 -> Observable.range() 팩토리 메서드로 옵저버블을 생성 [ 제공된 start 부터 시작해 count 만큼의 정수를 내보낸다.]
  • 2 -> Observable.empty() 메서드로 옵저버블을 생성 [ onNext 로 항목을 내보내지 않고 즉시 onComplete()를 발생시킨다. ]
  • 3 -> Observable.interval() 은 지정된 간격만큼의 숫자를 0부터 순차적으로 내보낸다.
  • 4 -> Observable.timer() 은 지정된 시간이 경과한 후에 한 번만 실행된다.


구독자 : Observer 인터페이스

앞의 예제 코드에서 볼 수 있듯이 ObserveronNext(item: T) , onError(error: Throwable) , onComplete() , onSubscribe(d: Disposable) 네 가지 메서드를 가지는 인터페이스이다. 옵저버블을 옵저버에 연결하면 이 네 가지 메서드가 호출된다. 다음은 네 가지 메서드에 대한 간단한 설명이다.

  • onNext : 아이템을 하나씩 넘겨주기 위해서 옵저버블은 옵저버의 이 메서드를 호출한다.
  • onComplete : 옵저버블이 onNext를 통한 아이템 전달이 종료됐음을 알리고 싶을 때 옵저버의 onComplete를 호출한다.
  • onError : 옵저버블에서 에러가 발생했을 때 옵저버에 정의된 로직이 있다면 onError를 호출하고 그렇지 않다면 예외를 발생시킨다.
  • onSubscriber : Observable이 새로운 Observer를 구독할 때마다 호출된다.

구독과 해지

Observable(관찰돼야 하는 대상)Observer(관찰해야 하는 주체) 가 있다. 어떻게 이 둘을 연결할 수 있을까? Observable 과 Observer 는 키보드 또는 마우스 같은 입력 장치와 컴퓨터를 연결할 때처럼 매개체가 필요하다. ObservableObserver에 연결하는 매개체는 Subscribe연산자를 사용한다.
Subscribe 연산자에 대해 1개에서 3개의 메서드 (onNext, onComplete, onError) 를 전달할 수도 있고 Observer 인터페이스의 인스턴스를 전달해 연결할 수도 있다.
[ 예제 ]

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
fun main() {
    val observable: Observabl<Int> = Observable.range(1,5) // 1
    
    observable.subscribe ({ // 2
        //onNext 메서드
        println("Next $it")
    },{
        //onError 메서드
        println("Error ${it.message}")
    },{
        //onComplete 메서드
        println("Done")
    })

    val observer: Observer<Int> = object: Observer<Int> { // 3
        override fun onComplete() {
            println("All Completed")
        }

         override fun onNext(item: String) {
            println("Next $item")
        }

        override fun onError(e: Throwable) {
            println("Error Occured ${e.message}")
        }

        override fun onSubscribed(d: Disposable) {
            println("New Subscription")
        }
    }

    observable.subscribe(observer)
}
  • 1 -> Observable인스턴스를 생성하고 두 번에 걸쳐 오버로드된 subscribe연산자를 사용했다.
  • 2 -> subscribe 메서드에 인수로 세 가지 메서드를 전달했다. ( 첫번째 매개 변수는 onNext 메서드 , 두번째 매개 변수는 onError 메서드, 마지막 매개 변수는 onComplete )
    Observer 인터페이스의 인스턴스를 전달했다.
    구독하는 동안 Observer 인스턴스 대신 메서드를 전달하면 subscribe 연산자는 Disposable의 인스턴스를 반환하는데, Observer 인스턴스를 전달했다면 onSubscribe 메서드의 매개 변수에서 Disposable의 인스턴스를 얻을 수 있다. Disposable 인터페이스의 인스턴스를 사용해 주어진 시간에 배출을 멈출 수 있다.
    [ 예제 ]
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    
    fun main() {
      runBlocking {
          val observalbe: Observable<Long> = Observable.interval(100, TimeUnit.MILLISECONDS) // 1
    
          val observer: Observer<Long> = object: Observer<Long> {
              lateinit var disposable: Disposable // 2
    
              override fun onSubscribe(d: Disposable) {
                  disposable = d // 3
              }
    
              override fun onNext(item: Long) {
                  println("Received $item")
                  if (item >= 10 && !disposable.isDisposed) { // 4
                      disposable.dispose() // 5
                      println("Disposed")
                  }
              }
    
              override fun onError(e: Throwable) {
                  println("Error ${e.message}")
              }
    
              override fun onComplete() {
                  println("Complete")
              }
          }
    
          observable.subscribe(observer)
          delay(1500) // 6
      }
    }
    
  • 1 -> Observable.interval 팩토리 메서드로 옵저버블 생성 ( 100ms 간격마다 정수를 내보낸다. )
  • 2 -> Disposable 타입의 lateinit var disposable을 선언했는데 lateinit 은 나중에 변수가 초기화됨을 의미
  • 3 -> onSubscribe 메서드 내에서 수신된 매개 변수 값을 disposable 변수에 할당한다.
  • 결과 -> Received 0 ~ 10 까지 출력 후 Disposed 출력

Hot, Cold Observable

Cold Observable

앞선 모든 예제에서 동일한 옵저버블을 여러 번 구독해도 모든 구독의 새로운 배출을 얻을 수 있다.
[ 예제 ]

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
fun main() {
    val observable: Observable<String> = listOf("String1", "String2", "String3", "String4").toObservable() // 1

    observable.subscribe({
        // 2
        println("Received $it")
    }, {
        println("Error ${it.message}")
    }, {
        println("Done")
    })

    observable.subscribe({
        // 3
        println("Received $it")
    }, {
        println("Error ${it.message}")
    }, {
        println("Done")
    })
}


[ 결과 ]

1
2
3
4
5
6
7
8
9
10
Received String 1
Received String 2
Received String 3
Received String 4
Done
Received String 1
Received String 2
Received String 3
Received String 4
Done


1 에서 Observable 을 선언하고 2,3에서 두 번 구독했다. 두 번의 호출의 경우 첫 번째 호출에서 마지막 호출까지 똑같은 결과를 얻었다. Observable은 특징적인 기능을 갖고 있는데 각 구독마다 처음부터 아이템을 배출하는 것을 Cold Observable이라고 한다. Cold Observable은 구독 시에 실행을 시작하고 subscribe 가 호출되면 아이템을 푸시하기 시작하는데 각 구독에서 아이템의 동일한 순서를 푸시한다.
지금까지 사용한 모든 팩토리 메서드는 Cold Observable을 반환한다. 콜드 옵저버블은 데이터와 유사하다. 데이터 관련 작업을 할 때, 예를 들어 안드로이드에서 SQLite나 Room 데이터베이스로 작업하는 동안은 Hot Observable보다는 Cold Observable에 더욱 많이 의존한다.

Hot Observable

Cold Observable은 수동적이며 구독이 호출될 때까지 아무것도 내보내지 않는다. Hot Observable은 이와 반대로 배출을 시작하기 위해 구독할 필요가 없다. Hot Observable은 데이터보다는 이벤트와 유사하다. 이벤트에는 데이터가 포함될 수 있지만 시간에 민감한 특징을 가지는데 최근 가입한 Observer가 이전에 내보낸 데이터를 놓칠 수 있기 때문이다.

ConnectableObservable 객체 ConnectableObservable은 가장 유용한 핫 옵저버블 중 하나로, 핫 옵저버블의 좋은 예시다. ConnectableObservable 은 옵저버블, 심지어 콜드 옵저버블을 핫 옵저버블로 바꿀 수 있다. subscribe 호출로 배출을 시작하는 대신 connect 메서드를 호출한 후에 활성화 된다. connect를 호출하기 전에 반드시 subscrbie를 호출해야 한다. connect 를 호출한 후 구독하는 모든 호출을 이전에 생성된 배출을 놓치게 된다.
[ 예제 ]

1
2
3
4
5
6
7
8
9
10
11
fun main() {
    val connectableObservable = listOf(
        "String 1", "String 2", "String 3", "String 4", "String 5").toObservable().publish() // 1

    connectableObservable.subscribe({ println("Subscription 1: $it") }) // 2
    connectableObservable.map(String::reversed) // 3
                        .subscribe({println("Subscription 2 $it")}) // 4

    connectableObservable.connect() // 5
    connectableObservable.subscribe( {println("Subscription 3: $it")}) // 6 //배출을 받지 못함    
}

ConnectableObservable의 주요 목적은 한 옵저버블에 여러 개의 구독을 연결해 하나의 푸시에 대응할 수 있도록 하는 것이다. 이는 푸시를 반복하고 각 구독마다 따로 푸시를 보내는 콜드 옵저버블과는 상이하다. ConnectableObservable 은 connect 메서드 이전에 호출된 모든 subscription(Observers)를 연결하고 모든 옵저버에 단일 푸시를 전달한다.
예제에서 toObservable() 메서드로 옵저버블을 생성. publish()로 콜드 옵저버블을 ConnectableObservable로 변환했다. 2주석에서 connectableObservable을 구독. 3에서는 map 연산자를 사용해 String을 뒤집었으며 4에서는 매핑된 connectableObservable을 구독했다. 5에서는 connect() 호출했고 두 옵저버에서 모두 배출이 시작됐다. [ 결과 ]

1
2
3
4
5
6
7
8
9
10
Subscription 1: String 1
Subscription 2 1 gnirtS
Subscription 1: String 2
Subscription 2 2 gnirtS
Subscription 1: String 3
Subscription 2 3 gnirtS
Subscription 1: String 4
Subscription 2 4 gnirtS
Subscription 1: String 5
Subscription 2 5 gnirtS

옵저버블에서 단 한번의 배출로 모든 Subscriptions/Observers 에게 배출을 전달하는 매커니즘을 멀티캐스팅이라고 한다. ConnectableObservableHot Observable이기 때문에 주석(6) 의 subscribe() 호출은 어떤 배출도 수신하지 않았으며 connect이후에 일어난 모든 새로운 구독은 이전에 생성된 배출을 놓치게 된다.
[ 예제 ]

1
2
3
4
5
6
7
8
9
10
11
12
13
fun main() {
    val connectableObservable = Observable.interval(100, TimeUnit.MILLISECONDS).publish() // 1
    
    connectableObservable.subscribe( {println("Subscription 1: $it")}) // 2
    connectableObservable.subscribe( {println("Subscription 2: $it")}) // 3
    connectableObservable.connect() // 4

    runBlocking{ delay(500) } // 5
    
    connectableObservable.subscribe( {println("Subscription 3: $it")} ) // 6

    runBlocking{ delay(500) } // 7
}

위의 예제는 Observable.interval 메서드로 옵저버블을 생성했다. 이유는 각 배출마다 간격이 생기기 때문에 connect 이후의 구독에 약간의 공간을 줄 수 있기 때문이다.
1에서 Cold ObservableConnectableObservable로 변환하고 두 번의 구독을 수행한 뒤에 connect()를 호출했다.(주석 2,3,4) 5에서 connect() 후 즉시 지연을 호출한 다음 다시 6에서 구독하고, 7에서 3번째 구독이 이 일부 데이터를 인쇄할 수 있도록 다시 지연을 호출한다. 위의 결과로 세 번째 구독이 5번째 배출을 받았고 이전의 구독을 모두 놓쳤다는 것을 알 수 있다.(세 번째 구독 이전에 5회의 배출이 있었다. - 500 밀리초 지연 / 100밀리초 간격)