Kotlin-Abläufe unter Android

In Koroutinen ist ein Ablauf ein Typ, der mehrere Werte nacheinander ausgeben kann, im Gegensatz zu Sperrfunktionen, die nur einen einzelnen Wert zurückgeben. Sie können beispielsweise einen Ablauf verwenden, um Live-Updates von einer Datenbank zu empfangen.

Abläufe basieren auf Koroutinen und können mehrere Werte bereitstellen. Ein Ablauf ist konzeptionell ein Datenstrom, der asynchron berechnet werden kann. Die ausgegebenen Werte müssen denselben Typ haben. Ein Flow<Int> ist beispielsweise ein Fluss, der ganzzahlige Werte ausgibt.

Ein Ablauf ist einem Iterator sehr ähnlich, der eine Folge von Werten erzeugt. Er verwendet jedoch Beendigungsfunktionen, um Werte asynchron zu erzeugen und zu verwenden. Dies bedeutet zum Beispiel, dass der Fluss sicher eine Netzwerkanfrage senden kann, um den nächsten Wert zu erzeugen, ohne den Hauptthread zu blockieren.

An Datenstreams sind drei Entitäten beteiligt:

  • Ein Produzent erzeugt Daten, die dem Stream hinzugefügt werden. Dank Koroutinen können Datenflüsse auch asynchron erzeugt werden.
  • (Optional) Vermittler können jeden Wert ändern, der in den Stream oder den Stream selbst ausgegeben wird.
  • Ein Nutzer bezieht die Werte aus dem Stream.

an Datenströmen beteiligte Rechtspersönlichkeiten; Verbraucher, optionale Vermittler und Hersteller
Abbildung 1. Entitäten, die an Datenstreams beteiligt sind: Nutzer, optionale Vermittler und Ersteller.

In Android ist ein Repository in der Regel ein Ersteller von UI-Daten, dessen Benutzeroberfläche (UI) der Nutzer ist, der die Daten letztendlich anzeigt. In anderen Fällen generiert die UI-Ebene Nutzereingabeereignisse, die von anderen Ebenen der Hierarchie verarbeitet werden. Schichten zwischen Ersteller und Nutzer fungieren in der Regel als Vermittler, die den Datenstrom ändern, um ihn an die Anforderungen der folgenden Schicht anzupassen.

Ablauf erstellen

Verwenden Sie zum Erstellen von Abläufen die Flow Builder APIs. Die Builder-Funktion flow erstellt einen neuen Ablauf, in dem Sie mit der Funktion emit manuell neue Werte in den Datenstrom übertragen können.

Im folgenden Beispiel ruft eine Datenquelle die neuesten Nachrichten automatisch in einem festen Intervall ab. Da eine Sperrfunktion nicht mehrere aufeinanderfolgende Werte zurückgeben kann, erstellt die Datenquelle einen Ablauf und gibt diesen zurück, um diese Anforderung zu erfüllen. In diesem Fall agiert die Datenquelle als Ersteller.

class NewsRemoteDataSource(
    private val newsApi: NewsApi,
    private val refreshIntervalMs: Long = 5000
) {
    val latestNews: Flow<List<ArticleHeadline>> = flow {
        while(true) {
            val latestNews = newsApi.fetchLatestNews()
            emit(latestNews) // Emits the result of the request to the flow
            delay(refreshIntervalMs) // Suspends the coroutine for some time
        }
    }
}

// Interface that provides a way to make network requests with suspend functions
interface NewsApi {
    suspend fun fetchLatestNews(): List<ArticleHeadline>
}

Der flow-Builder wird innerhalb einer Koroutine ausgeführt. Daher profitiert er von denselben asynchronen APIs, es gelten jedoch einige Einschränkungen:

  • Abläufe sind sequentiell. Da sich der Ersteller in einer Koroutine befindet, hält der Ersteller beim Aufrufen einer Beendigungsfunktion an, bis die Unterbrechungsfunktion zurückgegeben wird. In diesem Beispiel sperrt der Producer den Vorgang, bis die fetchLatestNews-Netzwerkanfrage abgeschlossen ist. Nur dann wird das Ergebnis an den Stream ausgegeben.
  • Mit dem flow-Builder kann der Ersteller keine emit-Werte aus einer anderen CoroutineContext abrufen. Rufen Sie daher emit nicht in einem anderen CoroutineContext auf, indem Sie neue Koroutinen erstellen oder withContext-Codeblöcke verwenden. In diesen Fällen können Sie andere Ablaufgeneratoren wie callbackFlow verwenden.

Stream ändern

Vermittler können Zwischenoperatoren verwenden, um den Datenstrom zu ändern, ohne die Werte zu verarbeiten. Diese Operatoren sind Funktionen, die, wenn sie auf einen Datenstream angewendet werden, eine Kette von Vorgängen einrichten, die erst ausgeführt werden, wenn die Werte in der Zukunft verbraucht wurden. Weitere Informationen zu Zwischenoperatoren finden Sie in der Referenzdokumentation für den Fluss.

Im folgenden Beispiel verwendet die Repository-Ebene den Zwischenoperator map, um die Daten zu transformieren, die im View angezeigt werden sollen:

class NewsRepository(
    private val newsRemoteDataSource: NewsRemoteDataSource,
    private val userData: UserData
) {
    /**
     * Returns the favorite latest news applying transformations on the flow.
     * These operations are lazy and don't trigger the flow. They just transform
     * the current value emitted by the flow at that point in time.
     */
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            // Intermediate operation to filter the list of favorite topics
            .map { news -> news.filter { userData.isFavoriteTopic(it) } }
            // Intermediate operation to save the latest news in the cache
            .onEach { news -> saveInCache(news) }
}

Zwischenoperatoren können nacheinander angewendet werden und eine Kette von Vorgängen bilden, die verzögert ausgeführt werden, wenn ein Element in den Ablauf ausgegeben wird. Durch die einfache Anwendung eines Zwischenoperators auf einen Stream wird die Ablauferfassung nicht gestartet.

Aus einem Ablauf erfassen

Verwenden Sie einen Terminaloperator, um den Ablauf auszulösen und mit dem Warten auf Werte zu beginnen. Mit collect können Sie alle Werte im Stream abrufen, sobald sie ausgegeben werden. Weitere Informationen zu Terminaloperatoren finden Sie in der offiziellen Ablaufdokumentation.

Da collect eine Beendigungsfunktion ist, muss sie innerhalb einer Koroutine ausgeführt werden. Dabei wird eine Lambda-Funktion als Parameter verwendet, der bei jedem neuen Wert aufgerufen wird. Da es sich um eine Unterbrechungsfunktion handelt, kann die Koroutine, die collect aufruft, angehalten werden, bis der Ablauf geschlossen wird.

Ausgehend vom vorherigen Beispiel ist hier eine einfache Implementierung einer ViewModel, die die Daten aus der Repository-Ebene nutzt:

class LatestNewsViewModel(
    private val newsRepository: NewsRepository
) : ViewModel() {

    init {
        viewModelScope.launch {
            // Trigger the flow and consume its elements using collect
            newsRepository.favoriteLatestNews.collect { favoriteNews ->
                // Update View with the latest favorite news
            }
        }
    }
}

Durch das Erfassen des Ablaufs wird der Producer ausgelöst, der die neuesten Nachrichten aktualisiert und das Ergebnis der Netzwerkanfrage in einem festen Intervall ausgibt. Da der Ersteller mit der while(true)-Schleife immer aktiv bleibt, wird der Datenstrom geschlossen, wenn ViewModel gelöscht und viewModelScope abgebrochen wird.

Die Ablauferfassung kann aus folgenden Gründen beendet werden:

  • Die Koroutine, die erfasst wird, wird abgebrochen, wie im vorherigen Beispiel gezeigt. Dadurch wird auch der zugrunde liegende Producer gestoppt.
  • Der Produzent beendet die Ausgabe von Elementen. In diesem Fall wird der Datenstream geschlossen und die Koroutine collect setzt die Ausführung fort.

Datenflüsse sind kalt und lazy, sofern nicht mit anderen Zwischenoperatoren angegeben. Das bedeutet, dass der Producer-Code jedes Mal ausgeführt wird, wenn ein Terminaloperator im Ablauf aufgerufen wird. Im vorherigen Beispiel führt die Verwendung mehrerer Fluss-Collectors dazu, dass die Datenquelle die neuesten Nachrichten mehrmals in verschiedenen festen Intervallen abruft. Verwenden Sie den Operator shareIn, um einen Ablauf zu optimieren und freizugeben, wenn mehrere Nutzer gleichzeitig Daten erheben.

Unerwartete Ausnahmen abfangen

Die Implementierung des Erstellers kann aus der Bibliothek eines Drittanbieters stammen. Dies bedeutet, dass unerwartete Ausnahmen ausgelöst werden können. Für diese Ausnahmen verwenden Sie den Zwischenoperator catch.

class LatestNewsViewModel(
    private val newsRepository: NewsRepository
) : ViewModel() {

    init {
        viewModelScope.launch {
            newsRepository.favoriteLatestNews
                // Intermediate catch operator. If an exception is thrown,
                // catch and update the UI
                .catch { exception -> notifyError(exception) }
                .collect { favoriteNews ->
                    // Update View with the latest favorite news
                }
        }
    }
}

Im vorherigen Beispiel wird bei einer Ausnahme das Lambda collect nicht aufgerufen, da kein neues Element empfangen wurde.

catch kann auch Elemente in den Ablauf emit aufnehmen. Die Beispiel-Repository-Ebene könnte stattdessen die im Cache gespeicherten Werte mit emit versehen:

class NewsRepository(...) {
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            .map { news -> news.filter { userData.isFavoriteTopic(it) } }
            .onEach { news -> saveInCache(news) }
            // If an error happens, emit the last cached values
            .catch { exception -> emit(lastCachedNews()) }
}

Wenn in diesem Beispiel eine Ausnahme auftritt, wird das Lambda collect aufgerufen, da aufgrund der Ausnahme ein neues Element in den Stream ausgegeben wurde.

Wird in einem anderen CoroutineContext ausgeführt

Der Ersteller eines flow-Builders wird standardmäßig im CoroutineContext der Koroutine ausgeführt, die aus diesem Builder erfasst wird. Wie bereits erwähnt, kann er keine emit-Werte aus einem anderen CoroutineContext-Wert emit. In einigen Fällen kann dieses Verhalten unerwünscht sein. In den Beispielen in diesem Thema sollte die Repository-Ebene beispielsweise keine Vorgänge für Dispatchers.Main ausführen, die von viewModelScope verwendet werden.

Mit dem Zwischenoperator flowOn können Sie den CoroutineContext eines Ablaufs ändern. flowOn ändert den CoroutineContext des vorgelagerten Ablaufs, d. h. der Ersteller und alle Zwischenoperatoren, die vor (oder höher) flowOn angewendet wurden. Der nachgelagerte Ablauf (die Zwischenoperatoren nach flowOn und dem Nutzer) ist nicht betroffen und wird auf der CoroutineContext ausgeführt, die für collect aus dem Ablauf verwendet wurde. Wenn es mehrere flowOn-Operatoren gibt, ändert jeder die vorgelagerte Position von seiner aktuellen Position.

class NewsRepository(
    private val newsRemoteDataSource: NewsRemoteDataSource,
    private val userData: UserData,
    private val defaultDispatcher: CoroutineDispatcher
) {
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            .map { news -> // Executes on the default dispatcher
                news.filter { userData.isFavoriteTopic(it) }
            }
            .onEach { news -> // Executes on the default dispatcher
                saveInCache(news)
            }
            // flowOn affects the upstream flow ↑
            .flowOn(defaultDispatcher)
            // the downstream flow ↓ is not affected
            .catch { exception -> // Executes in the consumer's context
                emit(lastCachedNews())
            }
}

Bei diesem Code verwenden die Operatoren onEach und map den Operator defaultDispatcher, während der Operator catch und der Nutzer auf dem von viewModelScope verwendeten Dispatchers.Main ausgeführt werden.

Da auf der Datenquellenebene E/A-Vorgänge ausgeführt werden, sollten Sie einen Disponenten verwenden, der für E/A-Vorgänge optimiert ist:

class NewsRemoteDataSource(
    ...,
    private val ioDispatcher: CoroutineDispatcher
) {
    val latestNews: Flow<List<ArticleHeadline>> = flow {
        // Executes on the IO dispatcher
        ...
    }
        .flowOn(ioDispatcher)
}

Abläufe in Jetpack-Bibliotheken

Flow ist in viele Jetpack-Bibliotheken integriert und unter Android-Drittanbieterbibliotheken beliebt. Der Fluss eignet sich hervorragend für Live-Datenaktualisierungen und endlose Datenströme.

Sie können Ablauf mit Raum verwenden, um über Änderungen in einer Datenbank informiert zu werden. Wenn Sie Datenzugriffsobjekte (Data Access Objects, DAO) verwenden, geben Sie einen Flow-Typ zurück, um Liveaktualisierungen zu erhalten.

@Dao
abstract class ExampleDao {
    @Query("SELECT * FROM Example")
    abstract fun getExamples(): Flow<List<Example>>
}

Bei jeder Änderung in der Tabelle Example wird eine neue Liste mit den neuen Elementen in der Datenbank ausgegeben.

Callback-basierte APIs in Abläufe konvertieren

callbackFlow ist ein Ablauf-Builder, mit dem Sie Callback-basierte APIs in Abläufe konvertieren können. Die Android APIs von Firebase Firestore verwenden beispielsweise Callbacks.

Mit dem folgenden Code können Sie diese APIs in Abläufe konvertieren und auf Firestore-Datenbankaktualisierungen warten:

class FirestoreUserEventsDataSource(
    private val firestore: FirebaseFirestore
) {
    // Method to get user events from the Firestore database
    fun getUserEvents(): Flow<UserEvents> = callbackFlow {

        // Reference to use in Firestore
        var eventsCollection: CollectionReference? = null
        try {
            eventsCollection = FirebaseFirestore.getInstance()
                .collection("collection")
                .document("app")
        } catch (e: Throwable) {
            // If Firebase cannot be initialized, close the stream of data
            // flow consumers will stop collecting and the coroutine will resume
            close(e)
        }

        // Registers callback to firestore, which will be called on new events
        val subscription = eventsCollection?.addSnapshotListener { snapshot, _ ->
            if (snapshot == null) { return@addSnapshotListener }
            // Sends events to the flow! Consumers will get the new events
            try {
                trySend(snapshot.getEvents())
            } catch (e: Throwable) {
                // Event couldn't be sent to the flow
            }
        }

        // The callback inside awaitClose will be executed when the flow is
        // either closed or cancelled.
        // In this case, remove the callback from Firestore
        awaitClose { subscription?.remove() }
    }
}

Im Gegensatz zum flow-Builder lässt callbackFlow zu, dass Werte aus einer anderen CoroutineContext mit der Funktion send oder außerhalb einer Koroutine mit der Funktion trySend ausgegeben werden.

callbackFlow verwendet intern einen Kanal, der konzeptionell einer blockierenden Warteschlange sehr ähnlich ist. Ein Kanal wird mit einer Kapazität konfiguriert, der maximalen Anzahl von Elementen, die zwischengespeichert werden können. Der in callbackFlow erstellte Kanal hat eine Standardkapazität von 64 Elementen. Wenn Sie versuchen, einem vollständigen Kanal ein neues Element hinzuzufügen, sperrt send den Produzenten so lange, bis Platz für das neue Element vorhanden ist. trySend fügt das Element hingegen nicht dem Kanal hinzu und gibt sofort false zurück.

trySend fügt das angegebene Element sofort dem Kanal hinzu, sofern dies nicht gegen die Kapazitätsbeschränkungen verstößt, und gibt dann das erfolgreiche Ergebnis zurück.

Zusätzliche Ablaufressourcen