[Coroutine] 플로우

코틀린 코루틴을 공부하며 정리한 글입니다.
혼자 공부하고 정리한 내용이며, 틀린 부분은 지적해주시면 감사드리겠습니다 😀

플로우란

플로우란, 비동기적으로 계산해야할 값의 스트림을 나타낸다.

플로우를 간단하게 구현하면서 동작 방식을 알아보자.

fun main() {  
    val f: () -> Unit = {  
        print("A")  
        print("B")  
        print("C")  
    }  
    f()  // ABC
}

위와 같이 간단한 문자열을 출력하는 람다 함수가 있다. 이렇게는 일반적인 람다 함수이기 때문에, 블록 안에 DB 조회 등과 같은 지연이 생길 수 있는 환경을 만들어보자.

suspend fun main() {  
    val f: suspend () -> Unit = {  
        print("A")  
        delay(1000)  
        print("B")  
        delay(1000)  
        print("C")  
    }  
    f()
    f() // A (1초 후) B (1초 후) C A (1초 후) B (1초 후) C
}

람다식은 순차적으로 호출되기 때문에, 이전 호출이 완료되기 전에 같은 람다식을 추가적으로 호출할 수 없다. 즉, 호출 순서가 보장되는 것이다. 그렇다면 이제 print가 아닌, f()를 호출할 때, 어떤 작업을 할지 정할 수 있도록하고, A, B, C를 넘길 수 있도록 변경해보자.

suspend fun main() {  
    val f: suspend ((String) -> Unit) -> Unit = { emit ->  
        emit("A")  
        emit("B")  
        emit("C")  
    }  
    f { println(it) }  // ABC
    f { println(it) }  // ABC
}

f를 정의한 람다 함수 내부에서는 f를 호출할 때, 지정한 블록을 emit이라는 이름으로 사용하였고, 각 A, B, C를 넘겨주었다. 혹, 호출부에서 중단될 수 있을 수 있기 때문에 emit은 중단 함수가 되어야 한다. 구조가 복잡해졌으니, 함수형 인터페이스로 정의해보자.

fun interface FlowCollector {  
    suspend fun emit(value: String)  
}  

suspend fun main() {  
    val f: suspend FlowCollector.() -> Unit = {  
        emit("A")  
        emit("B")  
        emit("C")  
    }  
    f { println(it) }  // ABC
    f { println(it) }  // ABC
}

FlowCollector를 리시버로 지정하여, emit을 바로 사용할 수 있는 형태가 되었다. 이제 람다 함수 f가 보내주는 A, B, C를 받을 수 있도록 Flow를 만들어보자.

fun interface FlowCollector {  
    suspend fun emit(value: String)  
}  

interface Flow {  
    suspend fun collect(collector: FlowCollector)  
}  

suspend fun main() {  
    val builder: suspend FlowCollector.() -> Unit = {  
        emit("A")  
        emit("B")  
        emit("C")  
    }  
    val flow: Flow = object : Flow {  
        override suspend fun collect(collector: FlowCollector) {  
            collector.builder()  
        }  
    }  
    flow.collect { println(it) }  // ABC
    flow.collect { println(it) }  // ABC
}

복잡해보이지만, flow 변수는, builder를 통해 받은 A, B, C를 받아서, collect를 통해 처리하게 된다. 마지막으로, flow를 함수로 변경해서, 빌더를 직접 만들 수 있도록 변경해보자.

fun interface FlowCollector {  
    suspend fun emit(value: String)  
}  

interface Flow {  
    suspend fun collect(collector: FlowCollector)  
}  

fun flow(  
    builder: suspend FlowCollector.() -> Unit  
): Flow = object : Flow {  
    override suspend fun collect(collector: FlowCollector) {  
        collector.builder()  
    }  
}  

suspend fun main() {  
    val f: Flow = flow {  
        emit("A")  
        emit("B")  
        emit("C")  
    }  
    f.collect { println(it) }  
    f.collect { println(it) }  
}

여기서 String을 제네릭으로 변경하면 실제 Flow의 구현과 거의 동일해진다. 플로우의 collect를 호출하면, flow 빌더를 호출할 때, 넣은 람다식이 실행되게 된다.

플로우의 특징

플로우는 이전 포스팅에서 작성한 콜드 데이터이다. 즉, 모든 원소를 한 번에 처리하는 것이 아닌 하나씩 처리한다. 때문에, DB 조회와 같이 지연되어 나오는 데이터가 많을 경우 나오는 원소마다 처리하는 것이 더 효율적일 수 있다.

class PostService(  
    private val postRepository: PostRepository  
) {  
    suspend fun findAll(  
        successBlock: (Post) -> Unit  
    ) {  
        return postRepository.findAll().collect {  
            successBlock(it.toClientObject())  
        }  
    }  
}  

class PostController(
    private val postService: PostService
) {
    suspend fun actionFindAll(...) {  
        postService.findAll { post ->  
            // 클라이언트에게 스트림 형태로 데이터를 보낼 수 있음  
        }  
    }
}

플로우는, 시퀀스와 유사하게 최종 연산을 통해 실제 코드가 실행된다.

정리

플로우는 시퀀스와 달리 코루틴을 지원한다. 또한, 일반적인 컬렉션과 달리 플로우로 처리하면, 동시 처리, 예외와 같은 것들을 효율적으로 조절할 수 있다. 즉, 플로우는 비동기적으로 계산되는 값을 나타내기에 좋으며, 중단 함수로서 활용할 수도 있다.