플로우란 무엇인가?

  • 플로우(flow)는 비동기적으로 계산해야 할 값의 스트림을 나타낸다.
  • 플로우는 시퀀스와 달리 코루틴을 지원하며, 비동기적으로 계산되는 값을 나타낸다.
  • Flow 인터페이스 자체는 떠다니는 원소들을 모으는 역할을 하며, 플로우의 끝에 도달할 때까지 각 값을 처리하는 것 의미한다.
    • Flow의 collect컬렉션의 forEach와 비슷하다.
    • Flow의 유일한 멤버 함수는 collect(최종 연산) 이다. 다른 함수는 확장 함수로 정의되어 있다.
1
2
3
interface Flow<out T> {
    suspend fun collect(collector: FlowCollector<T>)
}
  • 시퀀스(Sequence)의 최종 연산은 중단 함수가 아니기 때문에, 시퀀스 빌더 내부에 중단점이 있다면 값을 기다리는 스레드블로킹 된다.
    • 따라서 sequencee 빌더의 스코프에서는 yield, yieldAll 외에 다른 중단 함수를 사용할 수 없다.
    • Sequence의 iterator가 중단 함수가 아니기 때문에, 시퀀스의 원소를 소비할 때 블로킹이 되는 것이 문제가 된다.
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
// Don't do that, we should use Flow instead of Sequence
fun allUsersSequence(
    api: UserApi
): Sequence<User> = sequence {
        var page = 0
        do {
            val users = api.takePage(page++)
            //suspend 함수 임으로, so compilation error
            yieldAll(users)
        } while (!users.isNullOrEmpty())
    }
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// 하나의 코루틴이 다른 코루틴을 블로킹하게 된다.
// 같은 스레드에서 launch로 시작된 코루틴이 대기 하게 된다.
fun getSequence(): Sequence<String> = sequence {
        repeat(3) {
            Thread.sleep(1000)
            // the same result as if there were delay(1000) here
            yield("User$it")
        }
    }

suspend fun main() {
    withContext(newSingleThreadContext("main")) {
        launch {
            repeat(3) {
                delay(100)
                println("Processing on coroutine")
            }
        }

        val list = getSequence()
        list.forEach { println(it) }
    }
}
// (1 sec)
// User0
// (1 sec)
// User1
// (1 sec)
// User2
// Processing on coroutine
// (0.1 sec)
// Processing on coroutine
// (0.1 sec)
// Processing on coroutine
  • Sequence를 사용했기 때문에 forEach가 블로킹 연산이 된다.

    • 따라서 같은 스레드에서 launch로 시작된 코루틴이 대기하게 되며, 하나의 코루틴이 다른 코루틴을 블로킹하게 된다.
  • 이런 상황에서 Sequence 대신 Flow를 사용해야 한다.

  • 플로우를 사용하면 코루틴이 연산을 수행하는 데 필요한 기능을 전부 사용할 수 있다.

  • 플로우의 빌더와 연산은 중단 함수(suspend) 이며, 구조화된 동시성과 적절한 예외 처리를 지원한다.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
fun getFlow(): Flow<String> = flow {
    repeat(3) {
        delay(1000)
        emit("User$it")
    }
}

suspend fun main() {
    withContext(newSingleThreadContext("main")) {
        launch {
            repeat(3) {
                delay(100)
                println("Processing on coroutine")
            }
        }

        val list = getFlow()
        list.collect { println(it) }
    }
}
// (0.1 sec)
// Processing on coroutine
// (0.1 sec)
// Processing on coroutine
// (0.1 sec)
// Processing on coroutine
// (1 - 3 * 0.1 = 0.7 sec)
// User0
// (1 sec)
// User1
// (1 sec)
// User2

플로우의 특징

  • collect(중단 함수)와 같은 플로우의 최종 연산은 스레드를 블로킹하는 대신 코루틴을 중단시킨다.
  • 플로우는 Coroutine Context 를 활용하고, 예외를 처리하는 등의 코루틴 기능도 제공한다.
  • 플로우 처리는 취소(cancel) 가능하며, 구조화된 동시성을 갖추고 있다.
  • flow 빌더는 중단 함수가 아니며, 어떠한 스코프도 필요하지 않다.
  • 플로우의 최종 연산중단 가능하며, 연산이 실행될 때, 부모 코루틴과의 관계가 정립된다. (coroutineScope 함수와 비슷)
    • launch 를 취소하면, 플로우 처리도 적절하게 취소된다.

플로우 명명법

  • 플로우는 어딘가에서 시작 되어야 한다. (플로우 빌더, 다른 객체에서의 변환 또는 헬퍼 함수로 시작된다.)

  • 플로우의 마지막 연산은 최종 연산이라고 불리며, 중단 가능하거나 스코프를 필요로 하는 유일한 연산이다.

    • 최종 연산은 주로 람다 표현식을 가지거나 가지지 않는 collect 가 된다.
  • 시작 연산과 최종 연산 사이에 플로우를 변경하는 중간 연산(intermediate operation)을 가질 수 있다.

플로우 사용 예시

  • 주로 이벤트를 감지해야 할 필요가 있을 때, 사용
  • 서버가 보낸 이벤트를 통해 전달된 메시지를 받는 경우
  • 텍스트 입력 또는 클릭과 같은 사용자 액션이 감지된 경우
  • 센서 또는 위치나 지도와 같은 기기의 정보 변경을 받는 경우
  • 데이터베이스의 변경을 감지하는 경우

플로우는 이 밖의 경우에도 동시성 처리 를 위해 유용하게 사용 될 수 있다.

1
2
3
4
5
6
7
8
9
suspend fun getOffers(
    sellers: List<Seller>
): List<Offer> = coroutineScope {
    sellers
        .map { seller ->
            async { api.requestOffers(seller.id) }
        }
        .flatMap { it.await() }
}

컬렉션 처리 내부에서 async를 사용하면 동시 처리를 할 수 있지만,

많은 요청을 한번에 보내면 client 뿐 아니라 server 모두에게 좋지 않다.

1
2
3
4
5
6
7
8
suspend fun getOffers(
    sellers: List<Seller>
): List<Offer> = sellers
    .asFlow()
    .flatMapMerge(concurrency = 20) { seller ->
        suspend { api.requestOffers(seller.id) }.asFlow()
    }
    .toList()

컬렉션 대신 플로우로 처리하면 동시 처리, 컨텍스트, 예외를 조절할 수 있다.

Reference