StateFlow
和 SharedFlow
是 Flow API,允许数据流以最优方式发出状态更新并向多个使用方发出值。
StateFlow
StateFlow
是一个状态容器式可观察数据流,可以向其收集器发出当前状态更新和新状态更新。还可通过其 value
属性读取当前状态值。如需更新状态并将其发送到数据流,请为 MutableStateFlow
类的 value
属性分配一个新值。
在 Android 中,StateFlow
非常适合需要让可变状态保持可观察的类。
按照 Kotlin 数据流中的示例,可以从 LatestNewsViewModel
公开 StateFlow
,以便 View
能够监听界面状态更新,并自行使屏幕状态在配置更改后继续有效。
class LatestNewsViewModel(
private val newsRepository: NewsRepository
) : ViewModel() {
// Backing property to avoid state updates from other classes
private val _uiState = MutableStateFlow(LatestNewsUiState.Success(emptyList()))
// The UI collects from this StateFlow to get its state updates
val uiState: StateFlow<LatestNewsUiState> = _uiState
init {
viewModelScope.launch {
newsRepository.favoriteLatestNews
// Update View with the latest favorite news
// Writes to the value property of MutableStateFlow,
// adding a new element to the flow and updating all
// of its collectors
.collect { favoriteNews ->
_uiState.value = LatestNewsUiState.Success(favoriteNews)
}
}
}
}
// Represents different states for the LatestNews screen
sealed class LatestNewsUiState {
data class Success(val news: List<ArticleHeadline>): LatestNewsUiState()
data class Error(val exception: Throwable): LatestNewsUiState()
}
负责更新 MutableStateFlow
的类是提供方,从 StateFlow
收集的所有类都是使用方。与使用 flow
构建器构建的冷数据流不同,StateFlow
是热数据流:从数据流收集数据不会触发任何提供方代码。StateFlow
始终处于活跃状态并存于内存中,而且只有在垃圾回收根中未涉及对它的其他引用时,它才符合垃圾回收条件。
当新使用方开始从数据流中收集数据时,它将接收信息流中的最近一个状态及任何后续状态。您可在 LiveData
等其他可观察类中找到此操作行为。
与处理任何其他数据流一样,View
会监听 StateFlow
:
class LatestNewsActivity : AppCompatActivity() {
private val latestNewsViewModel = // getViewModel()
override fun onCreate(savedInstanceState: Bundle?) {
...
// Start a coroutine in the lifecycle scope
lifecycleScope.launch {
// repeatOnLifecycle launches the block in a new coroutine every time the
// lifecycle is in the STARTED state (or above) and cancels it when it's STOPPED.
repeatOnLifecycle(Lifecycle.State.STARTED) {
// Trigger the flow and start listening for values.
// Note that this happens when lifecycle is STARTED and stops
// collecting when the lifecycle is STOPPED
latestNewsViewModel.uiState.collect { uiState ->
// New value received
when (uiState) {
is LatestNewsUiState.Success -> showFavoriteNews(uiState.news)
is LatestNewsUiState.Error -> showError(uiState.exception)
}
}
}
}
}
}
如需将任何数据流转换为 StateFlow
,请使用 stateIn
中间运算符。
StateFlow、Flow 和 LiveData
StateFlow
和 LiveData
具有相似之处。两者都是可观察的数据容器类,并且在应用架构中使用时,两者都遵循相似模式。
但请注意,StateFlow
和 LiveData
的行为确实有所不同:
StateFlow
需要将初始状态传递给构造函数,而LiveData
不需要。- 当 View 进入
STOPPED
状态时,LiveData.observe()
会自动取消注册使用方,而从StateFlow
或任何其他数据流收集数据的操作并不会自动停止。如需实现相同的行为,您需要从Lifecycle.repeatOnLifecycle
块收集数据流。
利用 shareIn
使冷数据流变为热数据流
StateFlow
是热数据流,只要该数据流被收集,或对它的任何其他引用在垃圾回收根中存在,该数据流就会一直存于内存中。您可以使用 shareIn
运算符将冷数据流变为热数据流。
以在 Kotlin 数据流中创建的 callbackFlow
为例,您无需为每个收集器都创建一个新数据流,而是可以使用 shareIn
在收集器间共享从 Firestore 检索到的数据。您需要传入以下内容:
- 用于共享数据流的
CoroutineScope
。此作用域函数的生命周期应长于任何使用方,使共享数据流在足够长的时间内保持活跃状态。 - 要重放 (replay) 至每个新收集器的数据项数量。
- “启动”行为政策。
class NewsRemoteDataSource(...,
private val externalScope: CoroutineScope,
) {
val latestNews: Flow<List<ArticleHeadline>> = flow {
...
}.shareIn(
externalScope,
replay = 1,
started = SharingStarted.WhileSubscribed()
)
}
在此示例中,latestNews
数据流将上次发出的数据项重放至新收集器,只要 externalScope
处于活跃状态并且存在活跃收集器,它就会一直处于活跃状态。当存在活跃订阅者时,SharingStarted.WhileSubscribed()
“启动”政策将使上游提供方保持活跃状态。可使用其他启动政策,例如使用 SharingStarted.Eagerly
可立即启动提供方,使用 SharingStarted.Lazily
可在第一个订阅者出现后开始共享数据,并使数据流永远保持活跃状态。
SharedFlow
shareIn
函数会返回一个热数据流 SharedFlow
,此数据流会向从其中收集值的所有使用方发出数据。SharedFlow
是 StateFlow
的可配置性极高的泛化数据流。
您无需使用 shareIn
即可创建 SharedFlow
。例如,您可以使用 SharedFlow
将 tick 信息发送到应用的其余部分,以便所有内容会定期同时刷新。除了获取最新资讯之外,您可能还想要使用用户最喜欢的主题集刷新用户信息部分。在以下代码段中,TickHandler
公开了 SharedFlow
,以便其他类知道要在何时刷新其内容。与 StateFlow
一样,请在类中使用类型 MutableSharedFlow
的后备属性将数据项发送给数据流:
// Class that centralizes when the content of the app needs to be refreshed
class TickHandler(
private val externalScope: CoroutineScope,
private val tickIntervalMs: Long = 5000
) {
// Backing property to avoid flow emissions from other classes
private val _tickFlow = MutableSharedFlow<Unit>(replay = 0)
val tickFlow: SharedFlow<Event<String>> = _tickFlow
init {
externalScope.launch {
while(true) {
_tickFlow.emit(Unit)
delay(tickIntervalMs)
}
}
}
}
class NewsRepository(
...,
private val tickHandler: TickHandler,
private val externalScope: CoroutineScope
) {
init {
externalScope.launch {
// Listen for tick updates
tickHandler.tickFlow.collect {
refreshLatestNews()
}
}
}
suspend fun refreshLatestNews() { ... }
...
}
您可通过以下方式自定义 SharedFlow
行为:
- 通过
replay
,您可以针对新订阅者重新发送多个之前已发出的值。 - 通过
onBufferOverflow
,您可以指定相关政策来处理缓冲区中已存满要发送的数据项的情况。默认值为BufferOverflow.SUSPEND
,这会使调用方挂起。其他选项包括DROP_LATEST
或DROP_OLDEST
。
MutableSharedFlow
还具有 subscriptionCount
属性,其中包含处于活跃状态的收集器的数量,以便您相应地优化业务逻辑。MutableSharedFlow
还包含一个 resetReplayCache
函数,供您在不想重放已向数据流发送的最新信息的情况下使用。
其他数据流资源
- Android 上的 Kotlin 数据流
- 在 Android 上测试 Kotlin 数据流
- 关于 Flow 的 shareIn 和 stateIn 运算符的注意事项
- 从 LiveData 迁移到 Kotlin Flow
- 有关 Kotlin 协程和数据流的其他资源