W współrzędnych przepływ to typ, który może wysyłać wiele wartości sekwencyjnie, w przeciwieństwie do zawieszania funkcji, które zwracają tylko pojedynczą wartość. Możesz na przykład użyć procesu, aby odbierać na żywo z bazy danych.
Przepływy są oparte na współrzędnych i mogą przyjmować wiele wartości.
Przepływ to strumień danych, który można obliczyć.
asynchronicznie. Wyemitowane wartości muszą być tego samego typu. Dla:
Na przykład Flow<Int>
to przepływ, który emituje wartości całkowite.
Przepływ jest bardzo podobny do Iterator
, który tworzy sekwencję
ale wykorzystuje funkcje zawieszania do generowania i konsumpcji wartości
asynchronicznie. Oznacza to na przykład, że proces może bezpiecznie utworzyć
żądania sieciowego do wygenerowania następnej wartości bez blokowania głównego
w wątku.
W strumieniach danych są powiązane 3 encje:
- Producent tworzy dane, które są dodawane do strumienia. Dzięki w przepływach, przepływy mogą również generować dane asynchronicznie.
- (Opcjonalnie) Pośrednicy mogą modyfikować każdą wartość przesyłaną do lub sam strumień.
- Konsument pobiera wartości ze strumienia.
Na Androidzie repozytorium to jest zwykle producentem danych interfejsu, przy czym interfejs użytkownika który ostatecznie pokazuje dane. Innym razem warstwa interfejsu użytkownika zdarzenia wejściowe użytkownika oraz inne warstwy hierarchii są wykorzystywane. Warstwy w między producentem a konsumentem zwykle działają jako pośrednicy, którzy modyfikują strumienia danych, aby dostosować go do wymagań następnej warstwy.
Tworzenie przepływu
Aby utworzyć przepływy, użyj funkcji
kreator przepływu
API. Funkcja kreatora flow
tworzy nowy proces, w którym można ręcznie
emitują nowe wartości do strumienia danych za pomocą funkcji
emit
.
.
W poniższym przykładzie źródło danych pobiera najnowsze wiadomości automatycznie w ustalonych odstępach czasu. Ponieważ funkcja zawieszania nie może zwraca wiele kolejnych wartości, źródło danych tworzy i zwraca aby spełnić to wymaganie. W takim przypadku źródło danych działa jako producent.
class NewsRemoteDataSource(
private val newsApi: NewsApi,
private val refreshIntervalMs: Long = 5000
) {
val latestNews: Flow<List<ArticleHeadline>> = flow {
while(true) {
val latestNews = newsApi.fetchLatestNews()
emit(latestNews) // Emits the result of the request to the flow
delay(refreshIntervalMs) // Suspends the coroutine for some time
}
}
}
// Interface that provides a way to make network requests with suspend functions
interface NewsApi {
suspend fun fetchLatestNews(): List<ArticleHeadline>
}
Kreator flow
jest wykonywany we współpracy. Z tego powodu przynosi korzyści
z tych samych asynchronicznych interfejsów API, ale obowiązują pewne ograniczenia:
- Przepływy są sekwencyjne. Producent gra w współudziałie, dlatego podczas wywoływania
funkcji zawieszenia, producenta zawiesza się do czasu zakończenia funkcji
„powrót karetki”. W tym przykładzie producent zawiesza się do
fetchLatestNews
żądania sieciowe. Dopiero wtedy wynik jest wysyłany do strumienia. - Za pomocą konstruktora
flow
producent nie może określić wartościemit
z parametru innyCoroutineContext
. Dlatego nie wywołaj adresuemit
w innymCoroutineContext
, tworząc nowe współrzędne lub za pomocą funkcjiwithContext
bloki kodu. Możesz użyć innych kreatorów przepływu, takich jakcallbackFlow
w takich przypadkach.
Modyfikowanie strumienia
Pośrednicy mogą używać operatorów pośrednich do modyfikowania strumienia danych bez zużywania tych wartości. Operatory te to funkcje, które podczas stosowane do strumienia danych, utwórz łańcuch działań, które nie są jest wykonywana aż do momentu wykorzystania wartości w przyszłości. Więcej informacji o operatorów pośrednich w argumencie Dokumentacja procesu.
W poniższym przykładzie warstwa repozytorium używa operatora pośredniego
map
aby przekształcić dane wyświetlane w View
:
class NewsRepository(
private val newsRemoteDataSource: NewsRemoteDataSource,
private val userData: UserData
) {
/**
* Returns the favorite latest news applying transformations on the flow.
* These operations are lazy and don't trigger the flow. They just transform
* the current value emitted by the flow at that point in time.
*/
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
// Intermediate operation to filter the list of favorite topics
.map { news -> news.filter { userData.isFavoriteTopic(it) } }
// Intermediate operation to save the latest news in the cache
.onEach { news -> saveInCache(news) }
}
Operatory pośrednie można stosować jeden po drugim, tworząc łańcuch operacji, które są wykonywane leniwie, gdy element jest emitowany do przepływu danych. Pamiętaj, że samo zastosowanie operatora pośredniego do strumienia nie rozpoczynają zbierania danych.
Gromadzenie danych
Użyj operatora terminala, aby uruchomić proces nasłuchiwania
. Aby pobrać wszystkie wartości ze strumienia w miarę ich emitowania, użyj funkcji
collect
Więcej informacji o operatorach terminala znajdziesz w
oficjalną dokumentację procesu.
collect
jest funkcją zawieszania, więc musi zostać wykonana w ciągu
współpracą. Wywoływana jest funkcja lambda.
z każdą nową wartością. Ponieważ jest to funkcja zawieszenia, współrzędna,
połączenia z collect
mogą zostać zawieszone do czasu zakończenia procesu.
Kontynuując poprzedni przykład, oto prosta implementacja
obiekt ViewModel
pobiera dane z warstwy repozytorium:
class LatestNewsViewModel(
private val newsRepository: NewsRepository
) : ViewModel() {
init {
viewModelScope.launch {
// Trigger the flow and consume its elements using collect
newsRepository.favoriteLatestNews.collect { favoriteNews ->
// Update View with the latest favorite news
}
}
}
}
Zadbaj o dynamikę, która stymuluje producenta, który odświeża najświeższe wiadomości.
i generuje wynik żądania sieciowego w stałym przedziale czasu. Jako
Producer pozostaje zawsze aktywny z pętlą while(true)
, strumień
danych zostanie zamkniętych po wyczyszczeniu modelu widoku danych,
Wydarzenie viewModelScope
zostało anulowane.
Zbieranie przepływów może zostać zatrzymane z tych powodów:
- Współorganizacja, która zbiera dane, jest anulowana, jak pokazano w poprzednim przykładzie. Spowoduje to też zatrzymanie producenta.
- Producent kończy wydawanie produktów. W tym przypadku strumień danych
zostanie zamknięty, a współpracownik o nazwie
collect
wznowi wykonywanie zadania.
Przepływy są zimne i leniwe, chyba że określono inaczej w inny sposób pośredni
. Oznacza to, że kod producenta jest wykonywany za każdym razem,
operator terminala jest wywoływany w trakcie przepływu. W poprzednim przykładzie
gdy jest kilka kolektorów przepływu, źródło danych pobiera
najnowsze wiadomości wielokrotnie w różnych odstępach czasu. Aby zoptymalizować
korzystają z tej samej struktury, w której różni odbiorcy zbierają dane w tym samym czasie, używaj
shareIn
.
Wychwytywanie nieoczekiwanych wyjątków
Implementacja producenta może pochodzić z biblioteki zewnętrznej.
Oznacza to, że może on zgłaszać nieoczekiwane wyjątki. Aby rozwiązać ten problem:
wyjątków, użyj funkcji
catch
pośrednim.
class LatestNewsViewModel(
private val newsRepository: NewsRepository
) : ViewModel() {
init {
viewModelScope.launch {
newsRepository.favoriteLatestNews
// Intermediate catch operator. If an exception is thrown,
// catch and update the UI
.catch { exception -> notifyError(exception) }
.collect { favoriteNews ->
// Update View with the latest favorite news
}
}
}
}
W poprzednim przykładzie w wyjątku, collect
Metoda lambda nie jest wywoływana, ponieważ nie otrzymano nowego elementu.
catch
może również emit
elementu ścieżki. Przykładowe repozytorium
warstwa może zamiast tego emit
wartości z pamięci podręcznej:
class NewsRepository(...) {
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
.map { news -> news.filter { userData.isFavoriteTopic(it) } }
.onEach { news -> saveInCache(news) }
// If an error happens, emit the last cached values
.catch { exception -> emit(lastCachedNews()) }
}
W tym przykładzie, gdy wystąpi wyjątek, lambda collect
ma postać
, ponieważ do strumienia został wysłany nowy element z powodu
wyjątek.
Wykonywanie w innym kontekście CoroutineContext
Domyślnie producent konstruktora flow
wykonuje polecenie
CoroutineContext
współpracy, która z niej gromadzi, oraz
wcześniej nie można emit
wartości z innego
CoroutineContext
W niektórych przypadkach takie zachowanie może być niepożądane.
Na przykład w przykładach używanych w tym temacie repozytorium
warstwa nie powinna wykonywać operacji na elemencie Dispatchers.Main
, który
jest używany przez usługę viewModelScope
.
Aby zmienić CoroutineContext
przepływu, użyj operatora pośredniego
flowOn
flowOn
zmienia CoroutineContext
procesu nadrzędnego, co oznacza
producenta i wszelkich operatorów pośrednich stosowanych przed nim (lub powyżej);
flowOn
przebieg procesu uzupełniającego (operatory pośrednie po flowOn
,
wraz z konsumentem) nie wpływa na nią i jest wykonywana
CoroutineContext
przyzwyczaiło się do collect
z procesu. Jeśli
wielu operatorów flowOn
, każdy z nich zmienia ruch nadrzędny względem jego
bieżącej lokalizacji.
class NewsRepository(
private val newsRemoteDataSource: NewsRemoteDataSource,
private val userData: UserData,
private val defaultDispatcher: CoroutineDispatcher
) {
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
.map { news -> // Executes on the default dispatcher
news.filter { userData.isFavoriteTopic(it) }
}
.onEach { news -> // Executes on the default dispatcher
saveInCache(news)
}
// flowOn affects the upstream flow ↑
.flowOn(defaultDispatcher)
// the downstream flow ↓ is not affected
.catch { exception -> // Executes in the consumer's context
emit(lastCachedNews())
}
}
W przypadku tego kodu operatory onEach
i map
używają defaultDispatcher
,
natomiast operator catch
i konsument są wykonywane na
Dispatchers.Main
zużyte przez: viewModelScope
.
Ponieważ warstwa źródła danych wykonuje operacje wejścia-wyjścia, należy użyć dyspozytora, zoptymalizowaną pod kątem operacji wejścia-wyjścia:
class NewsRemoteDataSource(
...,
private val ioDispatcher: CoroutineDispatcher
) {
val latestNews: Flow<List<ArticleHeadline>> = flow {
// Executes on the IO dispatcher
...
}
.flowOn(ioDispatcher)
}
Przepływy w bibliotekach Jetpack
Flow jest zintegrowany z wieloma bibliotekami Jetpack i cieszy się popularnością Biblioteki zewnętrzne na Androida. Flow idealnie nadaje się do aktualizacji danych na żywo i niekończących się strumieni danych.
Za pomocą
Fryzjer z pokojem
otrzymywania powiadomień o zmianach w bazie danych. W przypadku użycia funkcji
obiekty dostępu do danych (DAO),
zwraca typ Flow
, aby uzyskać aktualizacje na żywo.
@Dao
abstract class ExampleDao {
@Query("SELECT * FROM Example")
abstract fun getExamples(): Flow<List<Example>>
}
Za każdym razem, gdy nastąpi zmiana w tabeli Example
, wysyłana jest nowa lista
z nowymi elementami w bazie danych.
Przekształcanie interfejsów API opartych na wywołaniach zwrotnych na przepływy
callbackFlow
to kreator przepływu, który umożliwia przekształcanie interfejsów API opartych na wywołaniach zwrotnych na przepływy.
Na przykład Firebase Firestore
Interfejsy API Androida używają wywołań zwrotnych.
Aby przekonwertować te interfejsy API na przepływy i nasłuchiwać aktualizacji bazy danych Firestore, może użyć tego kodu:
class FirestoreUserEventsDataSource(
private val firestore: FirebaseFirestore
) {
// Method to get user events from the Firestore database
fun getUserEvents(): Flow<UserEvents> = callbackFlow {
// Reference to use in Firestore
var eventsCollection: CollectionReference? = null
try {
eventsCollection = FirebaseFirestore.getInstance()
.collection("collection")
.document("app")
} catch (e: Throwable) {
// If Firebase cannot be initialized, close the stream of data
// flow consumers will stop collecting and the coroutine will resume
close(e)
}
// Registers callback to firestore, which will be called on new events
val subscription = eventsCollection?.addSnapshotListener { snapshot, _ ->
if (snapshot == null) { return@addSnapshotListener }
// Sends events to the flow! Consumers will get the new events
try {
trySend(snapshot.getEvents())
} catch (e: Throwable) {
// Event couldn't be sent to the flow
}
}
// The callback inside awaitClose will be executed when the flow is
// either closed or cancelled.
// In this case, remove the callback from Firestore
awaitClose { subscription?.remove() }
}
}
W przeciwieństwie do kreatora flow
, callbackFlow
pozwala na emitowanie wartości z innego parametru CoroutineContext
z wartością
send
lub poza współrzędną z parametrem
trySend
.
Wewnętrznie usługa callbackFlow
używa
channel,
które koncepcyjnie jest bardzo podobne do blokowania,
queue.
Kanał jest skonfigurowany z pojemnością, czyli maksymalną liczbą elementów.
które można buforować. Kanał utworzony w callbackFlow
ma domyślny
z 64 elementów. Kiedy próbujesz dodać nowy element do pełnego obrazu,
kanał send
zawiesza producenta do czasu, aż pojawi się miejsce
, natomiast trySend
nie dodaje go do kanału i zwraca
natychmiast false
.
trySend
natychmiast dodaje określony element do kanału,
tylko wtedy, gdy nie zostanie naruszone jego ograniczenia pojemności, a następnie zwraca
.
Dodatkowe materiały dotyczące przepływu
- Testowanie procesów Kotlin na Androidzie
StateFlow
iSharedFlow
- Dodatkowe materiały na temat współrzędnych i przepływu Kotlin