Android 上的 Kotlin 資料流

在協同程式中,「資料流」是一種可依序發出多個值的類型,而不是僅傳回單一值的「暫停函式」。例如,您可以使用資料流從資料庫接收即時更新。

資料流建構於協同程式之上,可提供多個值。從概念來看,資料流是可透過非同步方式計算的「資料串流」。發出的值必須屬於相同類型。例如,Flow<Int> 是發出整數值的資料流。

資料流與產生值序列的 Iterator 非常類似,但會使用暫停函式,以非同步方式產生和取用值。這意味著,資料流可以安全地發出網路要求,並在不封鎖主執行緒的情況下產生下一個值。

資料串流涉及三個實體:

  • 生產端會產生加入串流的資料。藉助協同程式,資料流也可以非同步產生資料。
  • (不一定存在) 中繼端可修改發至串流的值或串流本身。
  • 取用端會從串流取用值。

資料串流涉及的實體;取用端、不一定存在的中繼端和生產端
圖 1. 資料串流涉及的實體:取用端、不一定存在的中繼端和生產端。

在 Android 中,「存放區」通常是使用者介面資料的生產端,並將使用者介面 (UI) 做為最終顯示資料的取用端。在其他時候,使用者介面層是使用者輸入內容事件的生產端,而階層的其他層會取用這些事件。生產端和取用端之間的層,通常是中繼端。這些中繼端可以修改資料串流,使其符合下一層的要求。

建立資料流

如要建立資料流,請使用資料流建構工具 API。flow 建構工具函式會建立新的資料流。在該資料流中,您可以使用 emit 函式,手動將新值發至資料串流內。

在以下範例中,資料來源會以固定的時間間隔自動擷取最新消息。由於暫停函式無法傳回多個連續的值,所以資料來源會建立並傳回資料流,以滿足此要求。在此情況下,資料來源會做為生產端。

class NewsRemoteDataSource(
    private val newsApi: NewsApi,
    private val refreshIntervalMs: Long = 5000
) {
    val latestNews: Flow<List<ArticleHeadline>> = flow {
        while(true) {
            val latestNews = newsApi.fetchLatestNews()
            emit(latestNews) // Emits the result of the request to the flow
            delay(refreshIntervalMs) // Suspends the coroutine for some time
        }
    }
}

// Interface that provides a way to make network requests with suspend functions
interface NewsApi {
    suspend fun fetchLatestNews(): List<ArticleHeadline>
}

系統會在協同程式中執行 flow 建構工具。因此,它會受益於相同的非同步 API,但會受到一些限制:

  • 資料流會「依序」。由於生產端位於協同程式內,所以呼叫暫停函式時,生產端會暫停,直到暫停函式傳回為止。在這個範例中,生產端會暫停,直到 fetchLatestNews 網路請求完成為止。也只有到那時,結果才會發至串流。
  • 在使用 flow 建構工具時,生產端不能從其他 CoroutineContext 發出 emit 值。因此,請勿透過建立新的協同程式,或使用 withContext 程式碼區塊,呼叫另一個 CoroutineContext 中的 emit。在這些情況下,您可以使用其他資料流建構工具,例如 callbackFlow

修改串流

中繼端可以使用「中繼運算子」修改資料串流,而不必取用值。這些運算子是函式,在套用至資料串流時,會設定一連串在未來取用值時才執行的運算子。如要進一步瞭解中繼運算子,請參閱資料流參考說明文件

在以下範例中,存放區層使用中繼運算子 map 轉換將在 View 上顯示的資料:

class NewsRepository(
    private val newsRemoteDataSource: NewsRemoteDataSource,
    private val userData: UserData
) {
    /**
     * Returns the favorite latest news applying transformations on the flow.
     * These operations are lazy and don't trigger the flow. They just transform
     * the current value emitted by the flow at that point in time.
     */
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            // Intermediate operation to filter the list of favorite topics
            .map { news -> news.filter { userData.isFavoriteTopic(it) } }
            // Intermediate operation to save the latest news in the cache
            .onEach { news -> saveInCache(news) }
}

項目發至資料流時,中繼運算子可以依序套用,串連成延遲執行的運算子。請注意,如果只將中繼運算子套用至串流,則不會啟動資料流收集作業。

從資料流收集

使用「終端運算子」觸發資料流,開始監聽值。如要在發出值時取得串流中的所有值,請使用 collect。如要進一步瞭解終端運算子,請參閱官方資料流說明文件

由於 collect 是暫停函式,所以需要在協同程式中執行。該函式將 lambda 視為在每個新值上呼叫的參數。由於它是暫停函式,呼叫 collect 的協同程式可能會暫停,直到資料流關閉為止。

延續上一個範例,以下說明如何簡單實作從存放區層取用資料的 ViewModel

class LatestNewsViewModel(
    private val newsRepository: NewsRepository
) : ViewModel() {

    init {
        viewModelScope.launch {
            // Trigger the flow and consume its elements using collect
            newsRepository.favoriteLatestNews.collect { favoriteNews ->
                // Update View with the latest favorite news
            }
        }
    }
}

收集資料流,會觸發生產端重新整理最新消息,並於固定的時間間隔發出網路請求結果。由於生產端持續使用 while(true) 迴圈,當清除 ViewModel 和取消 viewModelScope 時,資料串流將會關閉。

由於以下原因,系統可能會停止收集資料流:

  • 取消了收集資料流的協同程式,如上一個範例所示。這樣做也會停止基礎生產端。
  • 生產端已發完所有項目。在此情況中,資料串流會關閉,而 collect 協同程式會繼續執行。

除非指定與其他中繼運算子一起使用,否則資料流會處於「冷」和「延遲」狀態。這意味著,每次在資料流中呼叫終端運算子時,都會執行生產端程式碼。在上一個範例中,由於設定了多個資料流收集器,所以資料來源多次以不同的固定時間間隔擷取最新消息。如果有多個取用端同時收集資料流,為對資料流進行最佳化調整,並共用資料流,請使用 shareIn 運算子。

擷取非預期的例外狀況

第三方程式庫可能會實作生產端。這意味著,此做法可能會擲回非預期的例外狀況。如要處理這些例外狀況,請使用 catch 中繼運算子。

class LatestNewsViewModel(
    private val newsRepository: NewsRepository
) : ViewModel() {

    init {
        viewModelScope.launch {
            newsRepository.favoriteLatestNews
                // Intermediate catch operator. If an exception is thrown,
                // catch and update the UI
                .catch { exception -> notifyError(exception) }
                .collect { favoriteNews ->
                    // Update View with the latest favorite news
                }
        }
    }
}

在上一個範例中,在發生例外狀況時,由於尚未收到新項目,所以沒有呼叫 collect lambda。

catch 也可以向資料流執行對項目的 emit 作業。範例存放區層可以emit快取值:

class NewsRepository(...) {
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            .map { news -> news.filter { userData.isFavoriteTopic(it) } }
            .onEach { news -> saveInCache(news) }
            // If an error happens, emit the last cached values
            .catch { exception -> emit(lastCachedNews()) }
}

在此範例中,在發生例外狀況時,由於出現例外狀況,系統向串流發出新項目,所以會呼叫 collect lambda。

在另一個 CoroutineContext 中執行

根據預設,flow 建構工具的生產端會在協同程式 (從該生產端收集資料流) 的 CoroutineContext 中執行,且如前所述,它不能從另一個 CoroutineContext 執行 emit 作業。在某些情況下,這樣的行為未必是您想要的。例如,在本主題使用的範例中,存放區層不應在 viewModelScope 使用的 Dispatchers.Main 上執行作業。

如要變更資料流的 CoroutineContext,請使用中繼運算子 flowOnflowOn 會變更「上游資料流」CoroutineContext,也就是在 flowOn「之前」(或之上)的生產端和任何中繼運算子。「下游資料流」(flowOn 「之後」的中繼運算子,伴隨取用端) 不會受到影響,並會在從資料流執行 collect 作業的 CoroutineContext 執行。如果有多個 flowOn 運算子,每個運算子都會從其目前位置變更上游。

class NewsRepository(
    private val newsRemoteDataSource: NewsRemoteDataSource,
    private val userData: UserData,
    private val defaultDispatcher: CoroutineDispatcher
) {
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            .map { news -> // Executes on the default dispatcher
                news.filter { userData.isFavoriteTopic(it) }
            }
            .onEach { news -> // Executes on the default dispatcher
                saveInCache(news)
            }
            // flowOn affects the upstream flow ↑
            .flowOn(defaultDispatcher)
            // the downstream flow ↓ is not affected
            .catch { exception -> // Executes in the consumer's context
                emit(lastCachedNews())
            }
}

藉助這個程式碼,onEachmap 運算子會使用 defaultDispatcher,而 catch 運算子和取用端會在 viewModelScope 使用的 Dispatchers.Main 上執行。

由於資料來源層執行 I/O 工作,建議您使用針對 I/O 作業進行最佳化調整的調派程式:

class NewsRemoteDataSource(
    ...,
    private val ioDispatcher: CoroutineDispatcher
) {
    val latestNews: Flow<List<ArticleHeadline>> = flow {
        // Executes on the IO dispatcher
        ...
    }
        .flowOn(ioDispatcher)
}

Jetpack 程式庫內的資料流

資料流已整合至許多 Jetpack 程式庫中,在 Android 第三方程式庫中也廣受歡迎。資料流非常適合即時資料更新和無限資料串流。

您可以使用 Flow with Room,接收資料庫變更的通知。使用資料存取物件 (DAO) 時,傳回 Flow 類型,即可取得即時更新。

@Dao
abstract class ExampleDao {
    @Query("SELECT * FROM Example")
    abstract fun getExamples(): Flow<List<Example>>
}

每當 Example 資料表發生變更時,都會發出包含資料庫中新項目的新清單。

將回呼式 API 轉換為資料流

callbackFlow 是一個資料流建構工具,可將回呼式 API 轉換為資料流。例如,Firebase Firestore Android API 就使用了回呼。

如要將這些 API 轉換為資料流,並監聽 Firestore 資料庫更新,可以使用下方的程式碼:

class FirestoreUserEventsDataSource(
    private val firestore: FirebaseFirestore
) {
    // Method to get user events from the Firestore database
    fun getUserEvents(): Flow<UserEvents> = callbackFlow {

        // Reference to use in Firestore
        var eventsCollection: CollectionReference? = null
        try {
            eventsCollection = FirebaseFirestore.getInstance()
                .collection("collection")
                .document("app")
        } catch (e: Throwable) {
            // If Firebase cannot be initialized, close the stream of data
            // flow consumers will stop collecting and the coroutine will resume
            close(e)
        }

        // Registers callback to firestore, which will be called on new events
        val subscription = eventsCollection?.addSnapshotListener { snapshot, _ ->
            if (snapshot == null) { return@addSnapshotListener }
            // Sends events to the flow! Consumers will get the new events
            try {
                offer(snapshot.getEvents())
            } catch (e: Throwable) {
                // Event couldn't be sent to the flow
            }
        }

        // The callback inside awaitClose will be executed when the flow is
        // either closed or cancelled.
        // In this case, remove the callback from Firestore
        awaitClose { subscription?.remove() }
    }
}

flow 建構工具不同,callbackFlow 可讓您從包含 send 函式的另一個 CoroutineContext 發出值,或在包含 trySend 函式的協同程式以外的位置發出值。

callbackFlow 在內部使用管道。從概念來看,管道與封鎖的佇列非常類似。管道設定了容量,也就是可緩衝的元素數量上限。在 callbackFlow 中建立的管道,其預設容量為 64 個元素。當您嘗試將新元素新增至飽和的管道時,send 會暫停生產端,直到有空間存放新元素為止,同時 offer 不會將元素新增到管道中,並立即傳回 false

其他資料流資源