[Kotlin] RxKotlin Study 05

Reactive Prgramming in Kotlin

Posted by JungHoon-Park on March 16, 2021

RxKotlin 공부하기


스케줄러를 사용한 동시성과 병렬처리

스케줄러 ?

ReactiveX 에서 기본적으로 옵저버블과 이에 적용된 연산자 체인은 subscribe이 호출된 동일한 스레드에서 작업을 수행하며, 옵저버가 onComplete or onError 알림을 수신할 때까지 스레드가 차단된다.

스케줄러는 스레드 풀로 생각하면 된다. 스케줄러를 사용해 ReactiveX는 스레드풀을 생성하고 스레드를 실행할 수 있다. ReactiveX를 통해 멀티 스레딩과 동시성 구현을 손쉽게 할 수 있다.

스케줄러의 종류

스레드 풀 관리를 위한 추상화 계층으로서, 스케줄러 API 는 미리 구성된 스케줄러를 제공한다.

스케줄러가 없는 코드

Observable.range(1,10)
				//.subscribeOn(Schedulers.computation())
        .subscribe{
			runBlocking{delay(200)}
			println("Observable1 Item Received $it / current Thread1 -> ${Thread.currentThread().name}")
}

Observable.range(21,10)
				//.subscribeOn(Schedulers.computation())
        .subscribe{
			runBlocking{delay(100)}
			println("Observable2 Item Received $it / current Thread2 -> ${Thread.currentThread().name}")
}

결과

Observable1 Item Received 1 / current Thread1 -> main
Observable1 Item Received 2 / current Thread1 -> main
Observable1 Item Received 3 / current Thread1 -> main
Observable1 Item Received 4 / current Thread1 -> main
Observable1 Item Received 5 / current Thread1 -> main
Observable1 Item Received 6 / current Thread1 -> main
Observable1 Item Received 7 / current Thread1 -> main
Observable1 Item Received 8 / current Thread1 -> main
Observable1 Item Received 9 / current Thread1 -> main
Observable1 Item Received 10 / current Thread1 -> main
Observable2 Item Received 21 / current Thread2 -> main
Observable2 Item Received 22 / current Thread2 -> main
Observable2 Item Received 23 / current Thread2 -> main
Observable2 Item Received 24 / current Thread2 -> main
Observable2 Item Received 25 / current Thread2 -> main
Observable2 Item Received 26 / current Thread2 -> main
Observable2 Item Received 27 / current Thread2 -> main
Observable2 Item Received 28 / current Thread2 -> main
Observable2 Item Received 29 / current Thread2 -> main
Observable2 Item Received 30 / current Thread2 -> main

Schedulers.io() - I/O 연관 스케줄러

I/O 관련 작업을 수행할 수 있는 무제한의 워커 스레드를 생성하는 스레드풀을 제공한다.

이 스레드풀의 블로킹이고 I/O 작업을 더 많이 수행하도록 작성했기 때문에 계산 집약적인 작업보다 CPU 부하는 적지만 대기 중인 I/O 작업으로 인해 조금 더 오래 걸릴 수 있다.

메모리가 허용하는 무제한의 스레드를 생성해 OutOfMemory 오류를 일으킬 수 있으므로 이 스케줄러를 사용할 때는 주의해야 한다.

Schedulers.computation() - CPU 연관 스케줄러

사용 가능한 CPU 코어와 동일한 수의 스레드를 가지는 제한된 스레드풀을 제공한다. (CPU를 주로 사용하는 작업을 위한 것)

Schedulers.computation() 는 CPU 집중적인 작업에만 사용해야한다.

이 스케줄러의 스레드가 CPU 코어를 사용 중인 상태로 유지하는데, I/O 관련이나 계산과 관련되지 않은 작업에 사용되는 경우 전체 애플리케이션의 속도를 저하 시킬 수 있기 때문이다.

계산 목적에는 Schedulers.computation() 을 고려해야 하는 이유는 스레드가 프로세서를 더 잘 활용하며 사용 가능한 CPU 코어보다 더 많은 스레드를 생성하지 않고 스레드를 재사용 하기 때문이다.

Schedulers.newThread()

제공된 각 작업에 대해 새 스레드를 만드는 스케줄러를 제공한다.

  • Schedulers.io() 와의 차이 ?

Schedulers.io() 는 스레드 풀을 사용하고 새로운 작업을 할당받을 때마다 먼저 스레드 풀을 조사해 유휴 스레드가 해당 작업을 실행할 수 있는지 확인한다. 작업을 시작하기 위해 기존의 스레드를 사용할 수 없으면 새로운 스레드를 생성한다.

Schedulers.newThread() 는 스레드 풀을 사용하지 않는다. 때문에 모든 요청에 대해 새로운 스레드를 생성하고 그 사실을 잊어버린다. 스레드를 새로 생성하는 것은 자원이 많이 들기 때문에 해당 스케줄러는 사용하지 않는 것이 좋다!

Schedulers.single()

하나의 스레드만 포함하는 스케줄러를 제공하고 모든 호출에 대해 단일 인스턴스를 반환한다.

하나의 스레드만 제공하기 때문에 순차적으로 작업을 실행해야 하는 상황에 적합한 스케줄러이다.

Schedulers.trampoline()

Schedulers.single() , Schedulers.trampoline() 은 모두 순차적으로 실행된다.

Schedulers.single()는 모든 작업이 순차적으로 실행되도록 보장하지만 호출된 스레드와 병렬로 실행될 수 있다.

Schdeulers.trampoline() 은 새로운 스레드를 생성하지 않고 현재 스레드에 무한한 크기의 대기 큐를 생성하는 스케줄러이다. 새로운 스레드를 생성하지 않는다 + 대기행렬을 자동으로 만들어 주는 것이 가장 큰 차이이다.

Schedulers.single() vs Schedulers.trampoline()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
GlobalScope.async{
Observable.range(1, 10)
            .subscribeOn(Schedulers.single())//(1)
            .subscribe{
				runBlocking{delay(200)}
				println("Observable1 Item Received $it / Current Thread -> ${Thread.currentThread().name}")
}

Observable.range(21, 10)
            .subscribeOn(Schedulers.single())//(2)
            .subscribe{
								runBlocking{delay(100)}
								println("Observable2 Item Received $it / Current Thread -> ${Thread.currentThread().name}")
}

for (i in 1..10) {
        delay(100)
				println("Blocking Thread $i / Current Thread -> ${Thread.currentThread().name}")
    }
}

runBlocking{delay(6000)}

Schedulers.single() 결과

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
Blocking Thread 1 / Current Thread -> DefaultDispatcher-worker-1
Blocking Thread 2 / Current Thread -> DefaultDispatcher-worker-1
Observable1 Item Received 1 / Current Thread -> RxSingleScheduler-1
Blocking Thread 3 / Current Thread -> DefaultDispatcher-worker-1
Observable1 Item Received 2 / Current Thread -> RxSingleScheduler-1
Blocking Thread 4 / Current Thread -> DefaultDispatcher-worker-1
Blocking Thread 5 / Current Thread -> DefaultDispatcher-worker-1
Observable1 Item Received 3 / Current Thread -> RxSingleScheduler-1
Blocking Thread 6 / Current Thread -> DefaultDispatcher-worker-1
Blocking Thread 7 / Current Thread -> DefaultDispatcher-worker-1
Observable1 Item Received 4 / Current Thread -> RxSingleScheduler-1
Blocking Thread 8 / Current Thread -> DefaultDispatcher-worker-1
Blocking Thread 9 / Current Thread -> DefaultDispatcher-worker-1
Observable1 Item Received 5 / Current Thread -> RxSingleScheduler-1
Blocking Thread 10 / Current Thread -> DefaultDispatcher-worker-1
Observable1 Item Received 6 / Current Thread -> RxSingleScheduler-1
Observable1 Item Received 7 / Current Thread -> RxSingleScheduler-1
Observable1 Item Received 8 / Current Thread -> RxSingleScheduler-1
Observable1 Item Received 9 / Current Thread -> RxSingleScheduler-1
Observable1 Item Received 10 / Current Thread -> RxSingleScheduler-1
Observable2 Item Received 21 / Current Thread -> RxSingleScheduler-1
Observable2 Item Received 22 / Current Thread -> RxSingleScheduler-1
Observable2 Item Received 23 / Current Thread -> RxSingleScheduler-1
Observable2 Item Received 24 / Current Thread -> RxSingleScheduler-1
Observable2 Item Received 25 / Current Thread -> RxSingleScheduler-1
Observable2 Item Received 26 / Current Thread -> RxSingleScheduler-1
Observable2 Item Received 27 / Current Thread -> RxSingleScheduler-1
Observable2 Item Received 28 / Current Thread -> RxSingleScheduler-1
Observable2 Item Received 29 / Current Thread -> RxSingleScheduler-1
Observable2 Item Received 30 / Current Thread -> RxSingleScheduler-1

결과를 보면 두 개의 구독이 순차적으로 실행되더라도 호출 스레드와 병렬로 실행된다는 사실을 알 수 있다.

Schedulers.trampoline() 결과

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
Observable1 Item Received 1 / Current Thread -> DefaultDispatcher-worker-1
Observable1 Item Received 2 / Current Thread -> DefaultDispatcher-worker-1
Observable1 Item Received 3 / Current Thread -> DefaultDispatcher-worker-1
Observable1 Item Received 4 / Current Thread -> DefaultDispatcher-worker-1
Observable1 Item Received 5 / Current Thread -> DefaultDispatcher-worker-1
Observable1 Item Received 6 / Current Thread -> DefaultDispatcher-worker-1
Observable1 Item Received 7 / Current Thread -> DefaultDispatcher-worker-1
Observable1 Item Received 8 / Current Thread -> DefaultDispatcher-worker-1
Observable1 Item Received 9 / Current Thread -> DefaultDispatcher-worker-1
Observable1 Item Received 10 / Current Thread -> DefaultDispatcher-worker-1
Observable2 Item Received 21 / Current Thread -> DefaultDispatcher-worker-1
Observable2 Item Received 22 / Current Thread -> DefaultDispatcher-worker-1
Observable2 Item Received 23 / Current Thread -> DefaultDispatcher-worker-1
Observable2 Item Received 24 / Current Thread -> DefaultDispatcher-worker-1
Observable2 Item Received 25 / Current Thread -> DefaultDispatcher-worker-1
Observable2 Item Received 26 / Current Thread -> DefaultDispatcher-worker-1
Observable2 Item Received 27 / Current Thread -> DefaultDispatcher-worker-1
Observable2 Item Received 28 / Current Thread -> DefaultDispatcher-worker-1
Observable2 Item Received 29 / Current Thread -> DefaultDispatcher-worker-1
Observable2 Item Received 30 / Current Thread -> DefaultDispatcher-worker-1
Blocking Thread 1 / Current Thread -> DefaultDispatcher-worker-1
Blocking Thread 2 / Current Thread -> DefaultDispatcher-worker-1
Blocking Thread 3 / Current Thread -> DefaultDispatcher-worker-1
Blocking Thread 4 / Current Thread -> DefaultDispatcher-worker-1
Blocking Thread 5 / Current Thread -> DefaultDispatcher-worker-1
Blocking Thread 6 / Current Thread -> DefaultDispatcher-worker-1
Blocking Thread 7 / Current Thread -> DefaultDispatcher-worker-1
Blocking Thread 8 / Current Thread -> DefaultDispatcher-worker-1
Blocking Thread 9 / Current Thread -> DefaultDispatcher-worker-1
Blocking Thread 10 / Current Thread -> DefaultDispatcher-worker-1

호출 스레드로 순차적으로 실행됨을 알 수 있다!

Schedulers.from

위에서 설명한 스케줄러들은 기본적으로 미리 정의된 스케줄러이다.

그러나 애플리케이션을 개발하는 동안 사용자가 정의한 스케줄러를 원할 수도 있다.

ReactiveX 에서 모든 실행 프로그램을 스케줄러로 변환할 수 있는 Schedulers.from(executor: Executor) 를 제공

스케줄러 사용법 [ subscribeOn, observeOn ]

Observable 연산자 체인에 멀티스레딩을 적용하고 싶다면, 특정 스케줄러를 사용해서 연산자(또는 특정 Observable)를 실행하면 된다.

ReactiveX의 일부 Observable 연산자는 사용할 스케줄러를 파라미터로 전달 받기도 하는데, 이 연산자들은 자신이 처리할 연산의 일부 또는 전체를 전달된 스케줄러 내에서 실행한다.

기본적으로, Observable과 연산자 체인은 이처럼 스케줄러를 통해 동작하고 Subscribe 메서드가 호출되는 스레드를 사용해서 옵저버에게 알림을 보낸다. SubscribeOn 연산자는 다른 스케줄러를 지정해서 Observable이 처리해야 할 연산자들을 실행 시킨다. 그리고, ObserveOn 연산자는 Observable이 옵저버에게 알림을 보낼때 사용 할 스케줄러를 명시한다. 아래 그림이 보여주듯, SubscribeOn 연산자는 Observable이 연산을 위해 사용할 스레드를 지정하며, 연산자 체인 중 아무 곳에서 호출해도 문제되지 않는다. 하지만, ObserveOn 연산자는 연산자 체인 중 Observable이 사용할 스레드가 호출 체인 중 어느 시점에서 할당되는지에 따라 그 후에 호출되는 연산자는 영향을 받는다. 그렇기 때문에, 어쩌면 여러분은 특정 연산자를 별도의 스레드에서 실행 시키기 위해 연산자 체인 중 한 군데 이상에서ObserveOn을 호출하게 될 것이다.

SubscribeOn → Observable 이 연산을 위해 사용할 스레드를 지정

ObserveOn → Observable 이 옵저버에게 알림을 보낼 때 사용해야할 스케줄러 명시

구독 시 스레드 변경 : subscribeOn 연산자

subscribeOn 미사용

1
2
3
4
5
6
7
8
9
10
11
12
13
listOf("1","2","3","4","5","6","7","8","9","10")
            .toObservable()
            .map{
				item ->
				println("Mapping $item - ${Thread.currentThread().name}")
                return@map item.toInt()
			}
//          .subscribeOn(Schedulers.computation())//(1)
            .subscribe{
				item -> println("Received $item - ${Thread.currentThread().name}")
}

runBlocking{delay(1000)}

결과

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
Mapping 1 - main
Received 1 - main
Mapping 2 - main
Received 2 - main
Mapping 3 - main
Received 3 - main
Mapping 4 - main
Received 4 - main
Mapping 5 - main
Received 5 - main
Mapping 6 - main
Received 6 - main
Mapping 7 - main
Received 7 - main
Mapping 8 - main
Received 8 - main
Mapping 9 - main
Received 9 - main
Mapping 10 - main
Received 10 - main

subscribeOn 추가

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
Mapping 1 - RxComputationThreadPool-1
Received 1 - RxComputationThreadPool-1
Mapping 2 - RxComputationThreadPool-1
Received 2 - RxComputationThreadPool-1
Mapping 3 - RxComputationThreadPool-1
Received 3 - RxComputationThreadPool-1
Mapping 4 - RxComputationThreadPool-1
Received 4 - RxComputationThreadPool-1
Mapping 5 - RxComputationThreadPool-1
Received 5 - RxComputationThreadPool-1
Mapping 6 - RxComputationThreadPool-1
Received 6 - RxComputationThreadPool-1
Mapping 7 - RxComputationThreadPool-1
Received 7 - RxComputationThreadPool-1
Mapping 8 - RxComputationThreadPool-1
Received 8 - RxComputationThreadPool-1
Mapping 9 - RxComputationThreadPool-1
Received 9 - RxComputationThreadPool-1
Mapping 10 - RxComputationThreadPool-1
Received 10 - RxComputationThreadPool-1

subscribeOn 연산자는 전체 구독에 대한 스레드를 변경한다. 구독의 흐름 중 원하는 곳 어디에서나 사용할 수 있다.

다른 스레드에서 관찰 : observeOn 연산자

1
2
3
4
5
6
7
8
9
10
11
12
13
14
listOf("1","2","3","4","5","6","7","8","9","10")
        .toObservable()
        .subscribeOn(Schedulers.computation())//(1)
        .map {
			item->
			println("Mapping $item - ${Thread.currentThread().name}")
            return@map item.toInt()
		}
		.observeOn(Schedulers.io())//(2)
        .subscribe {
			item->println("Received $item - ${Thread.currentThread().name}")
		}

runBlocking{delay(1000)}

결과

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
Mapping 1 - RxComputationThreadPool-1
Mapping 2 - RxComputationThreadPool-1
Mapping 3 - RxComputationThreadPool-1
Mapping 4 - RxComputationThreadPool-1
Mapping 5 - RxComputationThreadPool-1
Mapping 6 - RxComputationThreadPool-1
Received 1 - RxCachedThreadScheduler-1
Mapping 7 - RxComputationThreadPool-1
Received 2 - RxCachedThreadScheduler-1
Mapping 8 - RxComputationThreadPool-1
Received 3 - RxCachedThreadScheduler-1
Mapping 9 - RxComputationThreadPool-1
Received 4 - RxCachedThreadScheduler-1
Mapping 10 - RxComputationThreadPool-1
Received 5 - RxCachedThreadScheduler-1
Received 6 - RxCachedThreadScheduler-1
Received 7 - RxCachedThreadScheduler-1
Received 8 - RxCachedThreadScheduler-1
Received 9 - RxCachedThreadScheduler-1
Received 10 - RxCachedThreadScheduler-1

observeOn 연산자는 모든 연산자에 스케줄러를 지정한다.

Schedulers.computation() → map 연산 수행

Schedulers.io() → 구독 결과를 받는다.

1) → map 연산자 바로 앞에 subscribeOn( Schedulers.computation() ) 을 호출해 computation 스레드를 지정

2) → 결과를 받기전에 observeOn ( Schedulers.io() ) 를 호출해 io 스레드로 전환