Kotlin na Androidzie

W współrzędnych przepływ to typ, który może wysyłać wiele wartości sekwencyjnie, w przeciwieństwie do zawieszania funkcji, które zwracają tylko pojedynczą wartość. Możesz na przykład użyć procesu, aby odbierać na żywo z bazy danych.

Przepływy są oparte na współrzędnych i mogą przyjmować wiele wartości. Przepływ to strumień danych, który można obliczyć. asynchronicznie. Wyemitowane wartości muszą być tego samego typu. Dla: Na przykład Flow<Int> to przepływ, który emituje wartości całkowite.

Przepływ jest bardzo podobny do Iterator, który tworzy sekwencję ale wykorzystuje funkcje zawieszania do generowania i konsumpcji wartości asynchronicznie. Oznacza to na przykład, że proces może bezpiecznie utworzyć żądania sieciowego do wygenerowania następnej wartości bez blokowania głównego w wątku.

W strumieniach danych są powiązane 3 encje:

  • Producent tworzy dane, które są dodawane do strumienia. Dzięki w przepływach, przepływy mogą również generować dane asynchronicznie.
  • (Opcjonalnie) Pośrednicy mogą modyfikować każdą wartość przesyłaną do lub sam strumień.
  • Konsument pobiera wartości ze strumienia.

podmioty zaangażowane w strumienia danych; konsument, opcjonalny
              pośrednicy oraz producent
Rysunek 1. Podmioty związane ze strumieniami danych: konsumenta, opcjonalnie pośredników i producenta.

Na Androidzie repozytorium to jest zwykle producentem danych interfejsu, przy czym interfejs użytkownika który ostatecznie pokazuje dane. Innym razem warstwa interfejsu użytkownika zdarzenia wejściowe użytkownika oraz inne warstwy hierarchii są wykorzystywane. Warstwy w między producentem a konsumentem zwykle działają jako pośrednicy, którzy modyfikują strumienia danych, aby dostosować go do wymagań następnej warstwy.

Tworzenie przepływu

Aby utworzyć przepływy, użyj funkcji kreator przepływu API. Funkcja kreatora flow tworzy nowy proces, w którym można ręcznie emitują nowe wartości do strumienia danych za pomocą funkcji emit. .

W poniższym przykładzie źródło danych pobiera najnowsze wiadomości automatycznie w ustalonych odstępach czasu. Ponieważ funkcja zawieszania nie może zwraca wiele kolejnych wartości, źródło danych tworzy i zwraca aby spełnić to wymaganie. W takim przypadku źródło danych działa jako producent.

class NewsRemoteDataSource(
    private val newsApi: NewsApi,
    private val refreshIntervalMs: Long = 5000
) {
    val latestNews: Flow<List<ArticleHeadline>> = flow {
        while(true) {
            val latestNews = newsApi.fetchLatestNews()
            emit(latestNews) // Emits the result of the request to the flow
            delay(refreshIntervalMs) // Suspends the coroutine for some time
        }
    }
}

// Interface that provides a way to make network requests with suspend functions
interface NewsApi {
    suspend fun fetchLatestNews(): List<ArticleHeadline>
}

Kreator flow jest wykonywany we współpracy. Z tego powodu przynosi korzyści z tych samych asynchronicznych interfejsów API, ale obowiązują pewne ograniczenia:

  • Przepływy są sekwencyjne. Producent gra w współudziałie, dlatego podczas wywoływania funkcji zawieszenia, producenta zawiesza się do czasu zakończenia funkcji „powrót karetki”. W tym przykładzie producent zawiesza się do fetchLatestNews żądania sieciowe. Dopiero wtedy wynik jest wysyłany do strumienia.
  • Za pomocą konstruktora flow producent nie może określić wartości emit z parametru inny CoroutineContext. Dlatego nie wywołaj adresu emit w innym CoroutineContext, tworząc nowe współrzędne lub za pomocą funkcji withContext bloki kodu. Możesz użyć innych kreatorów przepływu, takich jak callbackFlow w takich przypadkach.

Modyfikowanie strumienia

Pośrednicy mogą używać operatorów pośrednich do modyfikowania strumienia danych bez zużywania tych wartości. Operatory te to funkcje, które podczas stosowane do strumienia danych, utwórz łańcuch działań, które nie są jest wykonywana aż do momentu wykorzystania wartości w przyszłości. Więcej informacji o operatorów pośrednich w argumencie Dokumentacja procesu.

W poniższym przykładzie warstwa repozytorium używa operatora pośredniego map aby przekształcić dane wyświetlane w View:

class NewsRepository(
    private val newsRemoteDataSource: NewsRemoteDataSource,
    private val userData: UserData
) {
    /**
     * Returns the favorite latest news applying transformations on the flow.
     * These operations are lazy and don't trigger the flow. They just transform
     * the current value emitted by the flow at that point in time.
     */
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            // Intermediate operation to filter the list of favorite topics
            .map { news -> news.filter { userData.isFavoriteTopic(it) } }
            // Intermediate operation to save the latest news in the cache
            .onEach { news -> saveInCache(news) }
}

Operatory pośrednie można stosować jeden po drugim, tworząc łańcuch operacji, które są wykonywane leniwie, gdy element jest emitowany do przepływu danych. Pamiętaj, że samo zastosowanie operatora pośredniego do strumienia nie rozpoczynają zbierania danych.

Gromadzenie danych

Użyj operatora terminala, aby uruchomić proces nasłuchiwania . Aby pobrać wszystkie wartości ze strumienia w miarę ich emitowania, użyj funkcji collect Więcej informacji o operatorach terminala znajdziesz w oficjalną dokumentację procesu.

collect jest funkcją zawieszania, więc musi zostać wykonana w ciągu współpracą. Wywoływana jest funkcja lambda. z każdą nową wartością. Ponieważ jest to funkcja zawieszenia, współrzędna, połączenia z collect mogą zostać zawieszone do czasu zakończenia procesu.

Kontynuując poprzedni przykład, oto prosta implementacja obiekt ViewModel pobiera dane z warstwy repozytorium:

class LatestNewsViewModel(
    private val newsRepository: NewsRepository
) : ViewModel() {

    init {
        viewModelScope.launch {
            // Trigger the flow and consume its elements using collect
            newsRepository.favoriteLatestNews.collect { favoriteNews ->
                // Update View with the latest favorite news
            }
        }
    }
}

Zadbaj o dynamikę, która stymuluje producenta, który odświeża najświeższe wiadomości. i generuje wynik żądania sieciowego w stałym przedziale czasu. Jako Producer pozostaje zawsze aktywny z pętlą while(true), strumień danych zostanie zamkniętych po wyczyszczeniu modelu widoku danych, Wydarzenie viewModelScope zostało anulowane.

Zbieranie przepływów może zostać zatrzymane z tych powodów:

  • Współorganizacja, która zbiera dane, jest anulowana, jak pokazano w poprzednim przykładzie. Spowoduje to też zatrzymanie producenta.
  • Producent kończy wydawanie produktów. W tym przypadku strumień danych zostanie zamknięty, a współpracownik o nazwie collect wznowi wykonywanie zadania.

Przepływy są zimne i leniwe, chyba że określono inaczej w inny sposób pośredni . Oznacza to, że kod producenta jest wykonywany za każdym razem, operator terminala jest wywoływany w trakcie przepływu. W poprzednim przykładzie gdy jest kilka kolektorów przepływu, źródło danych pobiera najnowsze wiadomości wielokrotnie w różnych odstępach czasu. Aby zoptymalizować korzystają z tej samej struktury, w której różni odbiorcy zbierają dane w tym samym czasie, używaj shareIn.

Wychwytywanie nieoczekiwanych wyjątków

Implementacja producenta może pochodzić z biblioteki zewnętrznej. Oznacza to, że może on zgłaszać nieoczekiwane wyjątki. Aby rozwiązać ten problem: wyjątków, użyj funkcji catch pośrednim.

class LatestNewsViewModel(
    private val newsRepository: NewsRepository
) : ViewModel() {

    init {
        viewModelScope.launch {
            newsRepository.favoriteLatestNews
                // Intermediate catch operator. If an exception is thrown,
                // catch and update the UI
                .catch { exception -> notifyError(exception) }
                .collect { favoriteNews ->
                    // Update View with the latest favorite news
                }
        }
    }
}

W poprzednim przykładzie w wyjątku, collect Metoda lambda nie jest wywoływana, ponieważ nie otrzymano nowego elementu.

catch może również emit elementu ścieżki. Przykładowe repozytorium warstwa może zamiast tego emit wartości z pamięci podręcznej:

class NewsRepository(...) {
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            .map { news -> news.filter { userData.isFavoriteTopic(it) } }
            .onEach { news -> saveInCache(news) }
            // If an error happens, emit the last cached values
            .catch { exception -> emit(lastCachedNews()) }
}

W tym przykładzie, gdy wystąpi wyjątek, lambda collect ma postać , ponieważ do strumienia został wysłany nowy element z powodu wyjątek.

Wykonywanie w innym kontekście CoroutineContext

Domyślnie producent konstruktora flow wykonuje polecenie CoroutineContext współpracy, która z niej gromadzi, oraz wcześniej nie można emit wartości z innego CoroutineContext W niektórych przypadkach takie zachowanie może być niepożądane. Na przykład w przykładach używanych w tym temacie repozytorium warstwa nie powinna wykonywać operacji na elemencie Dispatchers.Main, który jest używany przez usługę viewModelScope.

Aby zmienić CoroutineContext przepływu, użyj operatora pośredniego flowOn flowOn zmienia CoroutineContext procesu nadrzędnego, co oznacza producenta i wszelkich operatorów pośrednich stosowanych przed nim (lub powyżej); flowOn przebieg procesu uzupełniającego (operatory pośrednie po flowOn, wraz z konsumentem) nie wpływa na nią i jest wykonywana CoroutineContext przyzwyczaiło się do collect z procesu. Jeśli wielu operatorów flowOn, każdy z nich zmienia ruch nadrzędny względem jego bieżącej lokalizacji.

class NewsRepository(
    private val newsRemoteDataSource: NewsRemoteDataSource,
    private val userData: UserData,
    private val defaultDispatcher: CoroutineDispatcher
) {
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            .map { news -> // Executes on the default dispatcher
                news.filter { userData.isFavoriteTopic(it) }
            }
            .onEach { news -> // Executes on the default dispatcher
                saveInCache(news)
            }
            // flowOn affects the upstream flow ↑
            .flowOn(defaultDispatcher)
            // the downstream flow ↓ is not affected
            .catch { exception -> // Executes in the consumer's context
                emit(lastCachedNews())
            }
}

W przypadku tego kodu operatory onEach i map używają defaultDispatcher, natomiast operator catch i konsument są wykonywane na Dispatchers.Main zużyte przez: viewModelScope.

Ponieważ warstwa źródła danych wykonuje operacje wejścia-wyjścia, należy użyć dyspozytora, zoptymalizowaną pod kątem operacji wejścia-wyjścia:

class NewsRemoteDataSource(
    ...,
    private val ioDispatcher: CoroutineDispatcher
) {
    val latestNews: Flow<List<ArticleHeadline>> = flow {
        // Executes on the IO dispatcher
        ...
    }
        .flowOn(ioDispatcher)
}

Przepływy w bibliotekach Jetpack

Flow jest zintegrowany z wieloma bibliotekami Jetpack i cieszy się popularnością Biblioteki zewnętrzne na Androida. Flow idealnie nadaje się do aktualizacji danych na żywo i niekończących się strumieni danych.

Za pomocą Fryzjer z pokojem otrzymywania powiadomień o zmianach w bazie danych. W przypadku użycia funkcji obiekty dostępu do danych (DAO), zwraca typ Flow, aby uzyskać aktualizacje na żywo.

@Dao
abstract class ExampleDao {
    @Query("SELECT * FROM Example")
    abstract fun getExamples(): Flow<List<Example>>
}

Za każdym razem, gdy nastąpi zmiana w tabeli Example, wysyłana jest nowa lista z nowymi elementami w bazie danych.

Przekształcanie interfejsów API opartych na wywołaniach zwrotnych na przepływy

callbackFlow to kreator przepływu, który umożliwia przekształcanie interfejsów API opartych na wywołaniach zwrotnych na przepływy. Na przykład Firebase Firestore Interfejsy API Androida używają wywołań zwrotnych.

Aby przekonwertować te interfejsy API na przepływy i nasłuchiwać aktualizacji bazy danych Firestore, może użyć tego kodu:

class FirestoreUserEventsDataSource(
    private val firestore: FirebaseFirestore
) {
    // Method to get user events from the Firestore database
    fun getUserEvents(): Flow<UserEvents> = callbackFlow {

        // Reference to use in Firestore
        var eventsCollection: CollectionReference? = null
        try {
            eventsCollection = FirebaseFirestore.getInstance()
                .collection("collection")
                .document("app")
        } catch (e: Throwable) {
            // If Firebase cannot be initialized, close the stream of data
            // flow consumers will stop collecting and the coroutine will resume
            close(e)
        }

        // Registers callback to firestore, which will be called on new events
        val subscription = eventsCollection?.addSnapshotListener { snapshot, _ ->
            if (snapshot == null) { return@addSnapshotListener }
            // Sends events to the flow! Consumers will get the new events
            try {
                trySend(snapshot.getEvents())
            } catch (e: Throwable) {
                // Event couldn't be sent to the flow
            }
        }

        // The callback inside awaitClose will be executed when the flow is
        // either closed or cancelled.
        // In this case, remove the callback from Firestore
        awaitClose { subscription?.remove() }
    }
}

W przeciwieństwie do kreatora flow, callbackFlow pozwala na emitowanie wartości z innego parametru CoroutineContext z wartością send lub poza współrzędną z parametrem trySend .

Wewnętrznie usługa callbackFlow używa channel, które koncepcyjnie jest bardzo podobne do blokowania, queue. Kanał jest skonfigurowany z pojemnością, czyli maksymalną liczbą elementów. które można buforować. Kanał utworzony w callbackFlow ma domyślny z 64 elementów. Kiedy próbujesz dodać nowy element do pełnego obrazu, kanał send zawiesza producenta do czasu, aż pojawi się miejsce , natomiast trySend nie dodaje go do kanału i zwraca natychmiast false.

trySend natychmiast dodaje określony element do kanału, tylko wtedy, gdy nie zostanie naruszone jego ograniczenia pojemności, a następnie zwraca .

Dodatkowe materiały dotyczące przepływu