在 Android 上测试 Kotlin 数据流

数据流进行通信的单元或模块的测试方式,取决于受测对象使用数据流作为输入还是输出。

  • 如果受测对象观察到数据流,您可以在虚构依赖项中生成数据流,而这些可以通过测试进行控制。
  • 如果单元或模块公开了数据流,您可以读取并验证测试中的数据流所发出的一个或多个数据项。

创建虚构数据提供方

当受测对象是数据流使用方时,一种常见的测试方法是用虚构实现替换提供方。例如,假设有一个用于观察存储库的类,该存储库从两个生产数据源获取数据:

受测对象和数据层
图 1. 受测对象和数据层。

为了使测试具有确定性,您可以使用始终发出同一虚构数据的虚构存储库,替换存储库及其依赖项:

依赖项已替换为虚构实现
图 2. 依赖项已替换为虚构实现。

如需在数据流中发出一系列预定义值,请使用 flow 构建器:

class MyFakeRepository : MyRepository {
    fun observeCount() = flow {
        emit(ITEM_1)
    }
}

在测试中,注入此虚构存储库,替换真实实现:

@Test
fun myTest() {
    // Given a class with fake dependencies:
    val sut = MyUnitUnderTest(MyFakeRepository())
    // Trigger and verify
    ...
}

现在,您可以控制受测对象的输出,接下来可以通过检查其输出状况来验证其是否正常运行。

在测试中断言数据流发出

如果受测对象公开了数据流,该测试需对数据流元素做出断言。

假设上个示例的存储库公开了数据流:

含有虚构依赖项的存储库公开了数据流
图 3. 含有虚构依赖项的存储库(受测对象)公开了数据流。

对于某些测试,您只需要检查来自数据流的第一个发出项或有限数量的项。

您可通过调用 first() 使用数据流的第一个发出项。此函数将等待首个数据项接收完成,然后将取消信号发送给提供方。

@Test
fun myRepositoryTest() = runTest {
    // Given a repository that combines values from two data sources:
    val repository = MyRepository(fakeSource1, fakeSource2)

    // When the repository emits a value
    val firstItem = repository.counter.first() // Returns the first item in the flow

    // Then check it's the expected item
    assertEquals(ITEM_1, firstItem)
}

如果该测试需要检查多个值,则调用 toList() 会使数据流等待数据源发出其所有值,然后以列表形式返回这些值。这仅适用于有限数据流。

@Test
fun myRepositoryTest() = runTest {
    // Given a repository with a fake data source that emits ALL_MESSAGES
    val messages = repository.observeChatMessages().toList()

    // When all messages are emitted then they should be ALL_MESSAGES
    assertEquals(ALL_MESSAGES, messages)
}

对于需要更复杂地收集数据项或未返回有限数据项的数据流,您可使用 Flow API 选取并转换数据项。以下是一些示例:

// Take the second item
outputFlow.drop(1).first()

// Take the first 5 items
outputFlow.take(5).toList()

// Takes the first item verifying that the flow is closed after that
outputFlow.single()

// Finite data streams
// Verify that the flow emits exactly N elements (optional predicate)
outputFlow.count()
outputFlow.count(predicate)

测试期间持续收集

如上例所示地使用 toList() 收集数据流会在内部使用 collect(),并直到可返回整个结果列表时才暂停。

如需交错执行导致数据流发出值和对发出的值进行断言的操作,您可以在测试期间持续收集数据流中的值。

例如,下面是要测试的 Repository 类以及附带的虚构数据源实现,该实现具有 emit 方法,可在测试期间动态生成值:

class Repository(private val dataSource: DataSource) {
    fun scores(): Flow<Int> {
        return dataSource.counts().map { it * 10 }
    }
}

class FakeDataSource : DataSource {
    private val flow = MutableSharedFlow<Int>()
    suspend fun emit(value: Int) = flow.emit(value)
    override fun counts(): Flow<Int> = flow
}

在测试中使用此虚构实现时,您可以创建一个收集协程,该协程会持续接收 Repository 中的值。在此示例中,我们将它们收集到一个列表中,然后对其内容执行断言:

@Test
fun continuouslyCollect() = runTest {
    val dataSource = FakeDataSource()
    val repository = Repository(dataSource)

    val values = mutableListOf<Int>()
    val collectJob = launch(UnconfinedTestDispatcher()) {
        repository.scores().toList(values)
    }

    dataSource.emit(1)
    assertEquals(10, values[0]) // Assert on the list contents

    dataSource.emit(2)
    dataSource.emit(3)
    assertEquals(30, values[2])

    assertEquals(3, values.size) // Assert the number of items collected

    collectJob.cancel()
}

由于此处 Repository 公开的数据流永远无法完成,因此收集它的 toList 调用永远不会返回。因此,您需要在测试结束之前明确取消收集协程。否则,runTest 会一直等待其完成,导致测试停止响应并最终失败。

请注意此处的 UnconfinedTestDispatcher 如何用于收集协程。这可确保收集协程即时启动,并可在 launch 返回后接收值。

使用 Turbine 库

第三方 Turbine 库提供了一个用于创建收集协程的便捷 API,以及用于测试数据流的其他便捷功能:

@Test
fun usingTurbine() = runTest {
    val dataSource = FakeDataSource()
    val repository = Repository(dataSource)

    repository.scores().test {
        // Make calls that will trigger value changes only within test{}
        dataSource.emit(1)
        assertEquals(10, awaitItem())

        dataSource.emit(2)
        awaitItem() // Ignore items if needed, can also use skip(n)

        dataSource.emit(3)
        assertEquals(30, awaitItem())
    }
}

如需了解详情,请参阅库文档

测试 StateFlow

StateFlow 是一种可观察的数据存储器,可以收集这种存储器来以数据流的形式观察它随时间变化的存储值。请注意,该值数据流会混杂各种值,也就是说,如果在 StateFlow 中快速设置值,则不能保证该 StateFlow 的收集器能接收所有中间值,而只能接收最新的值。

在测试中,如果您考虑到混杂情况,则可以像收集任何其他数据流一样收集 StateFlow 的值(包括使用 Turbine 库)。在某些测试场景中,可以尝试收集和断言所有中间值。

不过,我们通常建议将 StateFlow 视为数据存储器,并断言其 value 属性。这样,测试会在给定时间点验证对象的当前状态,而不依赖于是否发生混杂情况。

例如,以下 ViewModel 会从 Repository 收集值,并在 StateFlow 中将值提供给界面:

class MyViewModel(private val myRepository: MyRepository) : ViewModel() {
    private val _score = MutableStateFlow(0)
    val score: StateFlow<Int> = _score.asStateFlow()

    fun initialize() {
        viewModelScope.launch {
            myRepository.scores().collect { score ->
                _score.value = score
            }
        }
    }
}

Repository 的虚构实现可能如下所示:

class FakeRepository : MyRepository {
    private val flow = MutableSharedFlow<Int>()
    suspend fun emit(value: Int) = flow.emit(value)
    override fun scores(): Flow<Int> = flow
}

使用此虚构实现测试 ViewModel 时,您可以从虚构实现发出值,以在 ViewModel 的 StateFlow 中触发更新,然后对更新后的 value 断言:

@Test
fun testHotFakeRepository() = runTest {
    val fakeRepository = FakeRepository()
    val viewModel = MyViewModel(fakeRepository)

    assertEquals(0, viewModel.score.value) // Assert on the initial value

    // Start collecting values from the Repository
    viewModel.initialize()

    // Then we can send in values one by one, which the ViewModel will collect
    fakeRepository.emit(1)
    assertEquals(1, viewModel.score.value)

    fakeRepository.emit(2)
    fakeRepository.emit(3)
    assertEquals(3, viewModel.score.value) // Assert on the latest value
}

使用 stateIn 创建的 StateFlow

在上一部分中,ViewModel 使用 MutableStateFlow 存储 Repository 中的数据流发出的最新值。这是一种常见的模式,通常通过使用 stateIn 运算符以更简单的方式实现,该运算符会将冷数据流转换为热 StateFlow

class MyViewModelWithStateIn(myRepository: MyRepository) : ViewModel() {
    val score: StateFlow<Int> = myRepository.scores()
        .stateIn(viewModelScope, SharingStarted.WhileSubscribed(5000L), 0)
}

stateIn 运算符具有 SharingStarted 参数,用于确定何时变为活跃状态并开始消耗底层数据流。ViewModel 中经常使用 SharingStarted.LazilySharingStarted.WhileSubsribed 等选项。

即使您在测试中对 StateFlowvalue 进行断言,也需要创建收集器。可以是空收集器:

@Test
fun testLazilySharingViewModel() = runTest {
    val fakeRepository = HotFakeRepository()
    val viewModel = MyViewModelWithStateIn(fakeRepository)

    // Create an empty collector for the StateFlow
    val collectJob = launch(UnconfinedTestDispatcher()) { viewModel.score.collect() }

    assertEquals(0, viewModel.score.value) // Can assert initial value

    // Trigger-assert like before
    fakeRepository.emit(1)
    assertEquals(1, viewModel.score.value)

    fakeRepository.emit(2)
    fakeRepository.emit(3)
    assertEquals(3, viewModel.score.value)

    // Cancel the collecting job at the end of the test
    collectJob.cancel()
}

其他资源