Kotlin na Androidzie

W korutynach przepływ to typ, który może emitować wiele wartości sekwencyjnie w przeciwieństwie do funkcji zawieszania, które zwracają tylko jedną wartość. Możesz np. użyć przepływu, aby otrzymywać aktualizacje na żywo z bazy danych.

Przepływy są tworzone na podstawie współprogramów i mogą mieć wiele wartości. Przepływ to koncepcyjnie strumień danych, który można obliczać asynchronicznie. Wyemitowane wartości muszą być tego samego typu. Na przykład Flow<Int> to przepływ, który przesyła wartości całkowite.

Przepływ jest bardzo podobny do elementu Iterator, który generuje sekwencję wartości, ale wykorzystuje funkcje zawieszania do asynchronicznego generowania i wykorzystywania wartości. Oznacza to na przykład, że przepływ może bezpiecznie wysłać żądanie sieciowe do wygenerowania następnej wartości bez blokowania wątku głównego.

W strumieniach danych zaangażowane są 3 elementy:

  • Producent generuje dane, które są dodawane do strumienia. Dzięki współpracom przepływy mogą również generować dane asynchronicznie.
  • (Opcjonalnie) Pośrednicy mogą modyfikować każdą wartość przesyłaną do strumienia lub samego strumienia.
  • Konsument wykorzystuje wartości ze strumienia.

podmioty zaangażowane w strumienie danych, konsument, opcjonalny pośrednik i producent;
Rysunek 1. Podmioty zaangażowane w strumienie danych: konsument, opcjonalny pośrednik i producent.

W przypadku Androida repozytorium jest zwykle producentem danych interfejsu, których interfejs to interfejs użytkownika, który ostatecznie wyświetla dane. Innym razem warstwa interfejsu produkuje zdarzenia wejściowe użytkownika i wykorzystują je inne warstwy hierarchii. Warstwy między producentem a konsumentem zazwyczaj działają jako pośrednicy, którzy modyfikują strumień danych, aby dostosować go do wymagań następnej warstwy.

Tworzenie przepływu

Do tworzenia przepływów używaj interfejsów API kreatora przepływów. Funkcja kreatora flow tworzy nowy przepływ, w którym możesz ręcznie wysyłać nowe wartości do strumienia danych za pomocą funkcji emit.

W tym przykładzie źródło danych automatycznie pobiera najnowsze wiadomości w stałych odstępach czasu. Funkcja zawieszania nie może zwrócić wielu następujących po sobie wartości, dlatego źródło danych tworzy i zwraca przepływ spełniający to wymaganie. W tym przypadku źródłem danych jest producent.

class NewsRemoteDataSource(
    private val newsApi: NewsApi,
    private val refreshIntervalMs: Long = 5000
) {
    val latestNews: Flow<List<ArticleHeadline>> = flow {
        while(true) {
            val latestNews = newsApi.fetchLatestNews()
            emit(latestNews) // Emits the result of the request to the flow
            delay(refreshIntervalMs) // Suspends the coroutine for some time
        }
    }
}

// Interface that provides a way to make network requests with suspend functions
interface NewsApi {
    suspend fun fetchLatestNews(): List<ArticleHeadline>
}

Kreator flow jest wykonywany w współprogramie. Zapewnia on korzyści związane z tymi samymi asynchronicznymi interfejsami API, ale istnieją pewne ograniczenia:

  • Przepływy są sekwencyjne. Ponieważ producent znajduje się w korutynie, podczas wywoływania funkcji zawieszania producent zawiesza się do momentu, gdy funkcja zawieszania zostanie zwrócona. W tym przykładzie producent zawiesza się do czasu zakończenia żądania sieciowego fetchLatestNews. Dopiero wtedy wynik jest przesyłany do strumienia.
  • W konstruktorze flow producent nie może emit wartości z innego obiektu CoroutineContext. Dlatego nie wywołuj funkcji emit w innym miejscu CoroutineContext przez tworzenie nowych współprogramów ani za pomocą bloków kodu withContext. W takich przypadkach możesz używać innych kreatorów przepływów, takich jak callbackFlow.

Modyfikowanie strumienia

Pośrednicy mogą używać operatorów pośrednich do modyfikowania strumienia danych bez konieczności zużywania wartości. Te operatory to funkcje, które po zastosowaniu do strumienia danych konfigurują łańcuch operacji, które nie są wykonywane do czasu wykorzystania tych wartości w przyszłości. Więcej informacji o operatorach pośrednich znajdziesz w dokumentacji przepływu.

W poniższym przykładzie warstwa repozytorium używa operatora pośredniego map do przekształcenia danych wyświetlanych w View:

class NewsRepository(
    private val newsRemoteDataSource: NewsRemoteDataSource,
    private val userData: UserData
) {
    /**
     * Returns the favorite latest news applying transformations on the flow.
     * These operations are lazy and don't trigger the flow. They just transform
     * the current value emitted by the flow at that point in time.
     */
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            // Intermediate operation to filter the list of favorite topics
            .map { news -> news.filter { userData.isFavoriteTopic(it) } }
            // Intermediate operation to save the latest news in the cache
            .onEach { news -> saveInCache(news) }
}

Operatory pośrednie można stosować jeden po drugim, tworząc łańcuch operacji, które są wykonywane leniwie, gdy element emitowany jest. Pamiętaj, że samo zastosowanie w strumieniu operatora pośredniego nie uruchamia zbierania danych.

Zbieranie danych z przepływu

Aby aktywować proces i zacząć nasłuchiwać wartości, użyj operatora terminala. Aby pobierać wszystkie emitowane wartości w strumieniu, użyj funkcji collect. Więcej informacji o operatorach terminali znajdziesz w oficjalnej dokumentacji procesu.

collect to funkcja zawieszania, więc musi być wykonywana w korektynie. Przyjmuje ona lambda jako parametr, który jest wywoływany przy każdej nowej wartości. Ponieważ jest to funkcja zawieszania, kohorta wywołująca collect może zawiesić działanie do czasu zamknięcia przepływu.

Nawiązując do poprzedniego przykładu, oto prosta implementacja interfejsu ViewModel korzystającego z danych z warstwy repozytorium:

class LatestNewsViewModel(
    private val newsRepository: NewsRepository
) : ViewModel() {

    init {
        viewModelScope.launch {
            // Trigger the flow and consume its elements using collect
            newsRepository.favoriteLatestNews.collect { favoriteNews ->
                // Update View with the latest favorite news
            }
        }
    }
}

Zbieranie tych informacji powoduje, że producent odświeża najnowsze wiadomości i w ustalonym czasie wysyła wynik żądania sieciowego. Ponieważ producent pozostaje zawsze aktywny w pętli while(true), strumień danych zostanie zamknięty po wyczyszczeniu obiektu ViewModel i anulowaniu metody viewModelScope.

Zbieranie przepływu może zostać zatrzymane z tych powodów:

  • Gromadząca współpraca została anulowana, jak pokazano w poprzednim przykładzie. Spowoduje to także zatrzymanie producenta.
  • Producent kończy wysyłanie przedmiotów. W takim przypadku strumień danych zostanie zamknięty, a współpraca o nazwie collect wznowi wykonanie.

Przepływy są zimne i leniwe, chyba że zostały określone przy użyciu innych operatorów pośrednich. Oznacza to, że kod producenta jest wykonywany przy każdym wywołaniu operatora terminala w procesie. W poprzednim przykładzie zastosowanie wielu kolektorów przepływu powoduje, że źródło danych pobiera najnowsze wiadomości wiele razy w różnych stałych odstępach czasu. Aby zoptymalizować i udostępniać przepływ, gdy wielu klientów zbiera dane w tym samym czasie, użyj operatora shareIn.

Wykrywanie nieoczekiwanych wyjątków

Implementacja producenta może pochodzić z biblioteki zewnętrznej. Oznacza to, że może zawierać nieoczekiwane wyjątki. Aby obsługiwać te wyjątki, użyj operatora pośredniego catch.

class LatestNewsViewModel(
    private val newsRepository: NewsRepository
) : ViewModel() {

    init {
        viewModelScope.launch {
            newsRepository.favoriteLatestNews
                // Intermediate catch operator. If an exception is thrown,
                // catch and update the UI
                .catch { exception -> notifyError(exception) }
                .collect { favoriteNews ->
                    // Update View with the latest favorite news
                }
        }
    }
}

W poprzednim przykładzie, gdy wystąpi wyjątek, lambda collect nie jest wywoływana, ponieważ nie otrzymano nowego elementu.

catch może też emit elementów w procesie. Przykładowa warstwa repozytorium może emit wczytać wartości z pamięci podręcznej:

class NewsRepository(...) {
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            .map { news -> news.filter { userData.isFavoriteTopic(it) } }
            .onEach { news -> saveInCache(news) }
            // If an error happens, emit the last cached values
            .catch { exception -> emit(lastCachedNews()) }
}

W tym przykładzie, gdy wystąpi wyjątek, wywoływana jest funkcja lambda collect, ponieważ ten wyjątek został wyemitowany do strumienia.

Wykonywanie w innym kontekście Coroutine

Domyślnie producent konstruktora flow uruchamia się w CoroutineContextkoordynacji, która pobiera z niej dane, i jak już wspomniano, nie może emit wartości z innej wartości CoroutineContext. W niektórych przypadkach takie zachowanie może być niepożądane. Na przykład w przykładach użytych w tym temacie warstwa repozytorium nie powinna wykonywać operacji na obiekcie Dispatchers.Main używanym przez viewModelScope.

Aby zmienić CoroutineContext przepływu, użyj operatora pośredniego flowOn. flowOn zmienia CoroutineContext w procesie nadrzędnym, co oznacza producenta i wszystkie operatory pośrednie zastosowane przed (lub wyżej) flowOn. Nie dotyczy to przepływu w dół (operatorów pośrednich po flowOn wraz z konsumentem) i jest wykonywany w elemencie CoroutineContext używanym do collect z procesu. Jeśli jest kilka operatorów flowOn, każdy z nich zmienia poprzednią lokalizację w stosunku do bieżącej lokalizacji.

class NewsRepository(
    private val newsRemoteDataSource: NewsRemoteDataSource,
    private val userData: UserData,
    private val defaultDispatcher: CoroutineDispatcher
) {
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            .map { news -> // Executes on the default dispatcher
                news.filter { userData.isFavoriteTopic(it) }
            }
            .onEach { news -> // Executes on the default dispatcher
                saveInCache(news)
            }
            // flowOn affects the upstream flow ↑
            .flowOn(defaultDispatcher)
            // the downstream flow ↓ is not affected
            .catch { exception -> // Executes in the consumer's context
                emit(lastCachedNews())
            }
}

Z tym kodem operatory onEach i map używają defaultDispatcher, podczas gdy operator catch i konsument są wykonywane na platformie Dispatchers.Main, z której korzysta viewModelScope.

Gdy warstwa źródła danych wykonuje operacje wejścia-wyjścia, użyj dyspozytora zoptymalizowanego pod kątem operacji wejścia-wyjścia:

class NewsRemoteDataSource(
    ...,
    private val ioDispatcher: CoroutineDispatcher
) {
    val latestNews: Flow<List<ArticleHeadline>> = flow {
        // Executes on the IO dispatcher
        ...
    }
        .flowOn(ioDispatcher)
}

Przepływy w bibliotekach Jetpack

Aplikacja Flow jest zintegrowana z wieloma bibliotekami Jetpack i jest popularna wśród bibliotek zewnętrznych na Androida. Flow doskonale sprawdzi się w przypadku aktualizacji danych bieżących i niekończących się strumieni danych.

Funkcja Flow with Room pozwala otrzymywać powiadomienia o zmianach w bazie danych. W przypadku korzystania z obiektów dostępu do danych zwróć typ Flow, aby otrzymywać aktualizacje na żywo.

@Dao
abstract class ExampleDao {
    @Query("SELECT * FROM Example")
    abstract fun getExamples(): Flow<List<Example>>
}

Za każdym razem, gdy następuje zmiana w tabeli Example, w bazie danych jest wysyłana nowa lista z nowymi elementami.

Przekształć interfejsy API oparte na wywołaniach zwrotnych na przepływy

callbackFlow to kreator przepływów, który umożliwia konwertowanie interfejsów API opartych na wywołaniach zwrotnych w przepływy. Na przykład interfejsy API na Androida Firebase Firestore używają wywołań zwrotnych.

Aby przekonwertować te interfejsy API na przepływy i nasłuchiwać aktualizacji bazy danych Firestore, możesz użyć tego kodu:

class FirestoreUserEventsDataSource(
    private val firestore: FirebaseFirestore
) {
    // Method to get user events from the Firestore database
    fun getUserEvents(): Flow<UserEvents> = callbackFlow {

        // Reference to use in Firestore
        var eventsCollection: CollectionReference? = null
        try {
            eventsCollection = FirebaseFirestore.getInstance()
                .collection("collection")
                .document("app")
        } catch (e: Throwable) {
            // If Firebase cannot be initialized, close the stream of data
            // flow consumers will stop collecting and the coroutine will resume
            close(e)
        }

        // Registers callback to firestore, which will be called on new events
        val subscription = eventsCollection?.addSnapshotListener { snapshot, _ ->
            if (snapshot == null) { return@addSnapshotListener }
            // Sends events to the flow! Consumers will get the new events
            try {
                offer(snapshot.getEvents())
            } catch (e: Throwable) {
                // Event couldn't be sent to the flow
            }
        }

        // The callback inside awaitClose will be executed when the flow is
        // either closed or cancelled.
        // In this case, remove the callback from Firestore
        awaitClose { subscription?.remove() }
    }
}

W przeciwieństwie do konstruktora flow callbackFlow umożliwia wysyłanie wartości z innego obiektu CoroutineContext za pomocą funkcji send lub poza współpracą za pomocą funkcji trySend.

Wewnętrznie callbackFlow używa kanału, który jest koncepcyjnie bardzo podobny do kolejki blokującej. Kanał ma skonfigurowaną pojemność, czyli maksymalną liczbę elementów, które mogą być buforowane. Kanał utworzony w callbackFlow ma domyślnie pojemność 64 elementów. Gdy próbujesz dodać nowy element do pełnego kanału, send zawiesza producenta do chwili, w którym znajdzie się miejsce na nowy element, natomiast offer nie dodaje go do kanału i od razu zwraca wartość false.

Dodatkowe materiały na temat przepływu