在協同程式中,「資料流」是一種可依序發出多個值的類型,而不是僅傳回單一值的「暫停函式」。例如,您可以使用資料流從資料庫接收即時更新。
資料流建構於協同程式之上,可提供多個值。從概念來看,資料流是可透過非同步方式計算的「資料串流」。發出的值必須屬於相同類型。例如,Flow<Int>
是發出整數值的資料流。
資料流與產生值序列的 Iterator
非常類似,但會使用暫停函式,以非同步方式產生和取用值。這意味著,資料流可以安全地發出網路要求,並在不封鎖主執行緒的情況下產生下一個值。
資料串流涉及三個實體:
- 生產端會產生加入串流的資料。藉助協同程式,資料流也可以非同步產生資料。
- (不一定存在) 中繼端可修改發至串流的值或串流本身。
- 取用端會從串流取用值。
在 Android 中,「存放區」通常是使用者介面資料的生產端,並將使用者介面 (UI) 做為最終顯示資料的取用端。在其他時候,使用者介面層是使用者輸入內容事件的生產端,而階層的其他層會取用這些事件。生產端和取用端之間的層,通常是中繼端。這些中繼端可以修改資料串流,使其符合下一層的要求。
建立資料流
如要建立資料流,請使用資料流建構工具 API。flow
建構工具函式會建立新的資料流。在該資料流中,您可以使用 emit
函式,手動將新值發至資料串流內。
在以下範例中,資料來源會以固定的時間間隔自動擷取最新消息。由於暫停函式無法傳回多個連續的值,所以資料來源會建立並傳回資料流,以滿足此要求。在此情況下,資料來源會做為生產端。
class NewsRemoteDataSource(
private val newsApi: NewsApi,
private val refreshIntervalMs: Long = 5000
) {
val latestNews: Flow<List<ArticleHeadline>> = flow {
while(true) {
val latestNews = newsApi.fetchLatestNews()
emit(latestNews) // Emits the result of the request to the flow
delay(refreshIntervalMs) // Suspends the coroutine for some time
}
}
}
// Interface that provides a way to make network requests with suspend functions
interface NewsApi {
suspend fun fetchLatestNews(): List<ArticleHeadline>
}
系統會在協同程式中執行 flow
建構工具。因此,它會受益於相同的非同步 API,但會受到一些限制:
- 資料流會「依序」。由於生產端位於協同程式內,所以呼叫暫停函式時,生產端會暫停,直到暫停函式傳回為止。在這個範例中,生產端會暫停,直到
fetchLatestNews
網路請求完成為止。也只有到那時,結果才會發至串流。 - 在使用
flow
建構工具時,生產端不能從其他CoroutineContext
發出emit
值。因此,請勿透過建立新的協同程式,或使用withContext
程式碼區塊,呼叫另一個CoroutineContext
中的emit
。在這些情況下,您可以使用其他資料流建構工具,例如callbackFlow
。
修改串流
中繼端可以使用「中繼運算子」修改資料串流,而不必取用值。這些運算子是函式,在套用至資料串流時,會設定一連串在未來取用值時才執行的運算子。如要進一步瞭解中繼運算子,請參閱資料流參考說明文件。
在以下範例中,存放區層使用中繼運算子 map
轉換將在 View
上顯示的資料:
class NewsRepository(
private val newsRemoteDataSource: NewsRemoteDataSource,
private val userData: UserData
) {
/**
* Returns the favorite latest news applying transformations on the flow.
* These operations are lazy and don't trigger the flow. They just transform
* the current value emitted by the flow at that point in time.
*/
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
// Intermediate operation to filter the list of favorite topics
.map { news -> news.filter { userData.isFavoriteTopic(it) } }
// Intermediate operation to save the latest news in the cache
.onEach { news -> saveInCache(news) }
}
項目發至資料流時,中繼運算子可以依序套用,串連成延遲執行的運算子。請注意,如果只將中繼運算子套用至串流,則不會啟動資料流收集作業。
從資料流收集
使用「終端運算子」觸發資料流,開始監聽值。如要在發出值時取得串流中的所有值,請使用 collect
。如要進一步瞭解終端運算子,請參閱官方資料流說明文件。
由於 collect
是暫停函式,所以需要在協同程式中執行。該函式將 lambda 視為在每個新值上呼叫的參數。由於它是暫停函式,呼叫 collect
的協同程式可能會暫停,直到資料流關閉為止。
延續上一個範例,以下說明如何簡單實作從存放區層取用資料的 ViewModel
:
class LatestNewsViewModel(
private val newsRepository: NewsRepository
) : ViewModel() {
init {
viewModelScope.launch {
// Trigger the flow and consume its elements using collect
newsRepository.favoriteLatestNews.collect { favoriteNews ->
// Update View with the latest favorite news
}
}
}
}
收集資料流,會觸發生產端重新整理最新消息,並於固定的時間間隔發出網路請求結果。由於生產端持續使用 while(true)
迴圈,當清除 ViewModel 和取消 viewModelScope
時,資料串流將會關閉。
由於以下原因,系統可能會停止收集資料流:
- 取消了收集資料流的協同程式,如上一個範例所示。這樣做也會停止基礎生產端。
- 生產端已發完所有項目。在此情況中,資料串流會關閉,而
collect
協同程式會繼續執行。
除非指定與其他中繼運算子一起使用,否則資料流會處於「冷」和「延遲」狀態。這意味著,每次在資料流中呼叫終端運算子時,都會執行生產端程式碼。在上一個範例中,由於設定了多個資料流收集器,所以資料來源多次以不同的固定時間間隔擷取最新消息。如果有多個取用端同時收集資料流,為對資料流進行最佳化調整,並共用資料流,請使用 shareIn
運算子。
擷取非預期的例外狀況
第三方程式庫可能會實作生產端。這意味著,此做法可能會擲回非預期的例外狀況。如要處理這些例外狀況,請使用 catch
中繼運算子。
class LatestNewsViewModel(
private val newsRepository: NewsRepository
) : ViewModel() {
init {
viewModelScope.launch {
newsRepository.favoriteLatestNews
// Intermediate catch operator. If an exception is thrown,
// catch and update the UI
.catch { exception -> notifyError(exception) }
.collect { favoriteNews ->
// Update View with the latest favorite news
}
}
}
}
在上一個範例中,在發生例外狀況時,由於尚未收到新項目,所以沒有呼叫 collect
lambda。
catch
也可以向資料流執行對項目的 emit
作業。範例存放區層可以emit
快取值:
class NewsRepository(...) {
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
.map { news -> news.filter { userData.isFavoriteTopic(it) } }
.onEach { news -> saveInCache(news) }
// If an error happens, emit the last cached values
.catch { exception -> emit(lastCachedNews()) }
}
在此範例中,在發生例外狀況時,由於出現例外狀況,系統向串流發出新項目,所以會呼叫 collect
lambda。
在另一個 CoroutineContext 中執行
根據預設,flow
建構工具的生產端會在協同程式 (從該生產端收集資料流) 的 CoroutineContext
中執行,且如前所述,它不能從另一個 CoroutineContext
執行 emit
作業。在某些情況下,這樣的行為未必是您想要的。例如,在本主題使用的範例中,存放區層不應在 viewModelScope
使用的 Dispatchers.Main
上執行作業。
如要變更資料流的 CoroutineContext
,請使用中繼運算子 flowOn
。flowOn
會變更「上游資料流」的 CoroutineContext
,也就是在 flowOn
「之前」(或之上)的生產端和任何中繼運算子。「下游資料流」(flowOn
「之後」的中繼運算子,伴隨取用端) 不會受到影響,並會在從資料流執行 collect
作業的 CoroutineContext
執行。如果有多個 flowOn
運算子,每個運算子都會從其目前位置變更上游。
class NewsRepository(
private val newsRemoteDataSource: NewsRemoteDataSource,
private val userData: UserData,
private val defaultDispatcher: CoroutineDispatcher
) {
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
.map { news -> // Executes on the default dispatcher
news.filter { userData.isFavoriteTopic(it) }
}
.onEach { news -> // Executes on the default dispatcher
saveInCache(news)
}
// flowOn affects the upstream flow ↑
.flowOn(defaultDispatcher)
// the downstream flow ↓ is not affected
.catch { exception -> // Executes in the consumer's context
emit(lastCachedNews())
}
}
藉助這個程式碼,onEach
和 map
運算子會使用 defaultDispatcher
,而 catch
運算子和取用端會在 viewModelScope
使用的 Dispatchers.Main
上執行。
由於資料來源層執行 I/O 工作,建議您使用針對 I/O 作業進行最佳化調整的調派程式:
class NewsRemoteDataSource(
...,
private val ioDispatcher: CoroutineDispatcher
) {
val latestNews: Flow<List<ArticleHeadline>> = flow {
// Executes on the IO dispatcher
...
}
.flowOn(ioDispatcher)
}
Jetpack 程式庫內的資料流
資料流已整合至許多 Jetpack 程式庫中,在 Android 第三方程式庫中也廣受歡迎。資料流非常適合即時資料更新和無限資料串流。
您可以使用 Flow with Room,接收資料庫變更的通知。使用資料存取物件 (DAO) 時,傳回 Flow
類型,即可取得即時更新。
@Dao
abstract class ExampleDao {
@Query("SELECT * FROM Example")
abstract fun getExamples(): Flow<List<Example>>
}
每當 Example
資料表發生變更時,都會發出包含資料庫中新項目的新清單。
將回呼式 API 轉換為資料流
callbackFlow
是一個資料流建構工具,可將回呼式 API 轉換為資料流。例如,Firebase Firestore Android API 就使用了回呼。
如要將這些 API 轉換為資料流,並監聽 Firestore 資料庫更新,可以使用下方的程式碼:
class FirestoreUserEventsDataSource(
private val firestore: FirebaseFirestore
) {
// Method to get user events from the Firestore database
fun getUserEvents(): Flow<UserEvents> = callbackFlow {
// Reference to use in Firestore
var eventsCollection: CollectionReference? = null
try {
eventsCollection = FirebaseFirestore.getInstance()
.collection("collection")
.document("app")
} catch (e: Throwable) {
// If Firebase cannot be initialized, close the stream of data
// flow consumers will stop collecting and the coroutine will resume
close(e)
}
// Registers callback to firestore, which will be called on new events
val subscription = eventsCollection?.addSnapshotListener { snapshot, _ ->
if (snapshot == null) { return@addSnapshotListener }
// Sends events to the flow! Consumers will get the new events
try {
offer(snapshot.getEvents())
} catch (e: Throwable) {
// Event couldn't be sent to the flow
}
}
// The callback inside awaitClose will be executed when the flow is
// either closed or cancelled.
// In this case, remove the callback from Firestore
awaitClose { subscription?.remove() }
}
}
與 flow
建構工具不同,callbackFlow
可讓您從包含 send
函式的另一個 CoroutineContext
發出值,或在包含 trySend
函式的協同程式以外的位置發出值。
callbackFlow
在內部使用管道。從概念來看,管道與封鎖的佇列非常類似。管道設定了容量,也就是可緩衝的元素數量上限。在 callbackFlow
中建立的管道,其預設容量為 64 個元素。當您嘗試將新元素新增至飽和的管道時,send
會暫停生產端,直到有空間存放新元素為止,同時 offer
不會將元素新增到管道中,並立即傳回 false
。