学习采用 Kotlin Flow 和 LiveData 的高级协程

在本 Codelab 中,您将学习如何在 Android 应用中使用 LiveData 构建器组合 Kotlin 协程LiveData。我们还将使用协程异步 flow,协程库中的该 flow 类型用于表示值的异步序列(或数据流),以实现相同的功能。

您将从一款使用 Android 架构组件构建的现有应用入手,该应用使用 LiveDataRoom 数据库获取对象列表,然后在 RecyclerView 网格布局中显示这些对象。

下面给出了一些代码段,从中您可以大致了解将要进行的操作。下面是用于查询 Room 数据库的现成代码:

val plants: LiveData<List<Plant>> = plantDao.getPlants()

系统将结合使用 LiveData 构建器和具有其他排序逻辑的协程更新 LiveData

val plants: LiveData<List<Plant>> = liveData<List<Plant>> {
   val plantsLiveData = plantDao.getPlants()
   val customSortOrder = plantsListSortOrderCache.getOrAwait()
   emitSource(plantsLiveData.map { plantList -> plantList.applySort(customSortOrder) })
}

您还可以使用 Flow 实现相同的逻辑:

private val customSortFlow = plantsListSortOrderCache::getOrAwait.asFlow()

val plantsFlow: Flow<List<Plant>>
   get() = plantDao.getPlantsFlow()
       .combine(customSortFlow) { plants, sortOrder ->
           plants.applySort(sortOrder)
       }
       .flowOn(defaultDispatcher)
       .conflate()

前提条件

  • 具有使用 ViewModelLiveDataRepositoryRoom 架构组件的经验。
  • 具有使用 Kotlin 语法(包括扩展函数和 lambda)的经验。
  • 具有使用 Kotlin 协程的经验。
  • 对于在 Android 上使用线程(包括主线程、后台线程和回调)有基本的了解。

您应执行的操作

  • 转换现有的 LiveData,从而使用支持 Kotlin 协程的 LiveData 构建器。
  • LiveData 构建器中添加逻辑。
  • 使用 Flow 执行异步操作。
  • 合并 Flows 并转换多个异步源。
  • 使用 Flows 控制并发。
  • 了解如何在 LiveDataFlow. 之间做选择

所需条件

  • Android Studio 4.1 或更高版本。此 Codelab 也可使用其他版本,但某些内容也许会缺失或有所不同。

如果在此 Codelab 操作期间遇到任何问题(代码错误、语法错误、措辞含义不明等),可以通过 Codelab 左下角的“报告错误”链接报告相应问题。

下载代码

点击下面的链接可下载此 Codelab 的所有代码:

下载 zip 文件

…或从命令行使用下列命令克隆 GitHub 代码库:

$ git clone https://github.com/googlecodelabs/kotlin-coroutines.git

此 Codelab 的代码位于 advanced-coroutines-codelab 目录中。

常见问题解答

首先,我们来看看起始示例应用的外观。按照下列说明在 Android Studio 中打开示例应用。

  1. 如果已下载 kotlin-coroutines zip 文件,请将其解压缩。
  2. 在 Android Studio 中打开 advanced-coroutines-codelab 目录。
  3. 确保在配置下拉菜单中选择了 start
  4. 点击 Run execute.png 按钮,然后选择模拟设备或连接您的 Android 设备。该设备必须能够运行 Android Lollipop(支持的最低 SDK 版本为 21)。

应用首次运行时,系统会显示一个卡片列表,每张卡片上显示特定植物的名称和图片:

2faf7cd0b97434f5.png

每个 Plant 都有一个 growZoneNumber,该属性表示植物最可能生长的区域。用户可以点按过滤器图标 ee1895257963ae84.png,在显示全部植物以及显示某个特定生长区域(硬编码为区域 9)的植物之间进行切换。多次按下过滤器按钮可以实际查看该过程

8e150fb2a41417ab.png

架构概览

此应用使用架构组件MainActivityPlantListFragment 中的界面代码与 PlantListViewModel 的应用逻辑分隔开。PlantRepositoryViewModelPlantDao 之间架起了一座桥梁,它可访问 Room 数据库并返回 Plant 对象列表。然后,界面会获取此植物列表,并在 RecyclerView 网格布局中显示这些植物。

在开始修改代码之前,让我们快速了解一下数据是如何从数据库流到界面的。下面介绍植物列表如何加载到 ViewModel 中:

PlantListViewModel.kt

val plants: LiveData<List<Plant>> = growZone.switchMap { growZone ->
    if (growZone == NoGrowZone) {
        plantRepository.plants
    } else {
        plantRepository.getPlantsWithGrowZone(growZone)
    }
}

GrowZone 是一个内嵌类,仅包含表示其区域的 IntNoGrowZone 表示某个区域不存在,仅用于过滤。

Plant.kt

inline class GrowZone(val number: Int)
val NoGrowZone = GrowZone(-1)

点按过滤器按钮时,系统会对 growZone 进行切换。我们使用 switchMap 来确定要返回的植物列表。

用于从数据库中获取植物数据的代码库和数据访问对象 (DAO) 如下所示:

PlantDao.kt

@Query("SELECT * FROM plants ORDER BY name")
fun getPlants(): LiveData<List<Plant>>

@Query("SELECT * FROM plants WHERE growZoneNumber = :growZoneNumber ORDER BY name")
fun getPlantsWithGrowZoneNumber(growZoneNumber: Int): LiveData<List<Plant>>

PlantRepository.kt

val plants = plantDao.getPlants()

fun getPlantsWithGrowZone(growZone: GrowZone) =
    plantDao.getPlantsWithGrowZoneNumber(growZone.number)

虽然大多数代码修改在 PlantListViewModelPlantRepository 中进行,但建议您花一点时间了解项目结构,重点关注植物数据如何经过不同层从数据库流到 Fragment。在下一步中,我们将使用 LiveData 构建器修改该代码,以添加自定义排序。

植物列表目前按字母顺序显示,但我们想更改此列表的顺序,先列出某些植物,然后再按字母顺序显示其余的植物。这类似于购物应用在可购买商品列表的顶部显示赞助商的商品。我们的产品团队希望能够在不发布新版本应用的情况下动态更改排序顺序,因此我们会从后端获取要排在前面的植物列表。

应用自定义排序后的应用如下所示:

ca3c67a941933bd9.png

自定义排序顺序列表包含以下四种植物:Orange、Sunflower、Gracle 和 Avocado。请注意它们是如何首先显示在列表中的,其后是按字母顺序排序的其余植物。

现在,如果按下过滤器按钮(仅显示 GrowZone 9 的植物),Sunflower 会从列表中消失,因为它的 GrowZone 不是 9。自定义排序列表中的其他三种植物都在 GrowZone 9 中,因此它们仍将处于列表顶部。其余植物中只有 Tomato 在 GrowZone 9 中,因此它显示在此列表的最后。

50efd3b656d4b97.png

现在让我们开始编写代码,以实现自定义排序。

首先,我们编写一个挂起函数,用来从网络中获取自定义排序顺序,然后再将该顺序缓存到内存中。

将以下内容添加到 PlantRepository 中:

PlantRepository.kt

private var plantsListSortOrderCache =
    CacheOnSuccess(onErrorFallback = { listOf<String>() }) {
        plantService.customPlantSortOrder()
    }

plantsListSortOrderCache 用作自定义排序顺序的内存中缓存。如果出现网络连接错误,这将回退为一个空列表,因此应用即使未获取到排序顺序,也可以显示数据。

此代码使用 sunflower 模块中的 CacheOnSuccess 实用程序类来处理缓存。通过像这样抽象出实现缓存的细节,可使应用代码更加直观。由于 CacheOnSuccess 已经过充分测试,我们无需为了确保操作正确而为代码库编写大量测试。使用 kotlinx-coroutines 时,最好在代码中引入类似的高级抽象化功能。

现在让我们合并一些逻辑,对植物列表应用排序。

将以下内容添加到 PlantRepository:

PlantRepository.kt

private fun List<Plant>.applySort(customSortOrder: List<String>): List<Plant> {
    return sortedBy { plant ->
        val positionForItem = customSortOrder.indexOf(plant.plantId).let { order ->
            if (order > -1) order else Int.MAX_VALUE
        }
        ComparablePair(positionForItem, plant.name)
    }
}

此扩展函数将重新排列此列表,把 customSortOrder 中的 Plants 置于此列表的前面。

现在,排序逻辑已准备就绪,请将 plantsgetPlantsWithGrowZone 的代码替换为下面的 LiveData 构建器

PlantRepository.kt

val plants: LiveData<List<Plant>> = liveData<List<Plant>> {
   val plantsLiveData = plantDao.getPlants()
   val customSortOrder = plantsListSortOrderCache.getOrAwait()
   emitSource(plantsLiveData.map {
       plantList -> plantList.applySort(customSortOrder)
   })
}

fun getPlantsWithGrowZone(growZone: GrowZone) = liveData {
    val plantsGrowZoneLiveData = plantDao.getPlantsWithGrowZoneNumber(growZone.number)
    val customSortOrder = plantsListSortOrderCache.getOrAwait()
    emitSource(plantsGrowZoneLiveData.map { plantList ->
        plantList.applySort(customSortOrder)
    })
}

现在,如果您运行应用,系统会显示自定义排序的植物列表:

ca3c67a941933bd9.png

LiveData 构建器允许我们以异步方式计算值,因为协程已对 liveData 进行备份。这里,我们有一个挂起函数用于从数据库中获取植物的 LiveData 列表,并且还会调用一个挂起函数来获取自定义排序顺序。然后,使用构建器将这两个值合并到一起,对植物列表进行排序并返回值。

协程在被观察到时开始执行;当协程执行成功,或者数据库或网络调用失败时,协程即被取消。

接下来,我们将借助转换来了解 getPlantsWithGrowZone 的变体。

现在,我们将修改 PlantRepository,以便在处理每个值时实现一次挂起转换,以此了解如何在 LiveData 中构建复杂的异步转换。首先,我们创建一个可在主线程上安全使用的排序算法版本。可以使用 withContext 切换到另一个仅针对 lambda 的调度程序,然后恢复使用我们开始时使用的调度程序。

将以下内容添加到 PlantRepository 中:

PlantRepository.kt

@AnyThread
suspend fun List<Plant>.applyMainSafeSort(customSortOrder: List<String>) =
    withContext(defaultDispatcher) {
        this@applyMainSafeSort.applySort(customSortOrder)
    }

然后,我们可以通过 LiveData 构建器使用这个新的主线程安全排序。更新代码块以使用 switchMap,这可让您在每次收到新值时指向新的 LiveData

PlantRepository.kt

fun getPlantsWithGrowZone(growZone: GrowZone) =
   plantDao.getPlantsWithGrowZoneNumber(growZone.number)
       .switchMap { plantList ->
           liveData {
               val customSortOrder = plantsListSortOrderCache.getOrAwait()
               emit(plantList.applyMainSafeSort(customSortOrder))
           }
       }

与先前版本相比,一旦从网络收到自定义排序顺序,此代码即可用于新的主线程安全 applyMainSafeSort。然后,系统会将此结果作为 getPlantsWithGrowZone 返回的新值发送到 switchMap

与上面的 plants LiveData 类似,此协程会在被观察到时开始执行,并且会在完成时或数据库/网络调用失败时终止。这里的区别在于,可以安全地在 map 中进行网络调用,因为它已经缓存。

现在,我们来看看如何使用 flow 实现此代码,并比较各个实现的不同。

我们将使用来自 kotlinx-coroutinesflow 构建相同的逻辑。在开始构建之前,我们先了解一下 flow 是什么,以及如何将 flow 纳入您的应用中。

flow 是序列的异步版本,这是一种收集类型,其中的值是逐个生成的。与序列一样,只有需要某个值时,flow 才会根据需要生成该值,而且 flow 可以包含无限数量的值。

那么,为什么 Kotlin 要引入新的 Flow 类型,它与常规序列有什么不同?答案就在于异步性的魔力。Flow 全面支持协程。这意味着您可以使用协程构建、转换和耗用 Flow。您还可以控制并发,即利用 Flow 通过声明的方式协调多个协程的执行。

这带来了许多令人兴奋的可能性。

Flow 可以在完全响应式风格的编程中使用。如果您之前使用过 RxJava 这类元素,Flow 提供类似的功能。可以使用 mapflatMapLatestcombine 等函数运算符对 flow 进行转换,从而简洁明了地表达应用逻辑。

Flow 还支持在大多数运算符中调用挂起函数。这样您就可以在 map 等运算符中执行连续的异步任务。通过在 flow 中使用挂起操作,通常会产生比等效的完全响应式代码更短、更易读的代码。

在此 Codelab 中,我们将了解如何使用这两种方法。

flow 如何运行

如需了解如何使用 flow 根据需要(或逐个)生成值,请查看下面的 flow。此 flow 发出值 (1, 2, 3),并在生成每一项之前、期间和之后输出相应结果。

fun makeFlow() = flow {
   println("sending first value")
   emit(1)
   println("first value collected, sending another value")
   emit(2)
   println("second value collected, sending a third value")
   emit(3)
   println("done")
}

scope.launch {
   makeFlow().collect { value ->
       println("got $value")
   }
   println("flow is completed")
}

运行上面的代码会输出以下内容:

sending first value
got 1
first value collected, sending another value
got 2
second value collected, sending a third value
got 3
done
flow is completed

您可以看到 collect lambda 与 flow 构建器如何交替执行。每次 flow 构建器调用 emit 时,它都会 suspends,直到元素完全处理为止。当从 flow 中请求另一个值时,它会从上次停止的位置 resumes,直到它再次调用 emit。flow 构建器完成后,Flow 将被取消,同时 collect 恢复,从而允许调用协程输出“flow is completed”。

collect 的调用非常重要。Flow 使用挂起运算符(例如 collect),而不是公开 Iterator 界面,以便始终知晓 flow 何时在被主动耗用。更重要的是,它可以在调用方无法再请求更多值时获知消息,以便清理资源。

flow 何时运行

collect 运算符运行时,上述示例中的 Flow 就会开始运行。通过调用 flow 构建器或其他 API 来创建新的 Flow 不会执行其他工作。挂起运算符 collectFlow 中被称为终端运算符。还有其他挂起终端运算符,例如 kotlinx-coroutines 附带的 toListfirstsingle,您也可以构建您自己的终端运算符。

默认情况下,Flow 将在以下情况下执行:

  • 每次应用终端运算符时(且每个新调用均与之前启动的任何调用无关)
  • 直到运行 flow 的协程被取消
  • 当上一个值已完全处理,并且又请求了另一个值时

由于这些规则,Flow 可以参与结构化并发操作,并且可以安全地从 Flow 启动长时间运行的协程。Flow 不会泄露资源,因为在调用方被取消时,系统始终会按照协程合作取消规则清理这些资源。

下面,我们将使用 take 运算符修改上面的 flow,以便仅查看前两个元素,然后再收集该 flow 两次。

scope.launch {
   val repeatableFlow = makeFlow().take(2)  // we only care about the first two elements
   println("first collection")
   repeatableFlow.collect()
   println("collecting again")
   repeatableFlow.collect()
   println("second collection completed")
}

运行此代码后,您将看到以下输出:

first collection
sending first value
first value collected, sending another value
collecting again
sending first value
first value collected, sending another value
second collection completed

每次调用 collect 时,flow lambda 都会从头开始执行。如果 flow 执行高成本的工作(例如发出网络请求),则这一点非常重要。此外,由于我们应用了 take(2) 运算符,因此该 flow 将只生成两个值。第二次调用 emit 后,将不会再次恢复 flow lambda,因此将不会再输出“second value collected...”这一行。

那么,FlowSequence 一样是惰性的,但它如何又是异步的呢?我们来看一下异步序列的示例 - 观察对数据库的更改。

在此示例中,我们需要使用另一个线程(如主线程或界面线程)上的观察器来协调数据库线程池上生成的数据。并且,随着数据发生更改,我们将反复发出结果,所以这种情况非常符合异步序列模式。

假设您正在为 Flow 编写 Room 集成。如果您从 Room 中已有的挂起查询支持开始,则可能会编写如下代码:

// This code is a simplified version of how Room implements flow
fun <T> createFlow(query: Query, tables: List<Tables>): Flow<T> = flow {
    val changeTracker = tableChangeTracker(tables)

    while(true) {
        emit(suspendQuery(query))
        changeTracker.suspendUntilChanged()
    }
}

此代码依靠两个虚构挂起函数生成 Flow

  • suspendQuery - 该主线程安全函数用于运行常规 Room 挂起查询
  • suspendUntilChanged - 该函数用于挂起协程,直至其中一个表发生更改

flow 被收集后,最初会 emits 查询的第一个值。处理该值后,flow 将恢复并调用 suspendUntilChanged,正如其预期实现的操作一样 - 挂起 flow,直至其中一个表发生更改。此时,系统中不会发生任何变化,直至其中一个表发生更改并且 flow 恢复。

当 flow 恢复时,系统将执行另一个主线程安全查询并 emits 结果。这个过程会永远无限循环下去。

flow 和结构化并发

不过,我们不想发生工作泄漏。协程本身的资源开销并不高,但它会反复唤醒自身去执行数据库查询。泄漏的代价相当高。

虽然我们创建了无限循环,但 Flow 可以通过支持结构化并发帮助我们解决这个问题。

耗用值或对 flow 进行迭代的唯一方法就是使用终端运算符。因为所有终端运算符都是挂起函数,因此该工作受限于调用它们的作用域的生命周期。该作用域取消后,flow 将按照常规协程合作取消规则自动取消。因此,即使我们在 flow 构建器中编写了无限循环,由于结构化并发,我们仍然可以安全地耗用 flow,不会发生泄漏。

在此步骤中,您将学习如何在 Room 中使用 Flow,并将其连接到界面。

Flow 的许多用法中,都需要用到此步骤。当按这种方式使用时,Room 运算符中的 Flow 作为类似于 LiveData 的可观察数据库查询运行。

更新 Dao

首先,请打开 PlantDao.kt,然后添加两个返回 Flow<List<Plant>> 的新查询:

PlantDao.kt

@Query("SELECT * from plants ORDER BY name")
fun getPlantsFlow(): Flow<List<Plant>>

@Query("SELECT * from plants WHERE growZoneNumber = :growZoneNumber ORDER BY name")
fun getPlantsWithGrowZoneNumberFlow(growZoneNumber: Int): Flow<List<Plant>>

请注意,除了返回类型之外,这些函数与 LiveData 版本的函数完全相同。但是,我们会同时开发这两种版本,以进行比较。

指定 Flow 返回类型后,Room 执行的查询将具有以下特征:

  • 主线程安全 - 具有 Flow 返回类型的查询始终在 Room 执行程序上运行,因此查询始终可在主线程上安全运行。您无需对代码进行任何更改,即可使其在主线程之外运行。
  • 观察更改 - Room 会自动观察更改,并将新值发给 flow。
  • 异步序列 - Flow 会在每次更改时发出完整的查询结果,且不会引入任何缓冲区。如果返回 Flow<List<T>>,则 flow 会发出包含查询结果中所有行的 List<T>。flow 会像序列一样执行,一次发出一个查询结果并挂起,直至系统向其请求下一个结果。
  • 可取消 - 当收集这些 flow 的作用域被取消时,Room 会取消观察此查询。

以上几项特征相结合,使 Flow 成为了非常好用的返回类型,用来观察来自界面层的数据库。

更新代码库

如需继续将新的返回值连接到界面,请打开 PlantRepository.kt 并添加以下代码:

PlantRepository.kt

val plantsFlow: Flow<List<Plant>>
   get() = plantDao.getPlantsFlow()

fun getPlantsWithGrowZoneFlow(growZoneNumber: GrowZone): Flow<List<Plant>> {
   return plantDao.getPlantsWithGrowZoneNumberFlow(growZoneNumber.number)
}

目前,我们只是将 Flow 值传递给调用方。这与本 Codelab 开始时介绍的将 ViewModel 传递给 LiveData 完全相同。

更新 ViewModel

PlantListViewModel.kt 中,让我们先从简单的示例入手,仅公开 plantsFlow。在接下来的几个步骤中,我们会在 flow 版本中添加生长区域切换功能。

PlantListViewModel.kt

// add a new property to plantListViewModel

val plantsUsingFlow: LiveData<List<Plant>> = plantRepository.plantsFlow.asLiveData()

我们也会使用 LiveData 版本 (val plants) 进行比较。

由于我们希望在此 Codelab 的界面层中保留 LiveData,因此我们将使用 asLiveData 扩展函数,将 Flow 转换为 LiveData。就像 LiveData 构建器一样,该函数会为生成的 LiveData 添加可配置超时。此功能非常有用,因为它会阻止在每次配置更改(例如设备旋转)时重启查询。

由于 flow 提供主线程安全性以及取消功能,因此您可以选择将 Flow 一直传递到界面层,无需将其转换为 LiveData。不过,在此 Codelab 中,我们将继续在界面层使用 LiveData

同时,在 ViewModel 中,将缓存更新添加到 init 代码块中。目前此步骤是可选的,不过如果您清除了缓存但未添加此调用,您将不会在应用中看到任何数据。

PlantListViewModel.kt

init {
    clearGrowZoneNumber()  // keep this

    // fetch the full plant list
    launchDataLoad { plantRepository.tryUpdateRecentPlantsCache() }
}

更新 fragment

打开 PlantListFragment.kt,将 subscribeUi 函数更改为使用新的 plantsUsingFlow LiveData

PlantListFragment.kt

private fun subscribeUi(adapter: PlantAdapter) {
   viewModel.plantsUsingFlow.observe(viewLifecycleOwner) { plants ->
       adapter.submitList(plants)
   }
}

使用 flow 运行应用

如果您再次运行应用,您应该会看到您现在正在使用 Flow 加载数据。由于我们尚未实现 switchMap,因此过滤器选项不会执行任何操作。

在下一步中,我们将了解如何在 Flow 中转换数据。

在这一步中,您将对 plantsFlow 应用排序顺序。我们将使用 flow 的声明式 API 执行此操作。

通过使用 mapcombinemapLatest 等转换,我们能够以声明的方式描述当每个元素在 flow 中移动时,我们希望如何对其进行转换。借助这种方式,我们甚至能够以声明的方式描述并发,这将极大地简化代码。在本部分中,您将了解如何使用运算符指示 Flow 启动两个协程,并以声明的方式合并协程结果。

首先,请打开 PlantRepository.kt,然后定义一个名为 customSortFlow 的新私有 flow:

PlantRepository.kt

private val customSortFlow = flow { emit(plantsListSortOrderCache.getOrAwait()) }

这可以定义一个 Flow,该 flow 会在被收集时调用 getOrAwait,然后 emit 排序顺序。

由于此 flow 只发出一个值,因此您还可以使用 asFlow 直接从 getOrAwait 函数构建该 flow。

// Create a flow that calls a single function
private val customSortFlow = plantsListSortOrderCache::getOrAwait.asFlow()

此代码会创建一个新的 Flow,该 flow 调用 getOrAwait 并将结果作为其第一个和唯一的值发出。具体操作方法是,使用 :: 引用 getOrAwait 方法,然后对生成的 Function 对象调用 asFlow

这两个 flow 执行相同的操作,在完成之前调用 getOrAwait 并发出结果。

以声明的方式合并多个 flow

现在我们有两个 flow,即 customSortFlowplantsFlow,所以让我们以声明的方式合并这两个 flow。

combine 运算符添加到 plantsFlow

PlantRepository.kt

private val customSortFlow = plantsListSortOrderCache::getOrAwait.asFlow()

val plantsFlow: Flow<List<Plant>>
   get() = plantDao.getPlantsFlow()
       // When the result of customSortFlow is available,
       // this will combine it with the latest value from
       // the flow above.  Thus, as long as both `plants`
       // and `sortOrder` are have an initial value (their
       // flow has emitted at least one value), any change
       // to either `plants` or `sortOrder`  will call
       // `plants.applySort(sortOrder)`.
       .combine(customSortFlow) { plants, sortOrder ->
          plants.applySort(sortOrder)
       }

combine 运算符将两个 flow 合并在一起。两个 flow 都在自己的协程中运行,然后,每当一个 flow 生成一个新值时,将使用另一个 flow 中的最新值调用转换。

通过使用 combine,我们可以将缓存的网络查询与我们的数据库查询合并。这两个 flow 将在不同的协程上并发运行。这意味着在 Room 启动网络请求时,Retrofit 可以启动网络查询。然后,一旦两个 flow 的某个结果可用,该函数就会调用 combine lambda,我们可以借此对加载的植物应用加载的排序顺序。

如需探索 combine 运算符的工作方式,请修改 customSortFlow,在 onStart 中发出两次数据(需包含长时间延迟),如下所示:

// Create a flow that calls a single function
private val customSortFlow = plantsListSortOrderCache::getOrAwait.asFlow()
   .onStart {
       emit(listOf())
       delay(1500)
   }

当某个观察器先于其他运算符进行监听时,将会发生转换 onStart,并发出占位值。因此,我们将发出一个空列表,将 getOrAwait 调用延迟 1500 毫秒,然后继续运行原始 flow。如果您现在运行应用,您将看到 Room 数据库查询会立即返回,并与空列表合并(这意味着其将按字母顺序排序)。大约 1500 毫秒后,它开始应用自定义排序。

在继续进行本 Codelab 的学习之前,请从 customSortFlow 中移除 onStart 转换。

flow 和主线程安全

就像我们在本 Codelab 中的操作一样,Flow 可以调用主线程安全函数,而且它可以保证协程正常的主线程安全性。RoomRetrofit 将为我们提供主线程安全性,我们无需执行其他任何操作,即可使用 flow 发出网络请求或数据库查询。

此 flow 已使用以下线程:

  • plantService.customPlantSortOrder 在 Retrofit 线程上运行(调用 Call.enqueue
  • getPlantsFlow 将在 Room 执行器上运行查询
  • applySort 将在收集调度程序上运行(在本例中为 Dispatchers.Main

因此,如果我们要做的只是在 Retrofit 中调用挂起函数并使用 Room flow,就不需要出于主线程安全性考虑而使这段代码复杂化。

不过,随着数据集的大小增加,调用 applySort 的速度可能会变慢,最终阻塞主线程。Flow 提供了一个名为 flowOn 的声明式 API,用于控制 flow 在哪个线程上运行。

请将 flowOn 添加到 plantsFlow,如下所示:

PlantRepository.kt

private val customSortFlow = plantsListSortOrderCache::getOrAwait.asFlow()

val plantsFlow: Flow<List<Plant>>
   get() = plantDao.getPlantsFlow()
       .combine(customSortFlow) { plants, sortOrder ->
          plants.applySort(sortOrder)
       }
       .flowOn(defaultDispatcher)
       .conflate()

调用 flowOn 会对代码的执行方式产生两个重要影响:

  1. defaultDispatcher 上启动新的协程(本例中为 Dispatchers.Default),以在调用 flowOn 之前运行和收集 flow。
  2. 引入一个缓冲区,以将结果从新协程发送给之后的调用。
  3. 在执行 flowOn 之后,将该缓冲区的值发出到 Flow。在本例中,这些值即为 ViewModel 中的 asLiveData

这与 withContext 切换调度程序的工作方式非常相似,但它会在转换过程中引入一个缓冲区,该缓冲区会改变 flow 的工作状况。由 flowOn 启动的协程产生结果的速度要快于调用方耗用结果的速度,所以默认情况下它会缓冲大量结果。

在本例中,我们计划将结果发送到界面,因此我们只关注最新的结果。这就是 conflate 运算符执行的操作,它会修改 flowOn 的缓冲区,仅存储上一个结果。如果前一个数据还没有读取,另一个数据就进入,那么前一个数据将被覆盖。

运行应用

如果您再次运行应用,您应该会看到您正在加载数据,并使用 Flow 应用自定义排序顺序。由于我们尚未实现 switchMap,因此过滤器选项不会执行任何操作。

在下一步中,我们将了解另一种使用 flow 提供主线程安全性的方法。

如需结束此 API 的 flow 版本,请打开 PlantListViewModel.kt,根据 GrowZone 切换 flow,就像在 LiveData 版本中的操作一样。

plants liveData 中添加以下代码:

PlantListViewModel.kt

private val growZoneFlow = MutableStateFlow<GrowZone>(NoGrowZone)

val plantsUsingFlow: LiveData<List<Plant>> = growZoneFlow.flatMapLatest { growZone ->
        if (growZone == NoGrowZone) {
            plantRepository.plantsFlow
        } else {
            plantRepository.getPlantsWithGrowZoneFlow(growZone)
        }
    }.asLiveData()

此模式显示了如何将事件(生长区域会有所更改)集成到 flow 中。这与 LiveData.switchMap 版本完全相同,也就是根据某个事件在两个数据源之间切换。

代码解析

PlantListViewModel.kt

private val growZoneFlow = MutableStateFlow<GrowZone>(NoGrowZone)

该代码使用 NoGrowZone 的初始值定义了一个新的 MutableStateFlow。这是一个特殊类型的 flow 值容器,仅保留给予它的最后一个值。它是原始线程安全并发,因此您可以同时从多个线程中写入内容(“最新”项优先)。

您还可以进行订阅,获取当前值的更新。总体来说,该代码与 LiveData 的行为类似:其只保留最后一个值,并允许您观察对其的更改。

PlantListViewModel.kt

val plantsUsingFlow: LiveData<List<Plant>> = growZoneFlow.flatMapLatest { growZone ->

StateFlow 也是一个常规 Flow,因此您可以像平常一样使用所有的运算符。

在这里我们使用 flatMapLatest 运算符,它与 LiveData 中的 switchMap 完全相同。只要 growZone 更改其值,该 lambda 就会启用,且必须返回一个 Flow。返回的 Flow 将用作所有下游运算符的 Flow

从根本上来讲,这使得我们能够根据 growZone 的值在不同 flow 之间切换。

PlantListViewModel.kt

if (growZone == NoGrowZone) {
    plantRepository.plantsFlow
} else {
    plantRepository.getPlantsWithGrowZoneFlow(growZone)
}

flatMapLatest 内,我们根据 growZone 进行切换。此代码与 LiveData.switchMap 版本几乎完全相同,唯一的区别是它返回 Flows,而不是 LiveDatas

PlantListViewModel.kt

   }.asLiveData()

最后,我们将 Flow 转换为 LiveData,因为我们的 Fragment 希望从 ViewModel 公开 LiveData

更改 StateFlow 的值

如果需要让应用知道过滤器的更改,我们可以设置 MutableStateFlow.value。这样我们可以像这里的操作一样,它可以轻松将事件传递到协程中。

PlantListViewModel.kt

fun setGrowZoneNumber(num: Int) {
    growZone.value = GrowZone(num)
    growZoneFlow.value = GrowZone(num)

    launchDataLoad {
        plantRepository.tryUpdateRecentPlantsForGrowZoneCache(GrowZone(num)) }
    }

fun clearGrowZoneNumber() {
    growZone.value = NoGrowZone
    growZoneFlow.value = NoGrowZone

    launchDataLoad {
        plantRepository.tryUpdateRecentPlantsCache()
    }
}

再次运行应用

如果您再次运行应用,过滤器会同时适用于 LiveData 版本和 Flow 版本。

在下一步中,我们将对 getPlantsWithGrowZoneFlow 应用自定义排序。

Flow 中最令人兴奋的一项功能是它为挂起函数提供绝佳的支持。flow 构建器以及几乎每一个转换都会公开一个可调用任何挂起函数的 suspend 运算符。因此,可以通过从 flow 内部调用常规挂起函数来确保网络和数据库调用的主线程安全性,以及安排多个异步操作。

实际上,这允许您自然地将声明式转换与命令式代码混合使用。如本例中所示,在常规 map 运算符内,您可以安排多个异步操作,而无需应用任何额外的转换。在很多地方,与完全声明式方式相比,使用这种方式的代码更简单。

使用挂起函数安排异步工作

接下来我们将对 Flow 进行总结,使用挂起运算符实现自定义排序。

打开 PlantRepository.kt 并向 getPlantsWithGrowZoneNumberFlow 添加 map 转换。

PlantRepository.kt

fun getPlantsWithGrowZoneFlow(growZone: GrowZone): Flow<List<Plant>> {
   return plantDao.getPlantsWithGrowZoneNumberFlow(growZone.number)
       .map { plantList ->
           val sortOrderFromNetwork = plantsListSortOrderCache.getOrAwait()
           val nextValue = plantList.applyMainSafeSort(sortOrderFromNetwork)
           nextValue
       }
}

此 map 依赖常规挂起函数处理异步工作,因此具有主线程安全性,即便它合并了两项异步操作。

从数据库中返回每项结果后,我们会获得缓存的排序顺序。如果该顺序未准备好,它将等待异步网络请求。一旦我们有了排序顺序,您就可以安全地调用 applyMainSafeSort,它将在默认调度程序上运行该排序。

现在,该代码将主线程安全问题迁移到了常规挂起函数中,因此已经完全具备主线程安全性。这比在 plantsFlow 中实现的相同转换简单得多。

不过值得注意的是,执行的方式略有不同。每当数据库发出新值时,都会获取缓存值。这没有问题,因为我们将在 plantsListSortOrderCache 中正确缓存该值,但如果启动了新的网络请求,此实现将产生许多不必要的网络请求。此外,在 .combine 版本中,网络请求和数据库查询会并发运行,而在此版本中,将按序列运行。

由于存在这些差异,因此构建此代码并没有明确的规则。在许多情况下,可以像我们在这里的操作一样使用挂起转换,这样可以使所有异步操作按顺序进行。但在其他情况下,最好使用运算符来控制并发以及提供主线程安全性。

即将大功告成!作为最后(可选)步骤,让我们将网络请求移至基于 flow 的协程中。

这样,我们便可从 onClick 调用的处理程序中移除进行网络调用的逻辑,并从 growZone 驱动它们。这可帮助我们创建单一可信来源并避免代码重复 - 在没有刷新缓存时,任何代码都无法更改过滤器。

打开 PlantListViewModel.kt,然后将它添加到 init 代码块:

PlantListViewModel.kt

init {
   clearGrowZoneNumber()

   growZone.mapLatest { growZone ->
           _spinner.value = true
           if (growZone == NoGrowZone) {
               plantRepository.tryUpdateRecentPlantsCache()
           } else {
               plantRepository.tryUpdateRecentPlantsForGrowZoneCache(growZone)
           }
       }
       .onEach {  _spinner.value = false }
       .catch { throwable ->  _snackbar.value = throwable.message  }
       .launchIn(viewModelScope)
}

此代码将启动一个新协程,观察发送到 growZoneChannel 的值。您现在可通过以下方法把网络调用注释掉,因为它们只适用于 LiveData 版本。

PlantListViewModel.kt

fun setGrowZoneNumber(num: Int) {
    growZone.value = GrowZone(num)
    growZoneFlow.value = GrowZone(num)

    // launchDataLoad {
    //    plantRepository.tryUpdateRecentPlantsForGrowZoneCache(GrowZone(num))
    // }
}

fun clearGrowZoneNumber() {
    growZone.value = NoGrowZone
    growZoneFlow.value = NoGrowZone

    // launchDataLoad {
    //    plantRepository.tryUpdateRecentPlantsCache()
    // }
}

再次运行应用

如果您现在再次运行应用,您会看到网络刷新现在由 growZone 控制。我们对代码进行了实质性的改进,因为作为过滤条件激活的唯一真实来源,渠道中有了更多更改过滤器的方法。这样一来,网络请求和当前过滤器便不可避免地需要同步。

代码解析

让我们从外部开始,逐一解析一下代码中所用的所有新函数:

PlantListViewModel.kt

growZone
    // ...
    .launchIn(viewModelScope)

这一次,我们使用 launchIn 运算符在 ViewModel 内收集 flow。

运算符 launchIn 创建了一个新协程,并从 flow 中收集每个值。它将在所提供的 CoroutineScope 中启动(本例中为 viewModelScope)。非常棒,因为此 ViewModel 会被清除,收集将被取消。

如果没有提供任何其他运算符,那这样做不会有太大作用,但是由于 Flow 会在其所有运算符中提供挂起 lambda,所以您可以根据每个值轻松执行异步操作。

PlantListViewModel.kt

.mapLatest { growZone ->
    _spinner.value = true
    if (growZone == NoGrowZone) {
        plantRepository.tryUpdateRecentPlantsCache()
    } else {
        plantRepository.tryUpdateRecentPlantsForGrowZoneCache(growZone)
    }
}

这就是神奇之处 - mapLatest 会为每个值使用此 map 函数。但是与常规 map 不同,每次调用 map 转换它都会启动新的协程。然后,如果 growZoneChannel 在上一个协程完成之前发出了新值,它会在启动新协程之前取消该新值。

我们可以使用 mapLatest 控制并发。不用我们自己手动构建取消/重启逻辑,flow 转换可以解决此问题。与手动编写相同的取消逻辑相比,此代码更简洁和易于理解。

取消 Flow 遵循正常的协程合作取消规则

PlantListViewModel.kt

.onEach {  _spinner.value = false }
.catch { throwable -> _snackbar.value = throwable.message }

每当上面的 flow 发出一个值时,onEach 就会被调用。在本示例中,我们将在处理完成后使用它来重置旋转图标。

catch 运算符将捕获 flow 中针对其抛出的所有异常。它可以向 flow 发出一个新值(例如错误状态),将异常重新抛入 flow,或像我们在这里所做的操作一样。

如果发生错误,我们只需告知 _snackbar 显示错误消息即可。

总结

此步骤展示的是如何使用 Flow 控制并发,以及如何使用 ViewModel 中的 Flows,而无需依赖于界面观察器。

作为一个具有挑战性的步骤,请尝试定义一个函数来封装此 flow 的数据加载,使用以下签名:

fun <T> loadDataFor(source: StateFlow<T>, block: suspend (T) -> Unit) {