[Kotlin] RxKotlin Study 04

Reactive Prgramming in Kotlin

Posted by JungHoon-Park on March 2, 2021

RxKotlin 공부하기


백프레셔와 플로어블 소개

  • 배압(Backpressure)

데이터 생산과 소비가 불균형적일 때 일어나는 현상

만약 10000개의 데이터를 0.1초마다 발행하고, 소비는 10초마다 한다면 소비와 관계없이 데이터는 스트림에 계속 쌓인다.

즉, Observable 이 데이터를 발행하는 속도를 Observer의 소비 속도가 따라가지 못하는 것이다.

이는 결국 메모리가 Overflow 되고 OutOfMemoryError로 이어져 앱이 터질 것이다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Observable.range(1, 1000)
          .map { MyItem(it) }
          .observeOn(Schedulers.io())
          .subscribe {
              println("Received $it")
              runBlocking { delay(50) }
          }

runBlocking { delay(600000) }

data class MyItem(val id: Int) {
    init {
        println("MyItem Created $id")
    }
}

Flowable

  • ReactiveX 2.x (RxKotlin 2.x) 에 추가됨
  • Observable 의 Backpressure 버전
  • 연산자를 위해 최대 128개의 항목을 가질 수 있는 버퍼 제공

Flowable 그럼 언제 쓸까?

  • Flowable 과 Backpressure 는 더 많은 양의 데이터를 처리할 때 도움이 됨 (10,000 개 이상의 아이템을 배출한다면 사용하자)
  • 파일이나 DB를 읽거나 파싱하는 경우
  • 결과를 반환하는 동안 IO 소스의 양을 조절할 수 있는 블로킹을 지원하는 네트워크 IO 작업 / 스트리밍 API 에서 배출할 때 사용

Observable 그럼 언제 쓸까?

  • 소량의 데이터 (10,000 개 미만의 배출)을 다룰 때
  • 오로지 동기 방식으로 작업하길 원하거나 또는 제한된 동시성을 가진 작업을 수행할 때
  • UI 이벤트를 발생시킬 때

Flowable 은 Observable 보다 느리다!

Flowable & Subscriber

  • Flowable 은 Observer 대신 Backpressure 호환이 가능한 Subscriber 를 사용
  • 일부 추가 기능과 Backpressure를 동시에 지원하기 때문에 사용

Flowable 생성

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
val subscriber: Subscriber<Int> = object : Subscriber<Int> {
    override fun onComplete() {
        println("All Complete")
    }

    override fun onSubscribe(s: Subscription?) {
        println("New Subscription")
        s?.request(10)
    }

    override fun onNext(t: Int?) {
        println("Next $t")
    }

    override fun onError(t: Throwable?) {
        println("Error!! ${t?.message}")
    }
} // Subscriber 인스턴스 생성!!

val flowable = Flowable.create<Int>( {
    for (i in 1..10) {
        it.onNext(i)
    }
    it.onComplete()
}, BackpressureStrategy.BUFFER) // Flowable 인스턴스 생성!!

flowable.subscribe(subscriber) // 구독

runBlocking { delay(10000) }

결과

1
2
3
4
5
6
7
8
9
10
11
12
New Subscription
Next 1
Next 2
Next 3
Next 4
Next 5
Next 6
Next 7
Next 8
Next 9
Next 10
All Complete

Flowable.creat() 의 두 번째 매개 변수는 BackpressureStrategy 이다.

  • BackpressureStrategy.MISSING

Backpressure 구현을 사용하지 않으며 다운스트림이 스스로 오버플로우를 처리해야한다.

이 옵션은 onBackpressurexxx() 연산자를 사용할 때 유용하다.

BackpressureStrategy.MISSING

BackpressureStrategy.MISSING 은 backpressure 전략을 구현하지 않으므로

Flowable에게 어떤 전략을 따를지 명시적으로 알려줄 필요가 있다.

onBackpressureBuffer() 연산자

이 연산자는 BackpressureStrategy.BUFFER 의 용도로 사용한다.

버퍼 크기, 크기 제한 여부와 같은 몇 가지 옵션을 얻을 수 있다.

1
2
3
4
5
6
7
8
9
10
val source = Observable.range(1, 1000)
source.toFlowable(BackpressureStrategy.MISSING)
			.onBackpressureBuffer() // .onBackpressureBuffer(30) 으로 버퍼크기 지정가능!!
      .map { MyItem(it) }
      .observeOn(Schedulers.io())
      .subscribe {
            print("Received $it \t")
            runBlocking { delay(1000) }
        }
runBlocking { delay(100000) }

onBackpressureDrop() 연산자

마찬가지로 BackpressureStrategy.DROP 과 일치한다.

1
2
3
4
5
6
7
8
9
10
val source = Observable.range(1, 1000)
source.toFlowable(BackpressureStrategy.MISSING)
			.onBackpressureDrop{ print("Dropped $it")}
      .map { MyItem(it) }
      .observeOn(Schedulers.io())
      .subscribe {
            print("Received $it \t")
            runBlocking { delay(1000) }
        }
runBlocking { delay(100000) }

버퍼의 크기가 128이기 때문에 Flowable은 128이후에 배출을 처리하지 못하고 있다.

onBackpressureLatest() 연산자

BackpressureStrategy.LATEST 와 똑같은 방식으로 동작한다.

onBackpressureLatest() 연산자는 따로 추가 설정을 제공하지 않는다.

1
2
3
4
5
6
7
8
9
10
val source = Observable.range(1, 1000)
source.toFlowable(BackpressureStrategy.MISSING)
			.onBackpressureLatest()
      .map { MyItem(it) }
      .observeOn(Schedulers.io())
      .subscribe {
            print("Received $it \t")
            runBlocking { delay(1000) }
        }
runBlocking { delay(100000) }
  • BackpressureStrategy.ERROR

Backpressure를 구현하지 않는데 다운스트림이 소스를 따라잡을 수 없는 경우, MissingBackpressureException 예외를 발생시킨다.

  • BackpressureStrategy.BUFFER

다운스트림이 배출을 소비할 수 있게 될 때까지 제한이 없는 버퍼에 저장을 한다. 하지만 버퍼 크기를 넘어서는 경우 OutOfMemoryError 가 발생할 수 있다.

  • BackpressureStrategy.DROP

다운스트림이 바쁘고 소비 속도를 계속 유지할 수 없을 때 모든 배출량을 무시한다. 다운스트림이 이전 작업을 끝내고 나서 처음으로 배출된 것을 처리하고 그 사이의 값들은 모두 무시.

  • BackpressureStrategy.LATEST

다운스트림이 바쁘고 배출을 유지할 수 없는 경우 최신 배출량만을 유지하고 나머지는 모두 무시한다. 이전 작업을 마치면 작업이 끝나기 직전의 마지막으로 배출된 것을 수신한다.

Observable 로 Flowable 만들기

toFlowable() 연산자를 통해 Observable을 Flowable로 만들어준다.

1
2
3
4
5
6
7
8
9
val source = Observable.range(1, 1000)
source.toFlowable(BackpressureStrategy.ERROR) // Flowable 로 변환!
        .map { MyItem(it) }
        .observeOn(Schedulers.io())
        .subscribe {
            print("Received $it \t")
            runBlocking { delay(1000) }
        }
runBlocking { delay(100000) }

Flowable.generate()

Flowable.create() 와 유사하지만 약간의 차이점이 있다.

  • Flowable.create() 와는 다르게 Flowable.generate()는 요청 시 아이템을 생성하고 이를 배출한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
    val flowable = Flowable.generate<Int> {
        it.onNext(GenerateFlowableItem.item)
    }
    
    flowable
            .map { MyItem(it) }
            .observeOn(Schedulers.io())
            .subscribe {
                runBlocking { delay(100) }
                println("Next $it")
            }
    runBlocking { delay(700000) }

}

data class MyItem(val id: Int) {
    init {
        println("MyItem Created $id")
    }
}

object GenerateFlowableItem {
    var item = 0
    get() {
        field += 1
        return field
    }
}

Flowable 은 첫번째로 128개의 항목을 배출한 다음 ,다운스트림이 96개 아이템을 처리하기 위해 기다린 후 다시 Flowable이 128 개의 아이템을 배출하는 주기가 계속된다.

ConnectableFlowable

Observable 처럼 ConnectableFlowable은 Flowable과 유사하지만 구독 시점에 아이템 배출을 시작하지 않고 connect() 메서드가 호출될 때 시작한다.

이렇게 하면 Flowable이 아이템을 배출하기 전에 의도한 모든 구독자가 Flowable.subscribe() 을 기다리도록 할 수 있다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
val connectableFlowable = listOf("1", "2", "3", "4", "5", "6").toFlowable().publish()
connectableFlowable.
        subscribe {
            println("Subscription 1 값-> $it")
            runBlocking { delay(1000) }
            println("Subscription 1 delay")
        }

connectableFlowable.
        subscribe {
            println("Subscription 2 값-> $it")
            runBlocking { delay(1200) }
            println("Subscription 2 delay")
        }
connectableFlowable.connect()

Processor

  • Backpressure 를 지원하는 Subject 이다.

가장 간단하게 동작하는 PublishProcessor 이다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
val connectableFlowable = listOf(1, 2, 3, 4, 5, 6, 7).toFlowable()
    val publishPrecessor = PublishProcessor.create<Int>()

    publishPrecessor.subscribe {
        println("Subscription 1 -> $it")
        runBlocking { delay(1000) }
        println("Subscription 1 delay")
    }

    publishPrecessor.subscribe {
        println("Subscription 2-> $it")
    }

   connectableFlowable.subscribe(publishPrecessor)

결과

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Subscription 1 -> 1
Subscription 1 delay
Subscription 2-> 1
Subscription 1 -> 2
Subscription 1 delay
Subscription 2-> 2
Subscription 1 -> 3
Subscription 1 delay
Subscription 2-> 3
Subscription 1 -> 4
Subscription 1 delay
Subscription 2-> 4
Subscription 1 -> 5
Subscription 1 delay
Subscription 2-> 5

buffer() 연산자

컨슈머가 소비할 때까지 배출을 버퍼링하는 onBackpressureBuffer() 연산자와는 달리, buffer() 연산자는 배출을 모아서 리스트나 다른 컬렉션 유형으로 전달한다.

1
2
3
4
5
val flowable = Flowable.range(1, 111)
flowable.buffer(10) // 버퍼의 크기 10으로 설정!!
        .subscribe {
            println(it)
        }

결과

1
2
3
4
5
6
7
8
9
10
11
12
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
[11, 12, 13, 14, 15, 16, 17, 18, 19, 20]
[21, 22, 23, 24, 25, 26, 27, 28, 29, 30]
[31, 32, 33, 34, 35, 36, 37, 38, 39, 40]
[41, 42, 43, 44, 45, 46, 47, 48, 49, 50]
[51, 52, 53, 54, 55, 56, 57, 58, 59, 60]
[61, 62, 63, 64, 65, 66, 67, 68, 69, 70]
[71, 72, 73, 74, 75, 76, 77, 78, 79, 80]
[81, 82, 83, 84, 85, 86, 87, 88, 89, 90]
[91, 92, 93, 94, 95, 96, 97, 98, 99, 100]
[101, 102, 103, 104, 105, 106, 107, 108, 109, 110]
[111]

buffer() 연산자에는 skip 변수를 설정할 수 있다.

1
2
3
4
5
val flowable = Flowable.range(1, 111)
flowable.buffer(10, 15)
        .subscribe {
            println(it)
        }

결과

1
2
3
4
5
6
7
8
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
[16, 17, 18, 19, 20, 21, 22, 23, 24, 25]
[31, 32, 33, 34, 35, 36, 37, 38, 39, 40]
[46, 47, 48, 49, 50, 51, 52, 53, 54, 55]
[61, 62, 63, 64, 65, 66, 67, 68, 69, 70]
[76, 77, 78, 79, 80, 81, 82, 83, 84, 85]
[91, 92, 93, 94, 95, 96, 97, 98, 99, 100]
[106, 107, 108, 109, 110, 111]

이 경우 구독당 5개 항목을 건너뛴다.

아래처럼 일정 시간 간격으로 배출할 수 있다.

1
2
3
4
5
val flowable = Flowable.interval(1, TimeUnit.SECONDS)
flowable.buffer(3, TimeUnit.SECONDS)
        .subscribe {
            println(it)
        }

결과

1
2
3
[0, 1, 2]
[3, 4, 5]
[6, 7, 8]

window() 연산자

window() 연산자는 아이템을 컬렉션 형태로 버퍼링하는 대신 다른 프로듀서 형태로 버퍼링 한다는 점만 빼면 buffer()와 거의 유사하다.

1
2
3
4
5
6
7
val flowable = Flowable.range(1, 111)
flowable.window(10).subscribe {
            flowableInstance -> flowableInstance.subscribe {
            print("$it ,")
        }
            println()
}

결과

1
2
3
4
5
6
7
8
9
10
11
12
1 ,2 ,3 ,4 ,5 ,6 ,7 ,8 ,9 ,10 ,
11 ,12 ,13 ,14 ,15 ,16 ,17 ,18 ,19 ,20 ,
21 ,22 ,23 ,24 ,25 ,26 ,27 ,28 ,29 ,30 ,
31 ,32 ,33 ,34 ,35 ,36 ,37 ,38 ,39 ,40 ,
41 ,42 ,43 ,44 ,45 ,46 ,47 ,48 ,49 ,50 ,
51 ,52 ,53 ,54 ,55 ,56 ,57 ,58 ,59 ,60 ,
61 ,62 ,63 ,64 ,65 ,66 ,67 ,68 ,69 ,70 ,
71 ,72 ,73 ,74 ,75 ,76 ,77 ,78 ,79 ,80 ,
81 ,82 ,83 ,84 ,85 ,86 ,87 ,88 ,89 ,90 ,
91 ,92 ,93 ,94 ,95 ,96 ,97 ,98 ,99 ,100 ,
101 ,102 ,103 ,104 ,105 ,106 ,107 ,108 ,109 ,110 ,
111 ,

window() 연산자는 새로운 Flowable 인스턴스로 10개의 배출을 버퍼링한다.

throttle() 연산자

throttle() 연산자는 배출을 생략한다.

1
2
3
val flowable = Flowable.interval(100, TimeUnit.MILLISECONDS)
flowable.throttleFirst(200, TimeUnit.MILLISECONDS)
        .subscribe { println(it) }

결과

1
2
3
4
0
3
6
9

throttleFirst 는 200 밀리초마다 발생하는 첫 번째 값을 배출한다.