4. 코루틴의 실제 구현

  • 코틀린 중단 함수는 continuation-passing style(CPS) 로 구현되어 있다.

    • 여러 중단 함수 구현 방식 중 CPS를 선택하였다.
  • Continuation 객체는 상태를 나타내는 숫자와 로컬 데이터를 가지고 있다.

  • Continuation 은 함수 인자로 전달된다.

  • 중단 함수는 상태(state) 를 가지는 state machine 으로 볼 수 있다.

  • 중단 함수의 Continuation 객체가 이 함수를 부르는 다른 함수의 Continuation 객체를 decorate(장식) 한다.

  • 모든 Continuation 객체는 resume 하거나 함수가 완료 될때 사용되는 콜 스택으로 사용된다.

1
2
3
4
5
6
7
8
suspend fun getUser(): User?
suspend fun setUser(user: User)
suspend fun checkAvailability(flight: Flight): Boolean

// 내부
fun getUser(continuation: Continuation<*>): Any?
fun setUser(user: User, continuation: Continuation<*>): Any
fun checkAvailability(flight: Flight, continuation: Continuation<*>): Any

아주 간단한 중단 함수 알아보기

  • 함수가 시작되는 지점은 함수의 시작점과 중단 이후 재개 시점(컨티뉴에이션이 resume을 호출할 때) 두 곳이다.
    • 현재 상태를 저장에는 label 이라는 field 를 사용한다. (처음에는 0으로 설정)
    • 이후에는 중단되기 전에 다음 상태로 설정되어 재개 될 시점을 알 수 있게 해준다.
  • delay에 의해 중단된 경우 COROUTINE_SUSPENDED 를 반환
  • myFunction 을 호출한 함수부터 시작해 콜 스택에 있는 모든 함수도 똑 같다.
    • 따라서, 중단이 일어나면 콜 스택에 있는 모든 함수가 종료된다
    • 중단된 코루틴을 실행하던 스레드를 (다른 종류의 코루틴을 포함해) 실행 가능한 코드가 사용할 수 있게 된다.
1
2
3
4
5
6
7
8
suspend fun myFunction() {
    println("Before")
    var counter = 0
    delay(1000) // suspending
    counter++
    println("Counter: $counter")
    println("After")
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// 중단 함수를의 바이트 코드를 자바로 디컴파일한 코드를 코틀린으로 표현한 코드
fun myFunction(continuation: Continuation<Unit>): Any {
    val continuation = continuation as? MyFunctionContinuation ?: MyFunctionContinuation(continuation)
    var counter = continuation.counter

    if (continuation.label == 0) {
        println("Before")
        counter = 0
        continuation.counter = counter
        continuation.label = 1
        if (delay(1000, continuation) == COROUTINE_SUSPENDED) {
            return COROUTINE_SUSPENDED
        }
    }
    if (continuation.label == 1) {
        counter = (counter as Int) + 1
        println("Counter: $counter")
        println("After")
        return Unit
    }
    error("Impossible")
}

class MyFunctionContinuation( // 컨트뉴에이션 객체
    val completion: Continuation<Unit>
) : Continuation<Unit> {
    override val context: CoroutineContext
        get() = completion.context

    var result: Result<Unit>? = null
    var label = 0
    var counter = 0

    override fun resumeWith(result: Result<Unit>) {
        this.result = result
        val res = try {
            val r = myFunction(this)
            if (r == COROUTINE_SUSPENDED) return
            Result.success(r as Unit)
        } catch (e: Throwable) {
            Result.failure(e)
        }
        completion.resumeWith(res)
    }
}

상태를 가진 함수

  • 함수가 중단된 후에 다시 사용할 지역 변수나 파라미터와 같은 상태를 가지고 있다면, 함수의 컨티뉴에이션 객체에 상태를 저장해야 한다.
    • 함수 내에서 사용되던 값들은 중단되기 직전에 저장되고, 이후 함수가 재개될 때 복구된다.

값을 받아 재개되는 함수

  • 중단 함수로 부터 값을 받아야 하는 경우 좀 더 복잡해짐
  • token은 상태 0과 1에서 사용
  • userId는 상태 1과 2에서 사욛
  • Result 타입인 result는 함수가 어떻게 재개되었는지 나타냄
    • 함수가 값으로 재개 되면 결과는 Result.Success(result)
    • 함수가 예외로 재개 되면 결과는 Result.Failure(exception)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
suspend fun printUser(token: String) {
    println("Before")
    val userId = getUserId(token) // 중단 함수
    println("Got userId: $userId")
    val userName = getUserName(userId, token) // 중단 함수
    println(User(userId, userName))
    println("After")
}

fun printUser(
    token: String,
    continuation: Continuation<*>
): Any {
    val continuation = continuation as? PrintUserContinuation
        ?: PrintUserContinuation( continuation as Continuation<Unit>, token )

    var result: Result<Any>? = continuation.result
    var userId: String? = continuation.userId
    val userName: String

    if (continuation.label == 0) {
        println("Before")
        continuation.label = 1
        val res = getUserId(token, continuation)
        if (res == COROUTINE_SUSPENDED) {
            return COROUTINE_SUSPENDED
        }
        result = Result.success(res)
    }
    if (continuation.label == 1) {
        userId = result!!.getOrThrow() as String
        println("Got userId: $userId")
        continuation.label = 2							// 다음 label을 설정
        continuation.userId = userId					// 컨티뉴에이션 객체에 상태를 저장
        val res = getUserName(userId, continuation)     // 중단 함수를 호출
        if (res == COROUTINE_SUSPENDED) {				// 중단 상태일 때
            return COROUTINE_SUSPENDED
        }
        result = Result.success(res)					// 중단되지 않았으면 결괏값을 설정
    }
    if (continuation.label == 2) {					   
        result!!.throwOnFailure()					    // 실패할 경우 에외를 던짐
        userName = result.getOrNull as String	        // 결괏값을 읽음
        println(User(userId as String, userName))
        println("After")
        return Unit
    }
    error("Impossible")
}

class PrintUserContinuation(
    val completion: Continuation<Unit>,
    val token: String
) : Continuation<String> {
    override val context: CoroutineContext
        get() = completion.context

    var label = 0
    var result: Result<Any>? = null
    var userId: String? = null

    override fun resumeWith(result: Result<String>) {
        this.result = result
        val res = try {
            val r = printUser(token, this)
            if (r == COROUTINE_SUSPENDED) return
            Result.success(r as Unit)
        } catch (e: Throwable) {
            Result.failure(e)
        }
        completion.resumeWith(res)
    }
}

콜 스택

  • 함수 a가 함수 b를 호출하면 가상 머신은 a의 상태와 b가 끝나면 실행이 될 지점을 콜 스택(call stack)이라는 자료 구조 에 저장한다.

  • 코루틴을 중단하면 스레드를 반환해 콜 스택에 있는 정보가 사라질 것이다.

    • 따라서 코루틴을 재개 할 때, 콜 스택을 사용할 수 없다.
    • 대신 Continuation 객체가 콜 스택의 역할을 대신한다.
  • Continuation 객체는 중단 되었을때 상태(label)와 함수의 지역 변수와 파라미터(필드), 중단 함수를 호출한 함수가 재개될 위치 정보를 가지고 있다.

    • 하나의 Continuation 객체는 다른 하나를 참조하는 식으로 동작한다.
    • Continuation 객체가 재개 될 때, 각 Continuation 객체는 자신이 담당하는 함수를 호출하고,
    • 그 함수의 실행이 끝나면, 자신을 호출한 함수의 Continuation 객체을 재개 합니다.
      • complete.resumeWith(res)
    • 이 과정은 스택의 끝에 다다를 때까지 반복된다.
  • 중단된 함수가 재개했을 때 Continuation 객체로 부터 상태를 복원하고, 얻은 결과를 사용하거나 예외를 던진다.

  • 컨티뉴에이션 객체에 상태가 저장된며, 중단을 처리하기 위한 과정이 있어야 한다.

  • 중단된 함수가 재개했을 때 컨티뉴에이션 객체로부터 상태를 복원하고, 얻은 결과를 사용하거나 예외를 던져야 한다.

  • 예외를 던질 때도 비슷하다, 처리되지 못한 예외가 resumeWith 에서 잡히면 Result.failure(e)로 래핑되며, 예외를 던진 함수를 호출한 함수는 포장된 결과를 받는다.

  • 함수 c에서 중단된 상황을 예로 들면

    • 실행이 재개되면 c의 컨티뉴에이션 객체는 c 함수를 먼저 재개

    • 함수가 완료되면 c 컨티뉴에이션 객체는 b 함수를 호출하는 b 컨티뉴에이션 객체를 재개

    • b 함수가 완료되면 b 컨티뉴에이션은 a 컨티뉴에이션을 재개하고 a 함수가 호출된다.

image-20240716201908358.png

image-20240716201918214.png

image-20240716201934555.png

중단 함수의 성능

  • 함수를 상태로 나누는 것은 숫자를 비교하는 것만큼 쉬운 일이며 실행점이 변하는 비용 또한 거의 들지 않음
  • 컨티뉴에이션 객체에 상태를 저장하는 것 또한 간단하다
    • 지역 변수를 복사하지 않고 새로운 변수가 메모리 내 특정 값을 가리키게 한다.

Reference