RxKotlin 공부하기
백프레셔와 플로어블 소개
- 배압(Backpressure)
데이터 생산과 소비가 불균형적일 때 일어나는 현상
만약 10000개의 데이터를 0.1초마다 발행하고, 소비는 10초마다 한다면 소비와 관계없이 데이터는 스트림에 계속 쌓인다.
즉, Observable 이 데이터를 발행하는 속도를 Observer의 소비 속도가 따라가지 못하는 것이다.
이는 결국 메모리가 Overflow 되고 OutOfMemoryError로 이어져 앱이 터질 것이다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Observable.range(1, 1000)
.map { MyItem(it) }
.observeOn(Schedulers.io())
.subscribe {
println("Received $it")
runBlocking { delay(50) }
}
runBlocking { delay(600000) }
data class MyItem(val id: Int) {
init {
println("MyItem Created $id")
}
}
Flowable
- ReactiveX 2.x (RxKotlin 2.x) 에 추가됨
- Observable 의 Backpressure 버전
- 연산자를 위해 최대 128개의 항목을 가질 수 있는 버퍼 제공
Flowable 그럼 언제 쓸까?
- Flowable 과 Backpressure 는 더 많은 양의 데이터를 처리할 때 도움이 됨 (10,000 개 이상의 아이템을 배출한다면 사용하자)
- 파일이나 DB를 읽거나 파싱하는 경우
- 결과를 반환하는 동안 IO 소스의 양을 조절할 수 있는 블로킹을 지원하는 네트워크 IO 작업 / 스트리밍 API 에서 배출할 때 사용
Observable 그럼 언제 쓸까?
- 소량의 데이터 (10,000 개 미만의 배출)을 다룰 때
- 오로지 동기 방식으로 작업하길 원하거나 또는 제한된 동시성을 가진 작업을 수행할 때
- UI 이벤트를 발생시킬 때
Flowable 은 Observable 보다 느리다!
Flowable & Subscriber
- Flowable 은 Observer 대신 Backpressure 호환이 가능한 Subscriber 를 사용
- 일부 추가 기능과 Backpressure를 동시에 지원하기 때문에 사용
Flowable 생성
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
val subscriber: Subscriber<Int> = object : Subscriber<Int> {
override fun onComplete() {
println("All Complete")
}
override fun onSubscribe(s: Subscription?) {
println("New Subscription")
s?.request(10)
}
override fun onNext(t: Int?) {
println("Next $t")
}
override fun onError(t: Throwable?) {
println("Error!! ${t?.message}")
}
} // Subscriber 인스턴스 생성!!
val flowable = Flowable.create<Int>( {
for (i in 1..10) {
it.onNext(i)
}
it.onComplete()
}, BackpressureStrategy.BUFFER) // Flowable 인스턴스 생성!!
flowable.subscribe(subscriber) // 구독
runBlocking { delay(10000) }
결과
1
2
3
4
5
6
7
8
9
10
11
12
New Subscription
Next 1
Next 2
Next 3
Next 4
Next 5
Next 6
Next 7
Next 8
Next 9
Next 10
All Complete
Flowable.creat() 의 두 번째 매개 변수는 BackpressureStrategy 이다.
- BackpressureStrategy.MISSING
Backpressure 구현을 사용하지 않으며 다운스트림이 스스로 오버플로우를 처리해야한다.
이 옵션은 onBackpressurexxx() 연산자를 사용할 때 유용하다.
BackpressureStrategy.MISSING
BackpressureStrategy.MISSING 은 backpressure 전략을 구현하지 않으므로
Flowable에게 어떤 전략을 따를지 명시적으로 알려줄 필요가 있다.
onBackpressureBuffer() 연산자
이 연산자는 BackpressureStrategy.BUFFER 의 용도로 사용한다.
버퍼 크기, 크기 제한 여부와 같은 몇 가지 옵션을 얻을 수 있다.
1
2
3
4
5
6
7
8
9
10
val source = Observable.range(1, 1000)
source.toFlowable(BackpressureStrategy.MISSING)
.onBackpressureBuffer() // .onBackpressureBuffer(30) 으로 버퍼크기 지정가능!!
.map { MyItem(it) }
.observeOn(Schedulers.io())
.subscribe {
print("Received $it \t")
runBlocking { delay(1000) }
}
runBlocking { delay(100000) }
onBackpressureDrop() 연산자
마찬가지로 BackpressureStrategy.DROP 과 일치한다.
1
2
3
4
5
6
7
8
9
10
val source = Observable.range(1, 1000)
source.toFlowable(BackpressureStrategy.MISSING)
.onBackpressureDrop{ print("Dropped $it")}
.map { MyItem(it) }
.observeOn(Schedulers.io())
.subscribe {
print("Received $it \t")
runBlocking { delay(1000) }
}
runBlocking { delay(100000) }
버퍼의 크기가 128이기 때문에 Flowable은 128이후에 배출을 처리하지 못하고 있다.
onBackpressureLatest() 연산자
BackpressureStrategy.LATEST 와 똑같은 방식으로 동작한다.
onBackpressureLatest() 연산자는 따로 추가 설정을 제공하지 않는다.
1
2
3
4
5
6
7
8
9
10
val source = Observable.range(1, 1000)
source.toFlowable(BackpressureStrategy.MISSING)
.onBackpressureLatest()
.map { MyItem(it) }
.observeOn(Schedulers.io())
.subscribe {
print("Received $it \t")
runBlocking { delay(1000) }
}
runBlocking { delay(100000) }
- BackpressureStrategy.ERROR
Backpressure를 구현하지 않는데 다운스트림이 소스를 따라잡을 수 없는 경우, MissingBackpressureException 예외를 발생시킨다.
- BackpressureStrategy.BUFFER
다운스트림이 배출을 소비할 수 있게 될 때까지 제한이 없는 버퍼에 저장을 한다. 하지만 버퍼 크기를 넘어서는 경우 OutOfMemoryError 가 발생할 수 있다.
- BackpressureStrategy.DROP
다운스트림이 바쁘고 소비 속도를 계속 유지할 수 없을 때 모든 배출량을 무시한다. 다운스트림이 이전 작업을 끝내고 나서 처음으로 배출된 것을 처리하고 그 사이의 값들은 모두 무시.
- BackpressureStrategy.LATEST
다운스트림이 바쁘고 배출을 유지할 수 없는 경우 최신 배출량만을 유지하고 나머지는 모두 무시한다. 이전 작업을 마치면 작업이 끝나기 직전의 마지막으로 배출된 것을 수신한다.
Observable 로 Flowable 만들기
toFlowable() 연산자를 통해 Observable을 Flowable로 만들어준다.
1
2
3
4
5
6
7
8
9
val source = Observable.range(1, 1000)
source.toFlowable(BackpressureStrategy.ERROR) // Flowable 로 변환!
.map { MyItem(it) }
.observeOn(Schedulers.io())
.subscribe {
print("Received $it \t")
runBlocking { delay(1000) }
}
runBlocking { delay(100000) }
Flowable.generate()
Flowable.create() 와 유사하지만 약간의 차이점이 있다.
- Flowable.create() 와는 다르게 Flowable.generate()는 요청 시 아이템을 생성하고 이를 배출한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
val flowable = Flowable.generate<Int> {
it.onNext(GenerateFlowableItem.item)
}
flowable
.map { MyItem(it) }
.observeOn(Schedulers.io())
.subscribe {
runBlocking { delay(100) }
println("Next $it")
}
runBlocking { delay(700000) }
}
data class MyItem(val id: Int) {
init {
println("MyItem Created $id")
}
}
object GenerateFlowableItem {
var item = 0
get() {
field += 1
return field
}
}
Flowable 은 첫번째로 128개의 항목을 배출한 다음 ,다운스트림이 96개 아이템을 처리하기 위해 기다린 후 다시 Flowable이 128 개의 아이템을 배출하는 주기가 계속된다.
ConnectableFlowable
Observable 처럼 ConnectableFlowable은 Flowable과 유사하지만 구독 시점에 아이템 배출을 시작하지 않고 connect() 메서드가 호출될 때 시작한다.
이렇게 하면 Flowable이 아이템을 배출하기 전에 의도한 모든 구독자가 Flowable.subscribe() 을 기다리도록 할 수 있다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
val connectableFlowable = listOf("1", "2", "3", "4", "5", "6").toFlowable().publish()
connectableFlowable.
subscribe {
println("Subscription 1 값-> $it")
runBlocking { delay(1000) }
println("Subscription 1 delay")
}
connectableFlowable.
subscribe {
println("Subscription 2 값-> $it")
runBlocking { delay(1200) }
println("Subscription 2 delay")
}
connectableFlowable.connect()
Processor
- Backpressure 를 지원하는 Subject 이다.
가장 간단하게 동작하는 PublishProcessor 이다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
val connectableFlowable = listOf(1, 2, 3, 4, 5, 6, 7).toFlowable()
val publishPrecessor = PublishProcessor.create<Int>()
publishPrecessor.subscribe {
println("Subscription 1 -> $it")
runBlocking { delay(1000) }
println("Subscription 1 delay")
}
publishPrecessor.subscribe {
println("Subscription 2-> $it")
}
connectableFlowable.subscribe(publishPrecessor)
결과
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Subscription 1 -> 1
Subscription 1 delay
Subscription 2-> 1
Subscription 1 -> 2
Subscription 1 delay
Subscription 2-> 2
Subscription 1 -> 3
Subscription 1 delay
Subscription 2-> 3
Subscription 1 -> 4
Subscription 1 delay
Subscription 2-> 4
Subscription 1 -> 5
Subscription 1 delay
Subscription 2-> 5
buffer() 연산자
컨슈머가 소비할 때까지 배출을 버퍼링하는 onBackpressureBuffer() 연산자와는 달리, buffer() 연산자는 배출을 모아서 리스트나 다른 컬렉션 유형으로 전달한다.
1
2
3
4
5
val flowable = Flowable.range(1, 111)
flowable.buffer(10) // 버퍼의 크기 10으로 설정!!
.subscribe {
println(it)
}
결과
1
2
3
4
5
6
7
8
9
10
11
12
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
[11, 12, 13, 14, 15, 16, 17, 18, 19, 20]
[21, 22, 23, 24, 25, 26, 27, 28, 29, 30]
[31, 32, 33, 34, 35, 36, 37, 38, 39, 40]
[41, 42, 43, 44, 45, 46, 47, 48, 49, 50]
[51, 52, 53, 54, 55, 56, 57, 58, 59, 60]
[61, 62, 63, 64, 65, 66, 67, 68, 69, 70]
[71, 72, 73, 74, 75, 76, 77, 78, 79, 80]
[81, 82, 83, 84, 85, 86, 87, 88, 89, 90]
[91, 92, 93, 94, 95, 96, 97, 98, 99, 100]
[101, 102, 103, 104, 105, 106, 107, 108, 109, 110]
[111]
buffer() 연산자에는 skip 변수를 설정할 수 있다.
1
2
3
4
5
val flowable = Flowable.range(1, 111)
flowable.buffer(10, 15)
.subscribe {
println(it)
}
결과
1
2
3
4
5
6
7
8
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
[16, 17, 18, 19, 20, 21, 22, 23, 24, 25]
[31, 32, 33, 34, 35, 36, 37, 38, 39, 40]
[46, 47, 48, 49, 50, 51, 52, 53, 54, 55]
[61, 62, 63, 64, 65, 66, 67, 68, 69, 70]
[76, 77, 78, 79, 80, 81, 82, 83, 84, 85]
[91, 92, 93, 94, 95, 96, 97, 98, 99, 100]
[106, 107, 108, 109, 110, 111]
이 경우 구독당 5개 항목을 건너뛴다.
아래처럼 일정 시간 간격으로 배출할 수 있다.
1
2
3
4
5
val flowable = Flowable.interval(1, TimeUnit.SECONDS)
flowable.buffer(3, TimeUnit.SECONDS)
.subscribe {
println(it)
}
결과
1
2
3
[0, 1, 2]
[3, 4, 5]
[6, 7, 8]
window() 연산자
window() 연산자는 아이템을 컬렉션 형태로 버퍼링하는 대신 다른 프로듀서 형태로 버퍼링 한다는 점만 빼면 buffer()와 거의 유사하다.
1
2
3
4
5
6
7
val flowable = Flowable.range(1, 111)
flowable.window(10).subscribe {
flowableInstance -> flowableInstance.subscribe {
print("$it ,")
}
println()
}
결과
1
2
3
4
5
6
7
8
9
10
11
12
1 ,2 ,3 ,4 ,5 ,6 ,7 ,8 ,9 ,10 ,
11 ,12 ,13 ,14 ,15 ,16 ,17 ,18 ,19 ,20 ,
21 ,22 ,23 ,24 ,25 ,26 ,27 ,28 ,29 ,30 ,
31 ,32 ,33 ,34 ,35 ,36 ,37 ,38 ,39 ,40 ,
41 ,42 ,43 ,44 ,45 ,46 ,47 ,48 ,49 ,50 ,
51 ,52 ,53 ,54 ,55 ,56 ,57 ,58 ,59 ,60 ,
61 ,62 ,63 ,64 ,65 ,66 ,67 ,68 ,69 ,70 ,
71 ,72 ,73 ,74 ,75 ,76 ,77 ,78 ,79 ,80 ,
81 ,82 ,83 ,84 ,85 ,86 ,87 ,88 ,89 ,90 ,
91 ,92 ,93 ,94 ,95 ,96 ,97 ,98 ,99 ,100 ,
101 ,102 ,103 ,104 ,105 ,106 ,107 ,108 ,109 ,110 ,
111 ,
window() 연산자는 새로운 Flowable 인스턴스로 10개의 배출을 버퍼링한다.
throttle() 연산자
throttle() 연산자는 배출을 생략한다.
1
2
3
val flowable = Flowable.interval(100, TimeUnit.MILLISECONDS)
flowable.throttleFirst(200, TimeUnit.MILLISECONDS)
.subscribe { println(it) }
결과
1
2
3
4
0
3
6
9
throttleFirst 는 200 밀리초마다 발생하는 첫 번째 값을 배출한다.