본문 바로가기

IT/kotlin언어

[flow] coroutine flow 문서 읽기 -1

안녕하세요 남갯입니다.

 

오늘은 Coroutine flow 문서를 읽어볼 예정입니다.

 

https://developer.android.com/kotlin/flow?hl=ko

 

Flow란? 

flow는 기존 단일값만 반환하는 정지함수와는 달리 여러값을 순차적으로 보낼수 있는 유형입니다.

기존 Rx를 사용해보셨다면 비슷한 개념으로 이해할 수 있을것 같은데요.

코루틴기반으로 빌드되며, 여러값을 제공할 수 있는 데이터스트림의 개념입니다.

 

Flow의 데이터 스트림

 

Flow의 데이터 스트림에는 3가지의 항목이 존재합니다.

1. 생산자 : 스트림에 추가되는 데이터를 생산합니다. 코루틴 덕분에 흐름을 비동기적으로 데이터가 생산 가능

2. 중개자(선택사항) : 스트림에 내보내는 값을 수정 가능

3. 소비자 : 스트림에서 받아온 값을 사용.

 

구글에서도 권장하고 있는 아키텍처를 사용하게 되면,  DB 혹은 네트워크를 통해 데이터를 만들어주는

생산자 (레포지토리) 있게되는거고 받은 데이터를 통해 데이터를 표시하면 소비자가 되게 됩니다.

 

Flow 생성하기

Flow를 생성하려면 FlowBuilder를 이용합니다. 

Flowof() , asFlow() , Flow {} , channelFlow , MutableStateFlow , MutableSharedFlow 

 

val latestNews: Flow<List<ArticleHeadline>> = flow {
        while(true) {
            val latestNews = newsApi.fetchLatestNews()
            emit(latestNews) // Emits the result of the request to the flow
            delay(refreshIntervalMs) // Suspends the coroutine for some time
        }
    }

 

여기서 동작의 흐름은 순차적으로 일어나게 되고, suspend 함수가 반환될때 까지 정지상태를 유지합니다.

flow 빌더에서는 생산자가 다른 CoroutineContext의 값을 emit 할수 없으므로, 새로운 루틴을 만들거나

withContext 를 사용해서 호출하지 말고 callbackFlow와 같은 다른 flow빌더를 호출하면 됩니다.

 

스트림수정

emit 하지 않고 데이터 스트림을 수정 가능합니다. 기존 RxJava와 같이 스트림의 데이터를 변경하고 Lazy 즉 

Cold 하게 설정이 가능합니다.

 

 val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            // Intermediate operation to filter the list of favorite topics
            .map { news -> news.filter { userData.isFavoriteTopic(it) } }
            // Intermediate operation to save the latest news in the cache
            .onEach { news -> saveInCache(news) }

 

Flow에서 Collect란?

class LatestNewsViewModel(
    private val newsRepository: NewsRepository
) : ViewModel() {

    init {
        viewModelScope.launch {
            // Trigger the flow and consume its elements using collect
            newsRepository.favoriteLatestNews.collect { favoriteNews ->
                // Update View with the latest favorite news
            }
        }
    }
}

 

스트림에서 모든값을 가져오게 하려면 collect를 사용하면 됩니다.  collect는 suspend 함수이므로 코루틴 내부에서 실행해야하고, 내부적으로 무한으로 활성상태가 유지됩니다. 뷰모델의 내부에 존재하므로 viewModelScope는 뷰모델이 삭제가 된다면 데이터 스트림이 종료가 됩니다.

 

다만 종료가 될수도 있는 이유 

- 수집된 코루틴 즉 viewModelScope가 무슨 이유로던 취소된 경우

- 생산자가 emit을 완료한경우 데이터 스트림은 종료되고 collect 호출 코루틴 실행을 다시 시작합니다.

 

데이터를 구독 즉 수지바는 Flow가 여러개일경우 ShareIn 연산자를 사용

 

Flow 예외처리 

중간 예외에 대한 처리를 하기 위해서는 catch 중간 연산자를 이용해서 가져옵니다.

        viewModelScope.launch {
            newsRepository.favoriteLatestNews
                // Intermediate catch operator. If an exception is thrown,
                // catch and update the UI
                .catch { exception -> notifyError(exception) }
                .collect { favoriteNews ->
                    // Update View with the latest favorite news
                }
        }

 

catch의 상에서 exception을 아래와 같이 emit 가능합니다.

실제 Result를 처리하기 위한 래퍼클래스를 생성했을경우 catch 내부에서 emit을 통해 에러를 처리하면 됩니다.

 

class NewsRepository(...) {
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            .map { news -> news.filter { userData.isFavoriteTopic(it) } }
            .onEach { news -> saveInCache(news) }
            // If an error happens, emit the last cached values
            .catch { exception -> emit(lastCachedNews()) }
}

 

Flow를 다른 CoroutineContext 에서 실행하기

Flow 빌더의 생산자는 기본적으로 CoroutineContext에서 실행됩니다. 또한 다른 CoroutineContext에서 emit 할 수 없습니다. 따라서 이 동작은 Dispatchers.Main에서 작업을 실행하면 안 됩니다.

CoroutineContext 를 중간에 변경하기 위해서는 flowOn (업스트림 변화) 과 같은 중간 연산자를 사용합니다.

* 기존 Rx의 ObserveOn과 같은 경우는 다운스트림을 변경하게 되는데, flowOn의 경우 업스트림을 변경하기 때문에

flowOn아래의 Flow 에는 영향을 받지 않습니다.

 

class NewsRepository(
    private val newsRemoteDataSource: NewsRemoteDataSource,
    private val userData: UserData,
    private val defaultDispatcher: CoroutineDispatcher
) {
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            .map { news -> // Executes on the default dispatcher
                news.filter { userData.isFavoriteTopic(it) }
            }
            .onEach { news -> // Executes on the default dispatcher
                saveInCache(news)
            }
            // flowOn affects the upstream flow ↑
            .flowOn(defaultDispatcher)
            // the downstream flow ↓ is not affected
            .catch { exception -> // Executes in the consumer's context
                emit(lastCachedNews())
            }
}

즉 여기서 보면 flowOn 위쪽의 동작은 defaultDispatcher 에 의해서 동작하게 되고

아래 catch동작은 Dispatchers.Main에서 실행하게 됩니다.

 

   val latestNews: Flow<List<ArticleHeadline>> = flow {
        // Executes on the IO dispatcher
        ...
    }
        .flowOn(ioDispatcher)

 

위와 같은 코드의 경우 flowOn 위의 동작이 ioDispatcher를 사용한다는 점 입니다.

 

JetPack에서의 Flow 사용

1. Room에서 Flow를 사용하기

 

@Dao
abstract class ExampleDao {
    @Query("SELECT * FROM Example")
    abstract fun getExamples(): Flow<List<Example>>
}

 

Dao 를 사용해서 업데이트를 받아서 사용 가능합니다.

 

Callback 기반으로 변경하기

callbackFlow 는 콜백 기반 API를 Flow로 변환할수 있는 흐름 빌더입니다.

예를들어 Firebase를 이용한다고 가정했을때. 

 

class FirestoreUserEventsDataSource(
    private val firestore: FirebaseFirestore
) {
    // Method to get user events from the Firestore database
    fun getUserEvents(): Flow<UserEvents> = callbackFlow {

        // Reference to use in Firestore
        var eventsCollection: CollectionReference? = null
        try {
            eventsCollection = FirebaseFirestore.getInstance()
                .collection("collection")
                .document("app")
        } catch (e: Throwable) {
            // If Firebase cannot be initialized, close the stream of data
            // flow consumers will stop collecting and the coroutine will resume
            close(e)
        }

        // Registers callback to firestore, which will be called on new events
        val subscription = eventsCollection?.addSnapshotListener { snapshot, _ ->
            if (snapshot == null) { return@addSnapshotListener }
            // Sends events to the flow! Consumers will get the new events
            try {
                offer(snapshot.getEvents())
            } catch (e: Throwable) {
                // Event couldn't be sent to the flow
            }
        }

        // The callback inside awaitClose will be executed when the flow is
        // either closed or cancelled.
        // In this case, remove the callback from Firestore
        awaitClose { subscription?.remove() }
    }
}

callbackFlow를 감싼 뒤 이벤트를 받아서 처린 한뒤 awaitClose 즉 해당 부분 리무브 될때까지

기다렸다가 flow를 닫는 동작이 가능합니다.

 

또한 callbackFlow는 flow빌더와는 다르게 send함수를 통해 다른 CoroutineContext에서 갑을 보내거나

offer 함수를 사용해서 외부로 값을 전달 가느합니다.

내부적으로 Queue와 Channel의 개념을 사용하고, buffering이 가능합니다. (default 64)

 

 

Flow code lab

https://developer.android.com/codelabs/advanced-kotlin-coroutines?hl=ko#2 

 

코루틴 및 flow 추가 리소스

https://developer.android.com/kotlin/already-using-resources?hl=ko