플로우 처리
- 플로우 생성과 최종 연산 사이의 값을 변경하는 연산들을 플로우 처리(flow processing) 이라고 한다.
- 플로우는 값이 흐르기 때문에 제외하고, 곱하고, 변형하거나, 합치는 등의 여러 가지 방법으로 변경 가능하다.
컬렉션 처리는 어떻게 작동할까?
flow 빌더
와 람다식을 가진collect
만으로 구현 가능하다.
|
|
- 1에서 플로우 시작, 7에서 원소들을 모은다.
- 모으기 시작할 때, 1에서 시작하는
@map
람다식을 수행하며, 이 람다식은 2에서 또 다른 빌더를 호출하고, 5에서 원소들을 모은다. - 원소 들을 모을 때, 2에서 시작하는
@flowOf
람다식을 시작한다. - 2의 람다식은 ‘a’, ‘b’를 가진 배열을 탐색한다.
- 첫 번째 값인 ‘a’를 4에서 내보내며, 5 람다식이 호출된다.
- 5의 람다식은 갑을 ‘A’로 변경하며
@map
플로우로 내보낸 뒤, 7의 람다식이 호출 된다. - 값이 출력된 후 7의 람다식이 종료되고 6에서 람다식이 재개 된다.
- 람다식이 끝났기 때문에 4의
@flowOf
가 재개되며, 탐색이 다시 시작되어 4에서 ‘b’를 내보낸다. - 5에서 람다식이 호출되고, ‘B’로 값을 변형한 뒤 6에서
@map
플로우로 내보낸다. - 값은 7에서 모이며 8에서 출력된다.
- 7의 람다식이 종료되므로 6의 람다식이 재개 된다. 이 람다식도 종료되었기 때문에 4의
@flowOf
람다식이 다시 시작된다. - 4도 종료되었기 때문에 5의
collect
에서@map
이 재개된다. 더 이상 남은 것이 없기 때문에@map
의 마지막 부분에 도달한다. - 7의
collect
에서 다시 시작하면main 함수
의 끝에 도달한다.
map
- 플로우의 각 원소를 변환 함수에 따라 변환하는
map
함수.
|
|
filter
- 원래 플로우에서 주어진 조건에 맞는 값들만 가진 플로우를 반환한다.
- 관심 없는 원소를 제거할 때 주로 사용
|
|
take
- 특정 수의 원소만 통과시킬 때 사용
|
|
drop
- 특정 수의 원소를 무시할 때 사용
|
|
merge
- 두 개의 플로우를 하나의 플로우로 합칠 때 사용되는 함수 =>
merge, zip, combine
merge
: 두 개의 플로우에서 생성된 원소들을 하나로 합칠 때 사용merge
를 사용하면 한 플로우의 원소가 다른 플로우를 기다리지 않는다는 것이 중요하다.- 첫 번째 플로우의 원소 생성이 지연된다고 해서, 두 번째 플로우의 원소 생성이 중단되지 않는다.
- 여러 개의 이벤트들을 똑같은 방법으로 처리할 때
merge
를 사용한다.
|
|
|
|
zip
- 두 플로우로 부터 쌍을 만들때 사용한다.
- 원소가 쌍을 이루는 방법을 정하는 함수도 필요하다.
- 각 원소는 한 쌍의 일부가 되므로 쌍이 될 원소를 기다려야 한다.
- 쌍을 이루지 못하고 남은 원소는 유실되므로 한 플로우에서 zipping이 완료되면 생성되는 플로우 또한 완료된다.
zip
은 쌍을 필요로 하기 때문에 첫 번째 플로우가 닫히면 함수 또한 끝나게 된다.
|
|
combine
combine
을 사용하면 모든 새로운 원소가 전임자를 대체 하게 된다.zip
의 경우 느린 플로우를 기다려야 한다.
- 첫 번째 쌍이 이미 만들어졌다면 다른 플로우의 이전 원소와 함께 새로운 쌍이 만들어진다.
combine
은 두 플로우 모두 닫힐 때까지 원소를 내보낸다.combine
은 두 데이터 소스의 변화를 능동적으로 감지할 때 주로 사용된다.
|
|
- 변화가 발생할 때마다 원소가 내보내지길 원한다면 합쳐질 각 플로우에 초기 값을 더하면 된다.
- Ex.
View
가 감지 가능한 원소 두 가지 중에 하나라도 변경될 때 반응해야 하는 경우combine
을 주로 사용한다.
|
|
fold
- 초기 값부터 시작하여 주어진 원소 각각에 대해 두 개의 값을 하나로 합치는 연산을 적용하여
컬렉션
의 모든 값을 하나로 합칠때 사용 fold
는 최종 연산이고, 플로우에서도 사용할 수 있으며,collect
처럼 플로우가 완료될 때까지 중단(suspend) 된다.
|
|
scan
scan
은 누적되는 과정의 모든 값을 생성하는 중간 연산이다.scan
은 이전 단계에서 값을 받은 즉시 새로운 값을 만들기 만든다.
flatMapConcat
컬렉션
의 경우flatMap
은map
과 비슷하지만 변환 함수가 평탄화된 컬렉션 을 반환해야 한다는 점이 다르다.- 플로우의 경우 변환 함수가 평탄화된 플로우를 반환한다고 생각하는게 직관적이다.
- 문제는 플로우 원소가 나오는 시간이 다르다는 점이다.
- 이러한 이유로 플로우에는
flatMap
함수가 없으며, 대신flatMapConcat, flatMapMerge, flatMapLatest
와 같은 함수가 존재한다.
flatMapConcat
함수는 생성된 플로우를 하나씩 처리한다.- 그래서 두 번째 플로우는 첫 번째 플로우가 완료되었을 때 시작할 수 있다.
|
|
flatMapMerge
- 특징: 만들어진 플로우를 동시에 처리한다.
|
|
concurreny
인자를 사용해 동시에 처리할 수 있는 플로우의 수를 설정할 수 있다.- 기본값은 16이지만, 변경 가능하다.
|
|
flatMapMerge
는 플로우의 각 원소에 대한 데이터를 요청할 때 주로 사용된다.- 예를 들어 종류를 목록으로 가지고 있다면, 종류별로 요청을 보내야 한다.
async
함수 대신, 플로우와 함께flatMapMerge
를 사용하면 두 가지 이점이 있다.- 동시성 인자를 제어하고(같은 시간에 수백 개의 요청을 보내는 걸 피하기 위해) 같은 시간에 얼마만큼의 종류를 처리할지 결정할 수 있다.
Flow
를 반환하여 데이터가 생성될 때마다, 다음 원소를 보낼 수 있다.- 함수를 사용하는 측면에서 보면, 데이터를 즉시 처리할 수 있다.
|
|
flatMapLatest
- 특징: 새로운 플로우가 나타나면 이전에 처리하던 플로우를 잊어버린다.
|
|
|
|
retry(재시도)
- 예외는 플로우를 따라 흐르면서 각 단계를 하나씩 종료한다.
- 종료된 단계는 비활성화되기 때문에, 예외가 발생한 뒤 메시지를 보내는 건 불가능하지만, 각 단계가 이전 단계에 대한 참조를 가지고 있으며, 플로우를 다시 시작하기 위해 참조를 사용할 수 있다.
- 이 원리에 기반하여, 코틀린은
retry
와retryWhen
함수를 제공한다.
|
|
|
|
retryWhen
은 플로우의 이전 단계에서 예외가 발생할 때마다조건자(predicate)
를 확인한다.- 몇 번까지 재시도할지와 특정 예외 클래스가 발생했을 때만 처리할지를 명시한다.
|
|
- 어떤 예외든지 항상 재시도 하는 경우, log를 남기고 새로운 연결 맺는 걸 시도할 때 시간 간격을 주기 위해 predict(조건자) 를 정의한다.
|
|
- 연결을 계속해서 재시도할 때 시간 간격을 점진적으로 증가시키는 방법도 자주 사용된다.
- 예외가 특정 타입일 때 재시도하는 조건자를 구현할 수 도 있다.
|
|
최종 연산
- 플로우를 처리를 끝내는 연산 => 최종 연산 이라고 부른다.
- 기본적인
collect
외에도,Collection(컬렉션)
과Sequence
가 제공하는 것과 비슷한count, first, firstOrNull, fold, reduce
또한 최종 연산이다. - 최종 연산은 중단 가능(suspend) 가능하며, 플로우가 완료되었을 때 또는 최종 연산 자체가 플로우를 완료 시켰을 때 값을 반환한다.
collect
메서드를 사용해서 또 다른 최종 연산을 얼마든지 구현할 수 도 있다.
|
|
Reference
- https://kotlinlang.org/docs/coroutines-guide.html
- 코틀린 코루틴 Kotlin Coroutines: Deep Dive (Marcin Moskała, 인사이트)