플로우의 실제 구현

  • 기본적으로 플로우는 어떤 연산을 실행할지 정의한 것
    • 중단 가능한 람다식에 몇 가지 요소를 추가하였다.
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
interface MyFlow {
    suspend fun myCollect(collector: MyFlowCollector)
}

fun myFlowBuilder(actionBlock: suspend MyFlowCollector.() -> Unit) = object : MyFlow {
    override suspend fun myCollect(collector: MyFlowCollector) {
        collector.actionBlock()
    }
}

fun interface MyFlowCollector { // SAM 사용 
    suspend fun myEmit(value: String)
}

suspend fun main() { // SAM 사용 
    val f: MyFlow = myFlowBuilder {
        this.myEmit("A") // this = myFlowCollector, 생략 가능
        myEmit("B")
    }
    f.myCollect { println(it) }
}

///////////////////////////////////////////////////////
interface MyFlowCollector { // SAM 미 사용
    suspend fun myEmit(value: String)
}

suspend fun main() { // SAM 미 사용
    val f: MyFlow = myFlowBuilder {
        this.myEmit("A") // this = myFlowCollector, 생략 가능
        myEmit("B")
    }
    f.myCollect(object : MyFlowCollector {
        override suspend fun myEmit(value: String) {
            println(value)
        }
    })
}
  • collect를 호출하면, flow 빌더를 호출 할 때 넣은 람다식이 실행된다.

  • 빌더의 람다식이 emit을 호출하면, collect가 호출되었을 때 명시된 람다식이 호출된다.

Flow 처리 방식

  • 플로우의 각 원소를 변환하는 map 함수
    • map 함수새로운 플로우를 만들기 때문에, flow 빌더를 사용한다.
    • 플로우가 시작되면, 래핑하고 있는 플로우를 시작하게 되므로, 빌더 내부에서 collect 메소드를 호출한다.
    • 원소를 받을 때마다, map은 원소를 변환하고 새로운 플로우로 방출한다.
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
fun <T, R> Flow<T>.map(
    transformation: suspend (T) -> R
): Flow<R> = flow {
    collect {
        emit(transformation(it))
    }
}

suspend fun main() {
    flowOf("A", "B", "C").map {
        delay(1000)
        it.lowercase()
    }.collect { println(it) }
} // (1 sec) // a // (1 sec) // b // (1 sec) // c

동기로 작동하는 Flow

  • 플로우 또한 중단 함수 처럼 동기로 작동한다.
    • 플로우가 완료될 때까지 collect 호출이 중단(suspend) 된다.
    • 즉, 플로우는 새로운 코루틴을 시작하지 않는다.
    • 중단 함수가 코루틴을 시작할 수 있는 것처럼, 플로우의 각 단계에서도 코루틴을 시작할 수 있지만 중단 함수의 기본 동작은 아니다.
  • 플로우의 각각의 처리 단계는 동기로 실행되기 때문에, onEach 내부에 delay가 있으면 모든 원소가 처리되기 전이 아닌 원소 사이에 지연이 생긴다.
1
2
3
suspend fun main() {
    flowOf("A", "B", "C").onEach { delay(1000) }.collect { println(it) }
} // (1 sec) // A // (1 sec) // B // (1 sec) // C

플로우와 공유 상태

  • 커스텀한 플로우 처리 함수를 구현할 때, 플로우의 각 단계동기로 작동하기 때문에 동기화 없이도 플로우 내부에 변경 가능한 상태를 정의할 수 있다.

  • 플로우 단계 외부의 변수를 추출해서, 함수에서 사용하는 것이 흔히 저지르는 실수 중 하나이다.

    • 외부 변수는 같은 플로우가 모으는, 모든 코루틴이 공유하게 된다.
    • 이런 경우 동기화가 필수이며, 플로우 컬렉션이 아니라 플로우에 종속되게 된다.
    • 따라서 두 개의 코루틴이 병렬로 원소를 세게 된다.
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
fun Flow<*>.counter() = flow<Int> {
    var counter = 0
    collect {
        counter++ // to make it busy for a while
        List(100) { Random.nextLong() }.shuffled().sorted()
        emit(counter)
    }
}

suspend fun main(): Unit = coroutineScope {
    val f1 = List(1000) { "$it" }.asFlow()
    val f2 = List(1000) { "$it" }.asFlow().counter()

    launch { println(f1.counter().last()) } // 1000
    launch { println(f1.counter().last()) } // 1000
    launch { println(f2.last()) } // 1000
    launch { println(f2.last()) } // 1000
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
fun Flow<*>.counter(): Flow<Int> {
    var counter = 0
    return this.map {
        counter++
        List(100) { Random.nextLong() }.shuffled().sorted()
        counter
    }
}

suspend fun main(): Unit = coroutineScope {
    val f1 = List(1000) { "$it" }.asFlow()
    val f2 = List(1000) { "$it" }.asFlow().counter()

    launch { println(f1.counter().last()) } // 1000
    launch { println(f1.counter().last()) } // 1000
    launch { println(f2.last()) } // less than 2000
    launch { println(f2.last()) } // less than 2000
    // 콘솔 결과 창에서, 4개 print 결과 중 2개는 1000, 나머지는 2개는 2000보다 작다.
}

같은 변수를 사용하는 중단 함수들에서 동기화가 필요한 것처럼,

플로우에서 사용하는 변수가 함수 외부, 클래스의 스코프, 최상위 레벨에서 정의되어 있으면 동기화가 필요하다.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
var counter = 0 // 최상위 레벨 변수

fun Flow<*>.counter(): Flow<Int> = this.map {
    counter++
    List(100) { Random.nextLong() }.shuffled().sorted()
    counter
}

suspend fun main(): Unit = coroutineScope {
    val f1 = List(1000) { "$it" }.asFlow()
    val f2 = List(1000) { "$it" }.asFlow().counter()

    launch { println(f1.counter().last()) } // 3496, less than 4000
    launch { println(f1.counter().last()) } // 3735
    launch { println(f2.last()) } // 3870
    launch { println(f2.last()) } // 3933
}

Reference