与数据流进行通信的单元或模块的测试方式,取决于受测对象使用数据流作为输入还是输出。
- 如果受测对象观察到数据流,您可以在虚构依赖项中生成数据流,而这些可以通过测试进行控制。
- 如果单元或模块公开了数据流,您可以读取并验证测试中的数据流所发出的一个或多个数据项。
创建虚构数据提供方
当受测对象是数据流使用方时,一种常见的测试方法是用虚构实现替换提供方。例如,假设有一个用于观察存储库的类,该存储库从两个生产数据源获取数据:

为了使测试具有确定性,您可以使用始终发出同一虚构数据的虚构存储库,替换存储库及其依赖项:

如需在数据流中发出一系列预定义值,请使用 flow
构建器:
class MyFakeRepository : MyRepository {
fun observeCount() = flow {
emit(ITEM_1)
}
}
在测试中,注入此虚构存储库,替换真实实现:
@Test
fun myTest() {
// Given a class with fake dependencies:
val sut = MyUnitUnderTest(MyFakeRepository())
// Trigger and verify
...
}
现在,您可以控制受测对象的输出,接下来可以通过检查其输出状况来验证其是否正常运行。
在测试中声明数据流发送
如果受测对象公开了数据流,该测试需对数据流元素做出声明。
假设上个示例的存储库公开了数据流:

根据测试需求,您通常需要检查首次发送数据,或来自数据流的有限数据项。
您可通过调用 first()
使用数据流的首次发送数据。此函数将等待首个数据项接收完成,然后将取消信号发送给提供方。
@Test
fun myRepositoryTest() = runBlocking {
// Given a repository that combines values from two data sources:
val repository = MyRepository(fakeSource1, fakeSource2)
// When the repository emits a value
val firstItem = repository.counter.first() // Returns the first item in the flow
// Then check it's the expected item
assertThat(firstItem, isEqualTo(ITEM_1) // Using AssertJ
}
如果该测试需要检查多个值,则调用 toList()
会使数据流等待数据源发出其所有值,然后以列表形式返回这些值。请注意,此操作仅适用于有限数据流。
@Test
fun myRepositoryTest() = runBlocking {
// Given a repository with a fake data source that emits ALL_MESSAGES
val messages = repository.observeChatMessages().toList()
// When all messages are emitted then they should be ALL_MESSAGES
assertThat(messages, isEqualTo(ALL_MESSAGES))
}
对于需要收集数据项的更复杂的数据流或未返回有限数据项的数据流,您可使用 Flow
API 选取并转换数据项。下面是一些示例:
// Take the second item
outputFlow.drop(1).first()
// Take the first 5 items
outputFlow.take(5).toList()
// Take the first 5 distinct items
outputFlow.take(5).toSet()
// Take the first 2 items matching a predicate
outputFlow.takeWhile(predicate).take(2).toList()
// Take the first item that matches the predicate
outputFlow.firstWhile(predicate)
// Take 5 items and apply a transformation to each
outputFlow.map(transformation).take(5)
// Takes the first item verifying that the flow is closed after that
outputFlow.single()
// Finite data streams
// Verify that the flow emits exactly N elements (optional predicate)
outputFlow.count()
outputFlow.count(predicate)
将 CoroutineDispatcher 作为依赖项
如果受测对象接受 CoroutineDispatcher
作为依赖项,则在测试中使用 kotlinx-coroutines-test
库提供的测试调度程序来替换 StandardTestDispatcher
和 UnconfinedTestDispatcher
。
在 runTest
中,使用提供的 testScheduler
创建新的测试调度程序,然后将该调度程序传递给受测对象:
@Test
fun myTest() = runTest {
val uut = MyUnitUnderTest(UnconfinedTestDispatcher(testScheduler))
// Test body
}