Kotlin-Abläufe unter Android

In Koroutinen ist ein Fluss ein Typ, der mehrere Werte nacheinander ausgeben kann, im Gegensatz zu Sperrfunktionen, die nur einen einzelnen Wert zurückgeben. Mit einem Ablauf können Sie beispielsweise Live-Updates von einer Datenbank empfangen.

Abläufe basieren auf Koroutinen und können mehrere Werte bereitstellen. Ein Fluss ist konzeptionell ein Datenstrom, der asynchron berechnet werden kann. Die ausgegebenen Werte müssen vom gleichen Typ sein. Ein Flow<Int> ist beispielsweise ein Ablauf, der Ganzzahlwerte ausgibt.

Ein Ablauf ist einem Iterator sehr ähnlich, der eine Folge von Werten erzeugt, aber Sperrfunktionen verwenden, um Werte asynchron zu erzeugen und zu nutzen. Dies bedeutet beispielsweise, dass der Ablauf sicher eine Netzwerkanfrage senden kann, um den nächsten Wert zu erzeugen, ohne den Hauptthread zu blockieren.

An Datenstreams sind drei Entitäten beteiligt:

  • Ein Producer erzeugt Daten, die dem Stream hinzugefügt werden. Dank Koroutinen können Datenflüsse auch asynchron erzeugt werden.
  • Vermittler(optional) können jeden Wert ändern, der in den Stream oder den Stream selbst ausgegeben wird.
  • Ein Nutzer nimmt die Werte aus dem Stream auf.

an Datenstreams beteiligte Rechtssubjekte; Verbraucher, optionale Vermittler und Ersteller
Abbildung 1. Entitäten, die an Datenstreams beteiligt sind: Nutzer, optionale Mittler und Producer.

In Android ist ein Repository in der Regel ein Ersteller von UI-Daten mit der Benutzeroberfläche als Nutzer, der die Daten letztendlich anzeigt. In anderen Fällen erzeugt die UI-Ebene Nutzereingabeereignisse, die von anderen Hierarchieebenen verarbeitet werden. Ebenen zwischen dem Ersteller und dem Nutzer fungieren in der Regel als Vermittler, die den Datenstrom an die Anforderungen der folgenden Ebene anpassen.

Ablauf erstellen

Verwenden Sie zum Erstellen von Abläufen die Flow Builder-APIs. Die Builder-Funktion flow erstellt einen neuen Ablauf, in dem Sie mithilfe der Funktion emit manuell neue Werte in den Datenstream ausgeben können.

Im folgenden Beispiel ruft eine Datenquelle die neuesten Nachrichten automatisch in einem festen Intervall ab. Da eine Anhalten-Funktion nicht mehrere aufeinanderfolgende Werte zurückgeben kann, erstellt und gibt die Datenquelle einen Ablauf zurück, um diese Anforderung zu erfüllen. In diesem Fall fungiert die Datenquelle als Ersteller.

class NewsRemoteDataSource(
    private val newsApi: NewsApi,
    private val refreshIntervalMs: Long = 5000
) {
    val latestNews: Flow<List<ArticleHeadline>> = flow {
        while(true) {
            val latestNews = newsApi.fetchLatestNews()
            emit(latestNews) // Emits the result of the request to the flow
            delay(refreshIntervalMs) // Suspends the coroutine for some time
        }
    }
}

// Interface that provides a way to make network requests with suspend functions
interface NewsApi {
    suspend fun fetchLatestNews(): List<ArticleHeadline>
}

Der flow-Builder wird innerhalb einer Koroutine ausgeführt. Daher werden dieselben asynchronen APIs verwendet, es gelten jedoch einige Einschränkungen:

  • Abläufe sind sequentiell. Da sich der Producer in einer Koroutine befindet, hält er beim Aufrufen einer Anhaltenfunktion den Vorgang an, bis die Funktion wieder aktiviert wird. Im Beispiel hält der Producer an, bis die fetchLatestNews-Netzwerkanfrage abgeschlossen ist. Nur dann wird das Ergebnis an den Stream gesendet.
  • Mit dem flow-Builder kann der Ersteller keine Werte aus einem anderen CoroutineContext emit hinzufügen. Rufen Sie emit daher nicht in einem anderen CoroutineContext auf. Erstellen Sie dazu neue Koroutinen oder verwenden Sie withContext-Codeblöcke. In diesen Fällen können Sie andere Ablauferstellungstools wie callbackFlow verwenden.

Stream ändern

Mittler können Zwischenoperatoren verwenden, um den Datenstrom zu ändern, ohne die Werte zu verbrauchen. Diese Operatoren sind Funktionen, die bei Anwendung auf einen Datenstrom eine Kette von Vorgängen bilden, die erst dann ausgeführt werden, wenn die Werte in Zukunft verarbeitet werden. Weitere Informationen zu Operatoren mittlerer Stufe finden Sie in der Referenzdokumentation zum Ablauf.

Im folgenden Beispiel verwendet die Repository-Ebene den Zwischenoperator map, um die Daten zu transformieren, die auf View angezeigt werden sollen:

class NewsRepository(
    private val newsRemoteDataSource: NewsRemoteDataSource,
    private val userData: UserData
) {
    /**
     * Returns the favorite latest news applying transformations on the flow.
     * These operations are lazy and don't trigger the flow. They just transform
     * the current value emitted by the flow at that point in time.
     */
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            // Intermediate operation to filter the list of favorite topics
            .map { news -> news.filter { userData.isFavoriteTopic(it) } }
            // Intermediate operation to save the latest news in the cache
            .onEach { news -> saveInCache(news) }
}

Zwischenoperatoren können nacheinander angewendet werden und bilden eine Kette von Vorgängen, die verzögert ausgeführt werden, wenn ein Element in den Ablauf übergeben wird. Beachten Sie, dass durch das Anwenden eines Zwischenoperators auf einen Stream nicht die Flusserfassung gestartet wird.

Daten aus einem Ablauf erfassen

Verwenden Sie einen Terminal-Operator, um den Ablauf auszulösen und mit dem Monitoring von Werten zu beginnen. Mit collect können Sie alle Werte im Stream abrufen, sobald sie ausgegeben werden. Weitere Informationen zu Terminaloperatoren finden Sie in der offiziellen Ablaufdokumentation.

Da collect eine Anhaltenfunktion ist, muss sie innerhalb einer Koroutine ausgeführt werden. Sie verwendet eine Lambda-Funktion als Parameter, der bei jedem neuen Wert aufgerufen wird. Da es sich um eine Anhaltefunktion handelt, kann die Koroutine, die collect aufruft, angehalten, bis der Ablauf geschlossen wird.

Ausgehend vom vorherigen Beispiel hier eine einfache Implementierung eines ViewModel, der die Daten aus der Repository-Ebene verwendet:

class LatestNewsViewModel(
    private val newsRepository: NewsRepository
) : ViewModel() {

    init {
        viewModelScope.launch {
            // Trigger the flow and consume its elements using collect
            newsRepository.favoriteLatestNews.collect { favoriteNews ->
                // Update View with the latest favorite news
            }
        }
    }
}

Das Erfassen des Datenflusses löst den Producer aus, der die neuesten Nachrichten aktualisiert und das Ergebnis der Netzwerkanfrage in einem festen Intervall ausgibt. Da der Producer immer mit der while(true)-Schleife aktiv bleibt, wird der Datenstrom geschlossen, wenn ViewModel gelöscht und viewModelScope abgebrochen wird.

Die Ablauferfassung kann aus folgenden Gründen beendet werden:

  • Die gesammelte Koroutine wird abgebrochen, wie im vorherigen Beispiel gezeigt. Dadurch wird auch der zugrunde liegende Producer gestoppt.
  • Der Produzent schließt die Ausgabe der Elemente ab. In diesem Fall wird der Datenstrom geschlossen und die Koroutine, die collect aufgerufen hat, setzt die Ausführung fort.

Abläufe sind kalt und lazy, sofern nicht mit anderen Zwischenoperatoren angegeben. Das bedeutet, dass der Producer-Code jedes Mal ausgeführt wird, wenn ein Terminaloperator im Ablauf aufgerufen wird. Im vorherigen Beispiel führt die Datenquelle mit mehreren Fluss-Collectors dazu, dass die Datenquelle die neuesten Nachrichten mehrmals in verschiedenen festen Intervallen abruft. Verwenden Sie den Operator shareIn, um einen Ablauf zu optimieren und freizugeben, wenn mehrere Nutzer gleichzeitig Daten erheben.

Unerwartete Ausnahmen abfangen

Die Implementierung des Producers kann aus der Bibliothek eines Drittanbieters stammen. Das bedeutet, dass unerwartete Ausnahmen ausgelöst werden können. Verwenden Sie den Zwischenoperator catch, um diese Ausnahmen zu verarbeiten.

class LatestNewsViewModel(
    private val newsRepository: NewsRepository
) : ViewModel() {

    init {
        viewModelScope.launch {
            newsRepository.favoriteLatestNews
                // Intermediate catch operator. If an exception is thrown,
                // catch and update the UI
                .catch { exception -> notifyError(exception) }
                .collect { favoriteNews ->
                    // Update View with the latest favorite news
                }
        }
    }
}

Wenn im vorherigen Beispiel eine Ausnahme auftritt, wird das Lambda collect nicht aufgerufen, da kein neues Element empfangen wurde.

catch kann auch Elemente im Ablauf emit. Die Beispiel-Repository-Ebene könnte stattdessen die im Cache gespeicherten Werte mit emit versehen:

class NewsRepository(...) {
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            .map { news -> news.filter { userData.isFavoriteTopic(it) } }
            .onEach { news -> saveInCache(news) }
            // If an error happens, emit the last cached values
            .catch { exception -> emit(lastCachedNews()) }
}

In diesem Beispiel wird beim Auftreten einer Ausnahme das Lambda collect aufgerufen, da aufgrund der Ausnahme ein neues Element an den Stream ausgegeben wurde.

In einem anderen CoroutineContext ausführen

Standardmäßig wird der Ersteller eines flow-Builders in der CoroutineContext der Koroutine ausgeführt, die daraus erfasst. Wie bereits erwähnt, kann er keine Werte aus einem anderen CoroutineContext-Wert emit. Dies ist möglicherweise nicht erwünscht. In den Beispielen, die in diesem Thema verwendet werden, sollte die Repository-Ebene beispielsweise keine Vorgänge für Dispatchers.Main ausführen, die von viewModelScope verwendet werden.

Verwenden Sie zum Ändern der CoroutineContext eines Ablaufs den Zwischenoperator flowOn. flowOn ändert den CoroutineContext des vorgelagerten Ablaufs, d. h. den Producer und alle Zwischenoperatoren, die vor (oder darüber) flowOn angewendet werden. Der nachgelagerte Ablauf (die Zwischenoperatoren nach flowOn und der Nutzer) ist nicht betroffen und wird vom CoroutineContext bis collect ausgeführt. Wenn es mehrere flowOn-Operatoren gibt, ändert jeder den Upstream gegenüber seinem aktuellen Standort.

class NewsRepository(
    private val newsRemoteDataSource: NewsRemoteDataSource,
    private val userData: UserData,
    private val defaultDispatcher: CoroutineDispatcher
) {
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            .map { news -> // Executes on the default dispatcher
                news.filter { userData.isFavoriteTopic(it) }
            }
            .onEach { news -> // Executes on the default dispatcher
                saveInCache(news)
            }
            // flowOn affects the upstream flow ↑
            .flowOn(defaultDispatcher)
            // the downstream flow ↓ is not affected
            .catch { exception -> // Executes in the consumer's context
                emit(lastCachedNews())
            }
}

Mit diesem Code verwenden die Operatoren onEach und map den defaultDispatcher. Der Operator catch und der Nutzer werden hingegen auf Dispatchers.Main ausgeführt, das von viewModelScope verwendet wird.

Da die Datenquellenebene für E/A-Vorgänge zuständig ist, sollten Sie einen Disponenten verwenden, der für E/A-Vorgänge optimiert ist:

class NewsRemoteDataSource(
    ...,
    private val ioDispatcher: CoroutineDispatcher
) {
    val latestNews: Flow<List<ArticleHeadline>> = flow {
        // Executes on the IO dispatcher
        ...
    }
        .flowOn(ioDispatcher)
}

Abläufe in Jetpack-Bibliotheken

Flow ist in viele Jetpack-Bibliotheken eingebunden und unter Android-Drittanbieterbibliotheken beliebt. Flow eignet sich hervorragend für Live-Datenaktualisierungen und endlose Datenstreams.

Sie können Flow with Room verwenden, um über Änderungen in einer Datenbank benachrichtigt zu werden. Wenn Sie Datenzugriffsobjekte (Data Access Objects, DAO) verwenden, geben Sie den Typ Flow zurück, um Liveaktualisierungen zu erhalten.

@Dao
abstract class ExampleDao {
    @Query("SELECT * FROM Example")
    abstract fun getExamples(): Flow<List<Example>>
}

Jedes Mal, wenn eine Änderung in der Tabelle Example erfolgt, wird eine neue Liste mit den neuen Elementen in der Datenbank ausgegeben.

Callback-basierte APIs in Abläufe umwandeln

callbackFlow ist ein Ablauf-Builder, mit dem Sie Callback-basierte APIs in Abläufe umwandeln können. Die Android APIs von Firebase Firestore verwenden beispielsweise Callbacks.

Mit dem folgenden Code können Sie diese APIs in Abläufe konvertieren und auf Firestore-Datenbankaktualisierungen warten:

class FirestoreUserEventsDataSource(
    private val firestore: FirebaseFirestore
) {
    // Method to get user events from the Firestore database
    fun getUserEvents(): Flow<UserEvents> = callbackFlow {

        // Reference to use in Firestore
        var eventsCollection: CollectionReference? = null
        try {
            eventsCollection = FirebaseFirestore.getInstance()
                .collection("collection")
                .document("app")
        } catch (e: Throwable) {
            // If Firebase cannot be initialized, close the stream of data
            // flow consumers will stop collecting and the coroutine will resume
            close(e)
        }

        // Registers callback to firestore, which will be called on new events
        val subscription = eventsCollection?.addSnapshotListener { snapshot, _ ->
            if (snapshot == null) { return@addSnapshotListener }
            // Sends events to the flow! Consumers will get the new events
            try {
                offer(snapshot.getEvents())
            } catch (e: Throwable) {
                // Event couldn't be sent to the flow
            }
        }

        // The callback inside awaitClose will be executed when the flow is
        // either closed or cancelled.
        // In this case, remove the callback from Firestore
        awaitClose { subscription?.remove() }
    }
}

Im Gegensatz zum flow-Builder erlaubt callbackFlow die Ausgabe von Werten von einer anderen CoroutineContext mit der send-Funktion oder außerhalb einer Koroutine mit der trySend-Funktion.

Intern verwendet callbackFlow einen Kanal, der einer blockierenden Warteschlange konzeptionell ähnlich ist. Ein Kanal wird mit einer capacity konfiguriert, also der maximalen Anzahl von Elementen, die zwischengespeichert werden können. Der in callbackFlow erstellte Kanal hat eine Standardkapazität von 64 Elementen. Wenn Sie versuchen, einem vollständigen Kanal ein neues Element hinzuzufügen, hält send den Producer an, bis ausreichend Platz für das neue Element vorhanden ist. offer fügt das Element hingegen nicht dem Kanal hinzu und gibt sofort false zurück.

Zusätzliche Ressourcen zum Ablauf