Em corrotinas, um fluxo é um tipo que pode emitir vários valores sequencialmente, ao contrário das funções de suspensão, que retornam somente um valor. Por exemplo, você pode usar um fluxo para receber atualizações em tempo real de um banco de dados.
Os fluxos são criados com base nas corrotinas e podem fornecer vários valores.
Conceitualmente, um fluxo é um stream de dados que pode ser computado
de forma assíncrona. Os valores emitidos precisam ser do mesmo tipo. Por
exemplo, um Flow<Int>
é um fluxo que emite valores inteiros.
Um fluxo é muito semelhante a um Iterator
que produz uma sequência de
valores, mas usa funções de suspensão para produzir e consumir valores
de maneira assíncrona. Isso significa, por exemplo, que o fluxo pode fazer uma
solicitação de rede com segurança para produzir o próximo valor sem bloquear a linha
de execução principal.
Há três entidades envolvidas em streams de dados:
- Um produtor produz dados que são adicionados ao stream. Graças às corrotinas, os fluxos também podem produzir dados de maneira assíncrona.
- Intermediários (opcionais) podem modificar cada valor emitido para o stream ou o próprio stream.
- Um consumidor consome os valores do stream.
No Android, um repositório normalmente é um produtor de dados de IU que tem a interface do usuário (IU) como o consumidor que mostra os dados. Outras vezes, a camada da IU é um produtor de eventos de entrada do usuário consumidos por outras camadas da hierarquia. As camadas entre o produtor e o consumidor geralmente agem como intermediárias que modificam o fluxo de dados para que ele seja ajustado aos requisitos da camada seguinte.
Como criar um fluxo
Para criar fluxos, use as
APIs flow builder
(link em inglês). A função builder flow
cria um novo fluxo em que é possível emitir manualmente
novos valores para o stream de dados usando a
função
emit
(link em inglês).
No exemplo abaixo, uma fonte de dados busca as notícias mais recentes automaticamente em um intervalo fixo. Como uma função de suspensão não pode retornar vários valores consecutivos, a fonte de dados cria e retorna um fluxo para atender a esse requisito. Nesse caso, a fonte de dados atua como o produtor.
class NewsRemoteDataSource(
private val newsApi: NewsApi,
private val refreshIntervalMs: Long = 5000
) {
val latestNews: Flow<List<ArticleHeadline>> = flow {
while(true) {
val latestNews = newsApi.fetchLatestNews()
emit(latestNews) // Emits the result of the request to the flow
delay(refreshIntervalMs) // Suspends the coroutine for some time
}
}
}
// Interface that provides a way to make network requests with suspend functions
interface NewsApi {
suspend fun fetchLatestNews(): List<ArticleHeadline>
}
O builder flow
é executado em uma corrotina. Assim, ele se beneficia
das mesmas APIs assíncronas, mas com algumas restrições:
- Os fluxos são sequenciais. Como o produtor está em uma corrotina, ao chamar
uma função de suspensão, o produtor é suspenso até que a função de suspensão
seja retornada. No exemplo, o produtor faz a suspensão até que a solicitação de rede
fetchLatestNews
seja concluída. Só então o resultado é emitido para o stream. - Com o builder
flow
, o produtor não podeemit
(emitir) valores de umCoroutineContext
diferente. Portanto, não chameemit
em umCoroutineContext
diferente criando novas corrotinas ou usando blocos de códigowithContext
. Nesses casos, você pode usar outros builders flow, comocallbackFlow
(link em inglês).
Como modificar o stream
Os intermediários podem usar operadores intermediários (link em inglês) para modificar o stream de dados sem consumir os valores. Esses operadores são funções que, quando aplicadas a um stream de dados, configuram uma cadeia de operações que não são executadas até que os valores sejam consumidos no futuro. Saiba mais sobre os operadores intermediários na documentação de referência de fluxo (link em inglês).
No exemplo abaixo, a camada de repositório usa o operador intermediário
map
(link em inglês)
para transformar os dados que serão mostrados em View
:
class NewsRepository(
private val newsRemoteDataSource: NewsRemoteDataSource,
private val userData: UserData
) {
/**
* Returns the favorite latest news applying transformations on the flow.
* These operations are lazy and don't trigger the flow. They just transform
* the current value emitted by the flow at that point in time.
*/
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
// Intermediate operation to filter the list of favorite topics
.map { news -> news.filter { userData.isFavoriteTopic(it) } }
// Intermediate operation to save the latest news in the cache
.onEach { news -> saveInCache(news) }
}
Operadores intermediários podem ser aplicados um após o outro, formando uma cadeia de operações executadas lentamente quando um item é emitido no fluxo. Observe que apenas aplicar um operador intermediário a um stream não inicia a coleta de fluxo.
Como coletar de um fluxo
Use um operador de terminal para acionar o fluxo e começar a detectar
valores. Para receber todos os valores no stream conforme forem emitidos, use
collect
(link em inglês).
Saiba mais sobre os operadores de terminal na
documentação oficial de fluxo (link em inglês).
Como collect
é uma função de suspensão, é preciso executá-la em
uma corrotina. Ela usa uma lambda como um parâmetro que é chamado em
cada novo valor. Como essa é uma função de suspensão, a corrotina que
chama collect
pode ser suspensa até que o fluxo seja fechado.
Continuando com o exemplo anterior, veja uma implementação simples de
um ViewModel
consumindo os dados da camada do repositório:
class LatestNewsViewModel(
private val newsRepository: NewsRepository
) : ViewModel() {
init {
viewModelScope.launch {
// Trigger the flow and consume its elements using collect
newsRepository.favoriteLatestNews.collect { favoriteNews ->
// Update View with the latest favorite news
}
}
}
}
A coleta do fluxo aciona o produtor que atualiza as últimas notícias
e emite o resultado da solicitação de rede em um intervalo fixo. Como o
produtor permanece sempre ativo com o loop while(true)
, o stream
de dados será fechado quando o ViewModel for apagado e
o viewModelScope
for cancelado.
A coleta de fluxo pode ser interrompida pelos seguintes motivos:
- A corrotina coletada é cancelada, conforme mostrado no exemplo anterior. Isso também interrompe o produtor subjacente.
- O produtor termina de emitir os itens. Nesse caso, o stream de dados
é fechado e a corrotina que chamou
collect
retoma a execução.
Os fluxos são frios e lentos, a menos que especificados por outros operadores
intermediários. Isso significa que o código do produtor é executado sempre que um
operador de terminal é chamado no fluxo. No exemplo anterior,
ter vários coletores de fluxo faz com que a fonte de dados busque as
notícias mais recentes várias vezes em intervalos fixos diferentes. Para otimizar e
compartilhar um fluxo quando vários consumidores coletarem ao mesmo tempo, use o operador
shareIn
.
Como detectar exceções inesperadas
A implementação do produtor pode vir de uma biblioteca de terceiros.
Isso significa que ele pode gerar exceções inesperadas. Para lidar com essas
exceções, use o
operador intermediário
catch
(link em inglês).
class LatestNewsViewModel(
private val newsRepository: NewsRepository
) : ViewModel() {
init {
viewModelScope.launch {
newsRepository.favoriteLatestNews
// Intermediate catch operator. If an exception is thrown,
// catch and update the UI
.catch { exception -> notifyError(exception) }
.collect { favoriteNews ->
// Update View with the latest favorite news
}
}
}
}
No exemplo anterior, quando ocorre uma exceção, a lambda collect
não é chamada, porque um novo item não foi recebido.
catch
também pode emit
(emitir) itens para o fluxo. Em vez disso, a camada de repositório
de exemplo poderia emit
(emitir) os valores armazenados em cache:
class NewsRepository(...) {
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
.map { news -> news.filter { userData.isFavoriteTopic(it) } }
.onEach { news -> saveInCache(news) }
// If an error happens, emit the last cached values
.catch { exception -> emit(lastCachedNews()) }
}
Nesse exemplo, a lambda collect
é
chamada quando ocorre uma exceção, já que um novo item foi emitido para o stream por causa da
exceção.
Como executar em um CoroutineContext diferente
Por padrão, o produtor de um builder flow
é executado no
CoroutineContext
da corrotina que coleta dele e, como
já mencionado, não pode emit
(emitir) valores de um
CoroutineContext
diferente. Esse comportamento pode ser indesejável em alguns casos.
Nos exemplos usados neste tópico, a camada
do repositório não deve executar operações no Dispatchers.Main
que
é usado pelo viewModelScope
.
Para mudar o CoroutineContext
de um fluxo, use o operador intermediário
flowOn
(link em inglês).
flowOn
muda o CoroutineContext
do fluxo upstream, ou seja,
o produtor e os operadores intermediários aplicados.antes (ou acima) de
flowOn
. O fluxo downstream (os operadores intermediários depois de flowOn
com o consumidor) não é afetado e é executado no
CoroutineContext
usado para collect
(coletar) do fluxo. Se houver vários
operadores flowOn
, cada um deles mudará o upstream do
local atual.
class NewsRepository(
private val newsRemoteDataSource: NewsRemoteDataSource,
private val userData: UserData,
private val defaultDispatcher: CoroutineDispatcher
) {
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
.map { news -> // Executes on the default dispatcher
news.filter { userData.isFavoriteTopic(it) }
}
.onEach { news -> // Executes on the default dispatcher
saveInCache(news)
}
// flowOn affects the upstream flow ↑
.flowOn(defaultDispatcher)
// the downstream flow ↓ is not affected
.catch { exception -> // Executes in the consumer's context
emit(lastCachedNews())
}
}
Com esse código, os operadores onEach
e map
usam o defaultDispatcher
,
enquanto o operador catch
e o consumidor são executados no
Dispatchers.Main
usado pelo viewModelScope
.
Como a camada de origem de dados está fazendo o trabalho de E/S, você precisa usar um agente otimizado para operações de E/S:
class NewsRemoteDataSource(
...,
private val ioDispatcher: CoroutineDispatcher
) {
val latestNews: Flow<List<ArticleHeadline>> = flow {
// Executes on the IO dispatcher
...
}
.flowOn(ioDispatcher)
}
Fluxos em bibliotecas Jetpack
O fluxo é integrado em várias bibliotecas Jetpack e é comum em bibliotecas do Android de terceiros. Um fluxo é uma ótima opção para atualizações de dados em tempo real e streams infinitos de dados.
É possível usar
fluxos com o Room (link em inglês)
para receber notificações sobre mudanças em um banco de dados. Ao usar
objetos de acesso a dados (DAO, na sigla em inglês),
retorne um tipo Flow
para receber atualizações em tempo real.
@Dao
abstract class ExampleDao {
@Query("SELECT * FROM Example")
abstract fun getExamples(): Flow<List<Example>>
}
Cada vez que há uma mudança na tabela Example
, uma nova lista é emitida
com os novos itens no banco de dados.
Converter APIs baseadas em callback em fluxos
O callbackFlow
(link em inglês)
é um builder flow que permite converter APIs baseadas em callback em fluxos.
Por exemplo, as APIs do Firestore do Firebase
para Android usam callbacks.
Para converter essas APIs em fluxos e detectar atualizações do banco de dados do Firestore, você pode usar este código:
class FirestoreUserEventsDataSource(
private val firestore: FirebaseFirestore
) {
// Method to get user events from the Firestore database
fun getUserEvents(): Flow<UserEvents> = callbackFlow {
// Reference to use in Firestore
var eventsCollection: CollectionReference? = null
try {
eventsCollection = FirebaseFirestore.getInstance()
.collection("collection")
.document("app")
} catch (e: Throwable) {
// If Firebase cannot be initialized, close the stream of data
// flow consumers will stop collecting and the coroutine will resume
close(e)
}
// Registers callback to firestore, which will be called on new events
val subscription = eventsCollection?.addSnapshotListener { snapshot, _ ->
if (snapshot == null) { return@addSnapshotListener }
// Sends events to the flow! Consumers will get the new events
try {
trySend(snapshot.getEvents())
} catch (e: Throwable) {
// Event couldn't be sent to the flow
}
}
// The callback inside awaitClose will be executed when the flow is
// either closed or cancelled.
// In this case, remove the callback from Firestore
awaitClose { subscription?.remove() }
}
}
Ao contrário do builder flow
, callbackFlow
permite que os valores sejam emitidos de um CoroutineContext
diferente com a função
send
ou fora de uma corrotina com a
função
trySend
(links em inglês).
Internamente, callbackFlow
usa um
canal,
que é conceitualmente muito semelhante a uma
fila de bloqueio (links em inglês).
Um canal é configurado com uma capacidade, o número máximo de elementos
que podem ser armazenados em buffer. O canal criado no callbackFlow
tem uma capacidade
padrão de 64 elementos. Ao tentar adicionar um novo elemento a um canal
cheio, send
suspende o produtor até que haja espaço para o novo
elemento, enquanto trySend
não adiciona o elemento ao canal e retorna
o valor false
imediatamente.
trySend
adiciona imediatamente o elemento especificado ao canal,
somente se não violar as restrições de capacidade e, em seguida, retornar o
um resultado bem-sucedido.
Recursos de fluxo adicionais
- Como testar fluxos em Kotlin no Android
StateFlow
eSharedFlow
- Outros recursos para corrotinas e fluxos do Kotlin