ABOUT ME

-

  • Kotlin) Coroutine 공식 가이드 번역 06 - Channels
    Kotlin 2021. 1. 25. 20:39


    Channels

    Deffered는 코루틴 간에 단일 값을 편하게 전달하는 방법을 제공하지만,

    Channel은 Stream의 값을 전달하는 방법을 제공합니다.

     

    Channel은 Blocking Queue와 유사하게 동작합니다. 차이점은 아래와 같습니다.

     

    Blocking Queue Channel
    put send
    take  receive

     

    동시성이 필요한 여러 코루틴에서 순서를 보장받으면서 공유하여 사용할 수 있습니다.

     

    code
    fun main() = runBlocking {
        val channel = Channel<Int>()
        launch {
            for (x in 1..5) channel.send(x * x)
        }
        repeat(5) { println(channel.receive()) }
        println("Done!")
    }
    result
    1
    4
    9
    16
    25
    Done!

    Closing and iteration over channels

    Blocking Queue와 다르게 Channel은 더 이상 사용하지 않을 때 채널을 close로 닫음으로서 더 이상 오지 않음을 표현할 수 있습니다.

    Channel stream을 반환하기 때문에 close를 시킬 경우 반환 값을 받아 사용하는 for-loop에서 Channel 사용이 끝났음을 알 수 있습니다.

     

    code
    fun main() = runBlocking {
        val channel = Channel<Int>()
        launch {
            for (x in 1..5) channel.send(x * x)
            channel.close()
        }
        for (y in channel) println(y)
        println("Done!")
    }
    result
    1
    4
    9
    16
    25
    Done!

    close는 특별한 token의 형태로 이 값 역시 channel에 들어갑니다.

    그래서 iterator는 이 값을 만나면 iteration을 멈춥니다. 즉 close를 호출해도 그 이전에 넣은 값들은 받아올 수 있음을 보장합니다.


    Building channel producers

    코루틴으로 흔하게 사용하는 형태중 하나가 sequential 한 데이터를 생산하는 쪽과 소비하는 쪽을 구현하는 producer-consumer 패턴입니다.

    생산하는 형태를 쉽게 구현하도록 제공하는 coroutine builderproduce가 있습니다.

    소비하는 쪽에서 사용하는 extension function으로 cosumeEach가 있습니다.(for-loop 대체 가능)

     

    code
    fun main() = runBlocking {
        val squares = produceSquares()
        squares.consumeEach { println(it) }
        println("Done!")
    }
    
    fun CoroutineScope.produceSquares(): ReceiveChannel<Int> = produce {
        for (x in 1..5) send(x * x)
    }
    result
    1
    4
    9
    16
    25
    Done!

    위의 코드는 Coroutine Scope의 확장 함수로 만든 경우입니다.

     

    확장함수로 만들고 싶지 않다면 아래의 코드처럼 만들 수 있습니다.

     

    code
    fun produceSquares2(coroutineScope: CoroutineScope): ReceiveChannel<Int> =
            coroutineScope.produce(Dispatchers.Default) {
                for (x in 1..5) send(x * x)
            }

    Pipelines

    무한한 sequence를 만들어내고 동시에 이를 소비하는 패턴을 pipelining으로 만들 수 있습니다.

    하나의 코루틴이 데이터 스트림을 생산해내고 다른 하나 이상의 코루틴들이 이 스트림을 수신 받아 필요한 작업을 수행 한 후 가공된 결과를 다시 전송하는 패턴을 말합니다.

     

    code
    fun main() = runBlocking {
        val numbers = produceNumbers()
        val squares = square(numbers)
        for (i in 1..5) println(squares.receive())
        println("Done!")
        coroutineContext.cancelChildren()
    }
    
    fun CoroutineScope.produceNumbers() = produce<Int> {
        var x = 1
        while (true) send(x * x)
    }
    
    fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce<Int> {
        for (x in numbers) send(x * x)
    }

    produceNumbers() 함수에서 수를 무한하게 생산하고, square() 함수에서 넘겨받은 channel에서 값을 소비하여 제곱합니다.

     

    produce를 사용하기 때문에 함수 실행이 끝날 때까지 blocking 되는 게 아니라 결괏값이 생성될 때마다 async 하게 channel로 보내지고,

    for문에서 5개를 recieve 할 때까지 대기합니다.


    Prime numbers with pipeline

    pipleline을 이용해 소수를 찾는 예제입니다.

     

    code
    fun main() = runBlocking {
        var cur = numbersFrom(2)
        for (i in 1..10) {
            val prime = cur.receive()
            println(prime)
            cur = filter(cur, prime)
        }
        coroutineContext.cancelChildren()
    }
    
    fun CoroutineScope.numbersFrom(start: Int) = produce<Int> {
        var x = start
        while (true) send(x++)
    }
    
    fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> {
        for (x in numbers) if (x % prime != 0) send(x)
    }
    result
    2
    3
    5
    7
    11
    13
    17
    19
    23
    29

    소수를 찾는 예제입니다. cancelChildren을 통해 10개까지 출력하고 모든 하위 코루틴을 취합니다.

    예제의 이해를 돕기 위해 아래의 코드처럼 로그를 찍어 출력할 수 있습니다.

     

    code
    fun main() = runBlocking {
        var cur = numbersFrom(2)
        for (i in 1..10) {
            val prime = cur.receive()
            println(prime)
            cur = filter(cur, prime, i)
        }
        coroutineContext.cancelChildren()
    }
    
    fun CoroutineScope.numbersFrom(start: Int) = produce<Int> {
        var x = start
        while (true) send(x++)
    }
    
    fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int, sequence: Int): ReceiveChannel<Int> = produce {
        for (x in numbers) {
            println("Sequence#$sequence, number: $x, prime:$prime")
            if (x % prime != 0) {
                send(x)
            }
        }
    }

     

    결과는 아래에서 확인해주세요.

     

    result
    더보기

    2
    Sequence#1, number: 3, prime:2
    3
    Sequence#1, number: 4, prime:2
    Sequence#1, number: 5, prime:2
    Sequence#2, number: 5, prime:3
    Sequence#1, number: 6, prime:2
    5
    Sequence#1, number: 7, prime:2
    Sequence#1, number: 8, prime:2
    Sequence#2, number: 7, prime:3
    Sequence#3, number: 7, prime:5
    Sequence#1, number: 9, prime:2
    Sequence#1, number: 10, prime:2
    7
    Sequence#2, number: 9, prime:3
    Sequence#1, number: 11, prime:2
    Sequence#1, number: 12, prime:2
    Sequence#2, number: 11, prime:3
    Sequence#3, number: 11, prime:5
    Sequence#1, number: 13, prime:2
    Sequence#1, number: 14, prime:2
    Sequence#4, number: 11, prime:7
    Sequence#2, number: 13, prime:3
    11
    Sequence#3, number: 13, prime:5
    Sequence#1, number: 15, prime:2
    Sequence#1, number: 16, prime:2
    Sequence#4, number: 13, prime:7
    Sequence#2, number: 15, prime:3
    Sequence#5, number: 13, prime:11
    Sequence#1, number: 17, prime:2
    Sequence#1, number: 18, prime:2
    13
    Sequence#2, number: 17, prime:3
    Sequence#3, number: 17, prime:5
    Sequence#1, number: 19, prime:2
    Sequence#1, number: 20, prime:2
    Sequence#4, number: 17, prime:7
    Sequence#2, number: 19, prime:3
    Sequence#5, number: 17, prime:11
    Sequence#3, number: 19, prime:5
    Sequence#1, number: 21, prime:2
    Sequence#1, number: 22, prime:2
    Sequence#6, number: 17, prime:13
    Sequence#4, number: 19, prime:7
    Sequence#2, number: 21, prime:3
    17
    Sequence#5, number: 19, prime:11
    Sequence#1, number: 23, prime:2
    Sequence#1, number: 24, prime:2
    Sequence#6, number: 19, prime:13
    Sequence#2, number: 23, prime:3
    Sequence#7, number: 19, prime:17
    Sequence#3, number: 23, prime:5
    Sequence#1, number: 25, prime:2
    Sequence#1, number: 26, prime:2
    19
    Sequence#4, number: 23, prime:7
    Sequence#2, number: 25, prime:3
    Sequence#5, number: 23, prime:11
    Sequence#3, number: 25, prime:5
    Sequence#1, number: 27, prime:2
    Sequence#1, number: 28, prime:2
    Sequence#6, number: 23, prime:13
    Sequence#2, number: 27, prime:3
    Sequence#7, number: 23, prime:17
    Sequence#1, number: 29, prime:2
    Sequence#1, number: 30, prime:2
    Sequence#8, number: 23, prime:19
    Sequence#2, number: 29, prime:3
    23
    Sequence#3, number: 29, prime:5
    Sequence#1, number: 31, prime:2
    Sequence#1, number: 32, prime:2
    Sequence#4, number: 29, prime:7
    Sequence#2, number: 31, prime:3
    Sequence#5, number: 29, prime:11
    Sequence#3, number: 31, prime:5
    Sequence#1, number: 33, prime:2
    Sequence#1, number: 34, prime:2
    Sequence#6, number: 29, prime:13
    Sequence#4, number: 31, prime:7
    Sequence#2, number: 33, prime:3
    Sequence#7, number: 29, prime:17
    Sequence#5, number: 31, prime:11
    Sequence#1, number: 35, prime:2
    Sequence#1, number: 36, prime:2
    Sequence#8, number: 29, prime:19
    Sequence#6, number: 31, prime:13
    Sequence#2, number: 35, prime:3
    Sequence#9, number: 29, prime:23
    Sequence#7, number: 31, prime:17
    Sequence#3, number: 35, prime:5
    Sequence#1, number: 37, prime:2
    Sequence#1, number: 38, prime:2
    29

    또한, 위의 예제는 코루틴이 아닌 일반 함수로 대체할 수 있습니다.

     

    • produce -> buildIterator
    • send -> yield
    • receive -> next
    • receiveChannel -> Iterator

    Fan-out

    여러 개의 코루틴이 하나의 channel에 값을 receive 할 수 있습니다.

     

    code
    fun main() = runBlocking {
        val producer = produceNumbers()
        repeat(5) { launchProcessor(it, producer) }
        delay(950)
        producer.cancel()
    }
    
    fun CoroutineScope.produceNumbers() = produce<Int> {
        var x = 1
        while (true) {
            send(x++)
            delay(100)
        }
    }
    
    fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
        for (msg in channel) {
            println("Processor #$id received $msg")
        }
    }
    result
    Processor #0 received 1
    Processor #0 received 2
    Processor #1 received 3
    Processor #2 received 4
    Processor #3 received 5
    Processor #4 received 6
    Processor #0 received 7
    Processor #1 received 8
    Processor #2 received 9
    Processor #3 received 10

    여러개의 코루틴이 채널의 데이터를 소비하는 경우 for-loop를 쓰는 것이 안전합니다.

    for-loop사용하면 코루틴 중 하나가 실패하더라도 나머지 코루틴이 channel의 데이터를 처리합니다.

    하지만 consumeEach사용한다면 코루틴 중 하나라도 실패하는 경우 channel이 닫히면서 다른 코루틴까지 수행이 끝납니다.

     

    code
    fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
        try {
            channel.consumeEach {
                if(id==3) {
                    throw  IllegalAccessException()
                }
                println("Processor #$id received $it")
            }
        }catch (e : Exception) {
            println("3 exception")
        }
    }

    위의 예제에서 consumeEach로 사용한다면 id가 3인 코루틴에서 강제로 예외가 발생합니다.


    Fan-in

    여러 코루틴이 동일한 채널로 전송될 수 있습니다. 

     

    code
    fun main() = runBlocking {
        val channel = Channel<String>()
        launch { sendString(channel, "foo", 200L) }
        launch { sendString(channel, "BAR!", 500L) }
        repeat(6) {
            println(channel.receive())
        }
        coroutineContext.cancelChildren()
    }
    
    suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
        while (true) {
            delay(time)
            channel.send(s)
        }
    }
    result
    foo
    foo
    BAR!
    foo
    foo
    BAR!

    예로 문자열 채널을 하나 가지고 있을 때, 일정 시간마다 특정 문자열을 반복적으로 이 채널로 전송하는 suspending function이 있는 예제를 보겠습니다.

    Fan-out과 반대로 하나의 channel에 여러 개의 코루틴이 send도 가능합니다.


    Buffered channels

     

    channel은 버퍼가 없습니다.

    따라서 channelsendreceive는 항상 을 이뤄야 합니다.

     

    send가 먼저 발생하면 channelreceive가 발생할 때까지 suspend 됩니다.

    반대로 receive가 먼저 발생하면 channelsend가 올 때까지 suspend 합니다.

     

    code
    fun main(args: Array<String>) = runBlocking<Unit> {
        val channel = Channel<Int>(4)
    
        val sender = launch {
            repeat(10) {
                print("Try to send $it : ")
                channel.send(it)
                print("Done\n")
            }
        }
    
        delay(1000)
        sender.cancel()
    }
    result
    Try to send 0 : Done
    Try to send 1 : Done
    Try to send 2 : Done
    Try to send 3 : Done
    Try to send 4 :

    0~3까지는 버퍼에 쌓이고 4번을 send 했을 때 즉 5번째 송신에서 blocking 되어 송신자가 중단되었음을 알 수 있습니다.


    Channels are fair

    여러 코루틴에서 하나의 channelsendrecieve를 하는 경우, 이를 요청한 순서는 보장됩니다.

    즉 실행 순서는 그 호출 순서에 따라 공평하게 할당되며 스케쥴링됩니다.

     

    code
    fun main() = runBlocking {
        val table = Channel<Ball>()
        launch { player("ping", table) }
        launch { player("pong", table) }
        table.send(Ball(0))
        delay(1000)
        coroutineContext.cancelChildren()
    }
    
    suspend fun player(name: String, table: Channel<Ball>) {
        for (ball in table) {
            ball.hits++
            println("$name $ball")
            // Comment out below delay to see the fairness a bit more.
            delay(300)
            table.send(ball)
        }
    }
    
    data class Ball(var hits: Int)
    result
    ping Ball(hits=1)
    pong Ball(hits=2)
    ping Ball(hits=3)
    pong Ball(hits=4)

    channelFIFO로 동작하면서 먼저 receive를 요청한 코루틴이 해당 원소를 받아갑니다.

    예제를 보면 "ping" 코루틴이 먼저 시작했기 때문에 먼저 공을 수신합니다.

    주석의 내용처럼 delay() 를 지운다고 해서 순서가 변경되지는 않습니다. "ping" 코루틴이 송신하고 바로 수신으로 돌아간다고 하더라도 

    이미 "pong" 코루틴이 수신 대기 중이기 때문입니다.

     

    간혹 채널이 공평하지 못한 실행을 하기도 하는데 이것은 사용되는 Executor 특성 때문입니다.


    Ticker channels

    Ticker channel은 특별한 채널로, 마지막 수신 이후에 지정된 delay(지연)시간이 지나면 Unit 오브젝트를 송신하는 channel 입니다.

    이 채널은 실용성 없어 보일수도 있겠지만, 시간 기반으로 무언가 처리해야 하는 pipiline 같은 곳에서 의미가 있을 수 있습니다.

     

    code
    fun main() = runBlocking<Unit> {
        val tickerChannel = ticker(delayMillis = 100, initialDelayMillis = 0) // create ticker channel
        var nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
        println("Initial element is available immediately: $nextElement") // no initial delay
    
        nextElement = withTimeoutOrNull(50) { tickerChannel.receive() } // all subsequent elements have 100ms delay
        println("Next element is not ready in 50 ms: $nextElement")
    
        nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
        println("Next element is ready in 100 ms: $nextElement")
    
        // Emulate large consumption delays
        println("Consumer pauses for 150ms")
        delay(150)
        // Next element is available immediately
        nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
        println("Next element is available immediately after large consumer delay: $nextElement")
        // Note that the pause between `receive` calls is taken into account and next element arrives faster
        nextElement = withTimeoutOrNull(60) { tickerChannel.receive() } 
        println("Next element is ready in 50ms after consumer pause in 150ms: $nextElement")
    
        tickerChannel.cancel() // indicate that no more elements are needed
    }
    result
    Initial element is available immediately: kotlin.Unit
    Next element is not ready in 50 ms: null
    Next element is ready in 100 ms: kotlin.Unit
    Consumer pauses for 150ms
    Next element is available immediately after large consumer delay: kotlin.Unit
    Next element is ready in 50ms after consumer pause in 150ms: kotlin.Unit

    delay가 100인 주기 ticker 채널을 만듭니다. 100ms 마다 Unit을 채널에 send 합니다.

    withTimeoutOrNull로 코드에 블락을 걸면서 채널에 있는 값을 receive 하면 0.1초 간격으로 Unit이 보내져 있습니다.

     

    main code 중간에 지연이 있더라도 ticker 채널은 상관없이 0.1초 간격으로 Unit을 채널에 송신합니다.

    코드 중간에 150ms 지연이 있는 경우, receive 한 시간 간격과 상관없이 Unit이 반환됩니다.

     

    또한 mode parameter로 아래 두 가지 타입을 옵션으로 줄 수 있습니다.

    • TickerMode.FIXED_PERIOD
    • TikcerMode.FIXED_DELAY

     

    ticker 채널을 생성하면 기본 TickerMode가 FIXED_PERIOD 입니다.

    이 모드는 수신자가 지연되는 것을 인지하고 지연이 발생하면 다음 송신을 그에 맞게 조절하여 데이터 발생을 지연된 시간에 최대한 맞게 맞추어 줍니다.

     

    ticker 채널FIXED_DELAY 모드로 사용하면 수신 후 지정한 지연 시간이 지나야 다음 데이터를 수신할 수 있습니다.

     

    code
    val tickerChannel = ticker(
            delayMillis = 100,
            initialDelayMillis = 0,
            mode = TickerMode.FIXED_DELAY
        ) // create ticker channel
    result
    Initial element is available immediately: kotlin.Unit
    Next element is not ready in 50 ms: null
    Next element is ready in 100 ms: kotlin.Unit
    Consumer pauses for 300ms
    Next element is available immediately after large consumer delay: kotlin.Unit
    Next element is ready in 50ms after consumer pause in 150ms: null

     

    Reference

    코루틴 공식 가이드 자세히 읽기 - Part 3

    코틀린 - 코루틴#7 - Channels

     

    반응형

    댓글

Designed by Me.