-
Kotlin) Coroutine 공식 가이드 번역 06 - ChannelsKotlin 2021. 1. 25. 20:39
Channels
Deffered는 코루틴 간에 단일 값을 편하게 전달하는 방법을 제공하지만,
Channel은 Stream의 값을 전달하는 방법을 제공합니다.
Channel은 Blocking Queue와 유사하게 동작합니다. 차이점은 아래와 같습니다.
Blocking Queue Channel put send take receive 동시성이 필요한 여러 코루틴에서 순서를 보장받으면서 공유하여 사용할 수 있습니다.
code
fun main() = runBlocking { val channel = Channel<Int>() launch { for (x in 1..5) channel.send(x * x) } repeat(5) { println(channel.receive()) } println("Done!") }
result
1 4 9 16 25 Done!
Closing and iteration over channels
Blocking Queue와 다르게 Channel은 더 이상 사용하지 않을 때 채널을 close로 닫음으로서 더 이상 오지 않음을 표현할 수 있습니다.
Channel은 stream을 반환하기 때문에 close를 시킬 경우 반환 값을 받아 사용하는 for-loop에서 Channel 사용이 끝났음을 알 수 있습니다.
code
fun main() = runBlocking { val channel = Channel<Int>() launch { for (x in 1..5) channel.send(x * x) channel.close() } for (y in channel) println(y) println("Done!") }
result
1 4 9 16 25 Done!
close는 특별한 token의 형태로 이 값 역시 channel에 들어갑니다.
그래서 iterator는 이 값을 만나면 iteration을 멈춥니다. 즉 close를 호출해도 그 이전에 넣은 값들은 받아올 수 있음을 보장합니다.
Building channel producers
코루틴으로 흔하게 사용하는 형태중 하나가 sequential 한 데이터를 생산하는 쪽과 소비하는 쪽을 구현하는 producer-consumer 패턴입니다.
생산하는 형태를 쉽게 구현하도록 제공하는 coroutine builder로 produce가 있습니다.
소비하는 쪽에서 사용하는 extension function으로 cosumeEach가 있습니다.(for-loop 대체 가능)
code
fun main() = runBlocking { val squares = produceSquares() squares.consumeEach { println(it) } println("Done!") } fun CoroutineScope.produceSquares(): ReceiveChannel<Int> = produce { for (x in 1..5) send(x * x) }
result
1 4 9 16 25 Done!
위의 코드는 Coroutine Scope의 확장 함수로 만든 경우입니다.
확장함수로 만들고 싶지 않다면 아래의 코드처럼 만들 수 있습니다.
code
fun produceSquares2(coroutineScope: CoroutineScope): ReceiveChannel<Int> = coroutineScope.produce(Dispatchers.Default) { for (x in 1..5) send(x * x) }
Pipelines
무한한 sequence를 만들어내고 동시에 이를 소비하는 패턴을 pipelining으로 만들 수 있습니다.
하나의 코루틴이 데이터 스트림을 생산해내고 다른 하나 이상의 코루틴들이 이 스트림을 수신 받아 필요한 작업을 수행 한 후 가공된 결과를 다시 전송하는 패턴을 말합니다.
code
fun main() = runBlocking { val numbers = produceNumbers() val squares = square(numbers) for (i in 1..5) println(squares.receive()) println("Done!") coroutineContext.cancelChildren() } fun CoroutineScope.produceNumbers() = produce<Int> { var x = 1 while (true) send(x * x) } fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce<Int> { for (x in numbers) send(x * x) }
produceNumbers() 함수에서 수를 무한하게 생산하고, square() 함수에서 넘겨받은 channel에서 값을 소비하여 제곱합니다.
produce를 사용하기 때문에 함수 실행이 끝날 때까지 blocking 되는 게 아니라 결괏값이 생성될 때마다 async 하게 channel로 보내지고,
for문에서 5개를 recieve 할 때까지 대기합니다.
Prime numbers with pipeline
pipleline을 이용해 소수를 찾는 예제입니다.
code
fun main() = runBlocking { var cur = numbersFrom(2) for (i in 1..10) { val prime = cur.receive() println(prime) cur = filter(cur, prime) } coroutineContext.cancelChildren() } fun CoroutineScope.numbersFrom(start: Int) = produce<Int> { var x = start while (true) send(x++) } fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> { for (x in numbers) if (x % prime != 0) send(x) }
result
2 3 5 7 11 13 17 19 23 29
소수를 찾는 예제입니다. cancelChildren을 통해 10개까지 출력하고 모든 하위 코루틴을 취합니다.
예제의 이해를 돕기 위해 아래의 코드처럼 로그를 찍어 출력할 수 있습니다.
code
fun main() = runBlocking { var cur = numbersFrom(2) for (i in 1..10) { val prime = cur.receive() println(prime) cur = filter(cur, prime, i) } coroutineContext.cancelChildren() } fun CoroutineScope.numbersFrom(start: Int) = produce<Int> { var x = start while (true) send(x++) } fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int, sequence: Int): ReceiveChannel<Int> = produce { for (x in numbers) { println("Sequence#$sequence, number: $x, prime:$prime") if (x % prime != 0) { send(x) } } }
결과는 아래에서 확인해주세요.
result
더보기2
Sequence#1, number: 3, prime:2
3
Sequence#1, number: 4, prime:2
Sequence#1, number: 5, prime:2
Sequence#2, number: 5, prime:3
Sequence#1, number: 6, prime:2
5
Sequence#1, number: 7, prime:2
Sequence#1, number: 8, prime:2
Sequence#2, number: 7, prime:3
Sequence#3, number: 7, prime:5
Sequence#1, number: 9, prime:2
Sequence#1, number: 10, prime:2
7
Sequence#2, number: 9, prime:3
Sequence#1, number: 11, prime:2
Sequence#1, number: 12, prime:2
Sequence#2, number: 11, prime:3
Sequence#3, number: 11, prime:5
Sequence#1, number: 13, prime:2
Sequence#1, number: 14, prime:2
Sequence#4, number: 11, prime:7
Sequence#2, number: 13, prime:3
11
Sequence#3, number: 13, prime:5
Sequence#1, number: 15, prime:2
Sequence#1, number: 16, prime:2
Sequence#4, number: 13, prime:7
Sequence#2, number: 15, prime:3
Sequence#5, number: 13, prime:11
Sequence#1, number: 17, prime:2
Sequence#1, number: 18, prime:2
13
Sequence#2, number: 17, prime:3
Sequence#3, number: 17, prime:5
Sequence#1, number: 19, prime:2
Sequence#1, number: 20, prime:2
Sequence#4, number: 17, prime:7
Sequence#2, number: 19, prime:3
Sequence#5, number: 17, prime:11
Sequence#3, number: 19, prime:5
Sequence#1, number: 21, prime:2
Sequence#1, number: 22, prime:2
Sequence#6, number: 17, prime:13
Sequence#4, number: 19, prime:7
Sequence#2, number: 21, prime:3
17
Sequence#5, number: 19, prime:11
Sequence#1, number: 23, prime:2
Sequence#1, number: 24, prime:2
Sequence#6, number: 19, prime:13
Sequence#2, number: 23, prime:3
Sequence#7, number: 19, prime:17
Sequence#3, number: 23, prime:5
Sequence#1, number: 25, prime:2
Sequence#1, number: 26, prime:2
19
Sequence#4, number: 23, prime:7
Sequence#2, number: 25, prime:3
Sequence#5, number: 23, prime:11
Sequence#3, number: 25, prime:5
Sequence#1, number: 27, prime:2
Sequence#1, number: 28, prime:2
Sequence#6, number: 23, prime:13
Sequence#2, number: 27, prime:3
Sequence#7, number: 23, prime:17
Sequence#1, number: 29, prime:2
Sequence#1, number: 30, prime:2
Sequence#8, number: 23, prime:19
Sequence#2, number: 29, prime:3
23
Sequence#3, number: 29, prime:5
Sequence#1, number: 31, prime:2
Sequence#1, number: 32, prime:2
Sequence#4, number: 29, prime:7
Sequence#2, number: 31, prime:3
Sequence#5, number: 29, prime:11
Sequence#3, number: 31, prime:5
Sequence#1, number: 33, prime:2
Sequence#1, number: 34, prime:2
Sequence#6, number: 29, prime:13
Sequence#4, number: 31, prime:7
Sequence#2, number: 33, prime:3
Sequence#7, number: 29, prime:17
Sequence#5, number: 31, prime:11
Sequence#1, number: 35, prime:2
Sequence#1, number: 36, prime:2
Sequence#8, number: 29, prime:19
Sequence#6, number: 31, prime:13
Sequence#2, number: 35, prime:3
Sequence#9, number: 29, prime:23
Sequence#7, number: 31, prime:17
Sequence#3, number: 35, prime:5
Sequence#1, number: 37, prime:2
Sequence#1, number: 38, prime:2
29또한, 위의 예제는 코루틴이 아닌 일반 함수로 대체할 수 있습니다.
- produce -> buildIterator
- send -> yield
- receive -> next
- receiveChannel -> Iterator
Fan-out
여러 개의 코루틴이 하나의 channel에 값을 receive 할 수 있습니다.
code
fun main() = runBlocking { val producer = produceNumbers() repeat(5) { launchProcessor(it, producer) } delay(950) producer.cancel() } fun CoroutineScope.produceNumbers() = produce<Int> { var x = 1 while (true) { send(x++) delay(100) } } fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch { for (msg in channel) { println("Processor #$id received $msg") } }
result
Processor #0 received 1 Processor #0 received 2 Processor #1 received 3 Processor #2 received 4 Processor #3 received 5 Processor #4 received 6 Processor #0 received 7 Processor #1 received 8 Processor #2 received 9 Processor #3 received 10
여러개의 코루틴이 채널의 데이터를 소비하는 경우 for-loop를 쓰는 것이 안전합니다.
for-loop를 사용하면 코루틴 중 하나가 실패하더라도 나머지 코루틴이 channel의 데이터를 처리합니다.
하지만 consumeEach를 사용한다면 코루틴 중 하나라도 실패하는 경우 channel이 닫히면서 다른 코루틴까지 수행이 끝납니다.
code
fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch { try { channel.consumeEach { if(id==3) { throw IllegalAccessException() } println("Processor #$id received $it") } }catch (e : Exception) { println("3 exception") } }
위의 예제에서 consumeEach로 사용한다면 id가 3인 코루틴에서 강제로 예외가 발생합니다.
Fan-in
여러 코루틴이 동일한 채널로 전송될 수 있습니다.
code
fun main() = runBlocking { val channel = Channel<String>() launch { sendString(channel, "foo", 200L) } launch { sendString(channel, "BAR!", 500L) } repeat(6) { println(channel.receive()) } coroutineContext.cancelChildren() } suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) { while (true) { delay(time) channel.send(s) } }
result
foo foo BAR! foo foo BAR!
예로 문자열 채널을 하나 가지고 있을 때, 일정 시간마다 특정 문자열을 반복적으로 이 채널로 전송하는 suspending function이 있는 예제를 보겠습니다.
Fan-out과 반대로 하나의 channel에 여러 개의 코루틴이 send도 가능합니다.
Buffered channels
channel은 버퍼가 없습니다.
따라서 channel에 send와 receive는 항상 짝을 이뤄야 합니다.
send가 먼저 발생하면 channel은 receive가 발생할 때까지 suspend 됩니다.
반대로 receive가 먼저 발생하면 channel은 send가 올 때까지 suspend 합니다.
code
fun main(args: Array<String>) = runBlocking<Unit> { val channel = Channel<Int>(4) val sender = launch { repeat(10) { print("Try to send $it : ") channel.send(it) print("Done\n") } } delay(1000) sender.cancel() }
result
Try to send 0 : Done Try to send 1 : Done Try to send 2 : Done Try to send 3 : Done Try to send 4 :
0~3까지는 버퍼에 쌓이고 4번을 send 했을 때 즉 5번째 송신에서 blocking 되어 송신자가 중단되었음을 알 수 있습니다.
Channels are fair
여러 코루틴에서 하나의 channel에 send와 recieve를 하는 경우, 이를 요청한 순서는 보장됩니다.
즉 실행 순서는 그 호출 순서에 따라 공평하게 할당되며 스케쥴링됩니다.
code
fun main() = runBlocking { val table = Channel<Ball>() launch { player("ping", table) } launch { player("pong", table) } table.send(Ball(0)) delay(1000) coroutineContext.cancelChildren() } suspend fun player(name: String, table: Channel<Ball>) { for (ball in table) { ball.hits++ println("$name $ball") // Comment out below delay to see the fairness a bit more. delay(300) table.send(ball) } } data class Ball(var hits: Int)
result
ping Ball(hits=1) pong Ball(hits=2) ping Ball(hits=3) pong Ball(hits=4)
channel은 FIFO로 동작하면서 먼저 receive를 요청한 코루틴이 해당 원소를 받아갑니다.
예제를 보면 "ping" 코루틴이 먼저 시작했기 때문에 먼저 공을 수신합니다.
주석의 내용처럼 delay() 를 지운다고 해서 순서가 변경되지는 않습니다. "ping" 코루틴이 송신하고 바로 수신으로 돌아간다고 하더라도
이미 "pong" 코루틴이 수신 대기 중이기 때문입니다.
간혹 채널이 공평하지 못한 실행을 하기도 하는데 이것은 사용되는 Executor 특성 때문입니다.
Ticker channels
Ticker channel은 특별한 채널로, 마지막 수신 이후에 지정된 delay(지연)시간이 지나면 Unit 오브젝트를 송신하는 channel 입니다.
이 채널은 실용성 없어 보일수도 있겠지만, 시간 기반으로 무언가 처리해야 하는 pipiline 같은 곳에서 의미가 있을 수 있습니다.
code
fun main() = runBlocking<Unit> { val tickerChannel = ticker(delayMillis = 100, initialDelayMillis = 0) // create ticker channel var nextElement = withTimeoutOrNull(1) { tickerChannel.receive() } println("Initial element is available immediately: $nextElement") // no initial delay nextElement = withTimeoutOrNull(50) { tickerChannel.receive() } // all subsequent elements have 100ms delay println("Next element is not ready in 50 ms: $nextElement") nextElement = withTimeoutOrNull(60) { tickerChannel.receive() } println("Next element is ready in 100 ms: $nextElement") // Emulate large consumption delays println("Consumer pauses for 150ms") delay(150) // Next element is available immediately nextElement = withTimeoutOrNull(1) { tickerChannel.receive() } println("Next element is available immediately after large consumer delay: $nextElement") // Note that the pause between `receive` calls is taken into account and next element arrives faster nextElement = withTimeoutOrNull(60) { tickerChannel.receive() } println("Next element is ready in 50ms after consumer pause in 150ms: $nextElement") tickerChannel.cancel() // indicate that no more elements are needed }
result
Initial element is available immediately: kotlin.Unit Next element is not ready in 50 ms: null Next element is ready in 100 ms: kotlin.Unit Consumer pauses for 150ms Next element is available immediately after large consumer delay: kotlin.Unit Next element is ready in 50ms after consumer pause in 150ms: kotlin.Unit
delay가 100인 주기 ticker 채널을 만듭니다. 100ms 마다 Unit을 채널에 send 합니다.
withTimeoutOrNull로 코드에 블락을 걸면서 채널에 있는 값을 receive 하면 0.1초 간격으로 Unit이 보내져 있습니다.
main code 중간에 지연이 있더라도 ticker 채널은 상관없이 0.1초 간격으로 Unit을 채널에 송신합니다.
코드 중간에 150ms 지연이 있는 경우, receive 한 시간 간격과 상관없이 Unit이 반환됩니다.
또한 mode parameter로 아래 두 가지 타입을 옵션으로 줄 수 있습니다.
- TickerMode.FIXED_PERIOD
- TikcerMode.FIXED_DELAY
ticker 채널을 생성하면 기본 TickerMode가 FIXED_PERIOD 입니다.
이 모드는 수신자가 지연되는 것을 인지하고 지연이 발생하면 다음 송신을 그에 맞게 조절하여 데이터 발생을 지연된 시간에 최대한 맞게 맞추어 줍니다.
ticker 채널을 FIXED_DELAY 모드로 사용하면 수신 후 지정한 지연 시간이 지나야 다음 데이터를 수신할 수 있습니다.
code
val tickerChannel = ticker( delayMillis = 100, initialDelayMillis = 0, mode = TickerMode.FIXED_DELAY ) // create ticker channel
result
Initial element is available immediately: kotlin.Unit Next element is not ready in 50 ms: null Next element is ready in 100 ms: kotlin.Unit Consumer pauses for 300ms Next element is available immediately after large consumer delay: kotlin.Unit Next element is ready in 50ms after consumer pause in 150ms: null
Reference
반응형'Kotlin' 카테고리의 다른 글
Kotlin) 정규 표현식 정리 (0) 2021.08.17 Kotlin) Coroutine 공식 가이드 번역 05 - Asynchronous Flow(2/2) (0) 2021.01.20 Kotlin) Coroutine 공식 가이드 번역 05 - Asynchronous Flow(1/2) (0) 2021.01.20 Kotlin) Coroutine 공식 가이드 번역 04 - Coroutine Context and Dispatchers (0) 2021.01.15 Kotlin) Coroutine 공식 가이드 번역 03 - Composing Suspending Functions (0) 2021.01.15