Fluxos em Kotlin no Android

Em corrotinas, um fluxo é um tipo que pode emitir vários valores sequencialmente, ao contrário das funções de suspensão, que retornam somente um valor. Por exemplo, você pode usar um fluxo para receber atualizações em tempo real de um banco de dados.

Os fluxos são criados com base nas corrotinas e podem fornecer vários valores. Conceitualmente, um fluxo é um stream de dados que pode ser computado de forma assíncrona. Os valores emitidos precisam ser do mesmo tipo. Por exemplo, um Flow<Int> é um fluxo que emite valores inteiros.

Um fluxo é muito semelhante a um Iterator que produz uma sequência de valores, mas usa funções de suspensão para produzir e consumir valores de maneira assíncrona. Isso significa, por exemplo, que o fluxo pode fazer uma solicitação de rede com segurança para produzir o próximo valor sem bloquear a linha de execução principal.

Há três entidades envolvidas em streams de dados:

  • Um produtor produz dados que são adicionados ao stream. Graças às corrotinas, os fluxos também podem produzir dados de maneira assíncrona.
  • Intermediários (opcionais) podem modificar cada valor emitido para o stream ou o próprio stream.
  • Um consumidor consome os valores do stream.

entidades envolvidas em streams de dados: consumidor, intermediários
              opcionais e produtor
Figura 1. Entidades envolvidas em streams de dados: consumidor, intermediários opcionais e produtor.

No Android, um repositório normalmente é um produtor de dados de IU que tem a interface do usuário (IU) como o consumidor que mostra os dados. Outras vezes, a camada da IU é um produtor de eventos de entrada do usuário consumidos por outras camadas da hierarquia. As camadas entre o produtor e o consumidor geralmente agem como intermediárias que modificam o fluxo de dados para que ele seja ajustado aos requisitos da camada seguinte.

Como criar um fluxo

Para criar fluxos, use as APIs flow builder (link em inglês). A função builder flow cria um novo fluxo em que é possível emitir manualmente novos valores para o stream de dados usando a função emit (link em inglês).

No exemplo abaixo, uma fonte de dados busca as notícias mais recentes automaticamente em um intervalo fixo. Como uma função de suspensão não pode retornar vários valores consecutivos, a fonte de dados cria e retorna um fluxo para atender a esse requisito. Nesse caso, a fonte de dados atua como o produtor.

class NewsRemoteDataSource(
    private val newsApi: NewsApi,
    private val refreshIntervalMs: Long = 5000
) {
    val latestNews: Flow<List<ArticleHeadline>> = flow {
        while(true) {
            val latestNews = newsApi.fetchLatestNews()
            emit(latestNews) // Emits the result of the request to the flow
            delay(refreshIntervalMs) // Suspends the coroutine for some time
        }
    }
}

// Interface that provides a way to make network requests with suspend functions
interface NewsApi {
    suspend fun fetchLatestNews(): List<ArticleHeadline>
}

O builder flow é executado em uma corrotina. Assim, ele se beneficia das mesmas APIs assíncronas, mas com algumas restrições:

  • Os fluxos são sequenciais. Como o produtor está em uma corrotina, ao chamar uma função de suspensão, o produtor é suspenso até que a função de suspensão seja retornada. No exemplo, o produtor faz a suspensão até que a solicitação de rede fetchLatestNews seja concluída. Só então o resultado é emitido para o stream.
  • Com o builder flow, o produtor não pode emit (emitir) valores de um CoroutineContext diferente. Portanto, não chame emit em um CoroutineContext diferente criando novas corrotinas ou usando blocos de código withContext. Nesses casos, você pode usar outros builders flow, como callbackFlow (link em inglês).

Como modificar o stream

Os intermediários podem usar operadores intermediários (link em inglês) para modificar o stream de dados sem consumir os valores. Esses operadores são funções que, quando aplicadas a um stream de dados, configuram uma cadeia de operações que não são executadas até que os valores sejam consumidos no futuro. Saiba mais sobre os operadores intermediários na documentação de referência de fluxo (link em inglês).

No exemplo abaixo, a camada de repositório usa o operador intermediário map (link em inglês) para transformar os dados que serão mostrados em View:

class NewsRepository(
    private val newsRemoteDataSource: NewsRemoteDataSource,
    private val userData: UserData
) {
    /**
     * Returns the favorite latest news applying transformations on the flow.
     * These operations are lazy and don't trigger the flow. They just transform
     * the current value emitted by the flow at that point in time.
     */
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            // Intermediate operation to filter the list of favorite topics
            .map { news -> news.filter { userData.isFavoriteTopic(it) } }
            // Intermediate operation to save the latest news in the cache
            .onEach { news -> saveInCache(news) }
}

Operadores intermediários podem ser aplicados um após o outro, formando uma cadeia de operações executadas lentamente quando um item é emitido no fluxo. Observe que apenas aplicar um operador intermediário a um stream não inicia a coleta de fluxo.

Como coletar de um fluxo

Use um operador de terminal para acionar o fluxo e começar a detectar valores. Para receber todos os valores no stream conforme forem emitidos, use collect (link em inglês). Saiba mais sobre os operadores de terminal na documentação oficial de fluxo (link em inglês).

Como collect é uma função de suspensão, é preciso executá-la em uma corrotina. Ela usa uma lambda como um parâmetro que é chamado em cada novo valor. Como essa é uma função de suspensão, a corrotina que chama collect pode ser suspensa até que o fluxo seja fechado.

Continuando com o exemplo anterior, veja uma implementação simples de um ViewModel consumindo os dados da camada do repositório:

class LatestNewsViewModel(
    private val newsRepository: NewsRepository
) : ViewModel() {

    init {
        viewModelScope.launch {
            // Trigger the flow and consume its elements using collect
            newsRepository.favoriteLatestNews.collect { favoriteNews ->
                // Update View with the latest favorite news
            }
        }
    }
}

A coleta do fluxo aciona o produtor que atualiza as últimas notícias e emite o resultado da solicitação de rede em um intervalo fixo. Como o produtor permanece sempre ativo com o loop while(true), o stream de dados será fechado quando o ViewModel for apagado e o viewModelScope for cancelado.

A coleta de fluxo pode ser interrompida pelos seguintes motivos:

  • A corrotina coletada é cancelada, conforme mostrado no exemplo anterior. Isso também interrompe o produtor subjacente.
  • O produtor termina de emitir os itens. Nesse caso, o stream de dados é fechado e a corrotina que chamou collect retoma a execução.

Os fluxos são frios e lentos, a menos que especificados por outros operadores intermediários. Isso significa que o código do produtor é executado sempre que um operador de terminal é chamado no fluxo. No exemplo anterior, ter vários coletores de fluxo faz com que a fonte de dados busque as notícias mais recentes várias vezes em intervalos fixos diferentes. Para otimizar e compartilhar um fluxo quando vários consumidores coletarem ao mesmo tempo, use o operador shareIn.

Como detectar exceções inesperadas

A implementação do produtor pode vir de uma biblioteca de terceiros. Isso significa que ele pode gerar exceções inesperadas. Para lidar com essas exceções, use o operador intermediário catch (link em inglês).

class LatestNewsViewModel(
    private val newsRepository: NewsRepository
) : ViewModel() {

    init {
        viewModelScope.launch {
            newsRepository.favoriteLatestNews
                // Intermediate catch operator. If an exception is thrown,
                // catch and update the UI
                .catch { exception -> notifyError(exception) }
                .collect { favoriteNews ->
                    // Update View with the latest favorite news
                }
        }
    }
}

No exemplo anterior, quando ocorre uma exceção, a lambda collect não é chamada, porque um novo item não foi recebido.

catch também pode emit (emitir) itens para o fluxo. Em vez disso, a camada de repositório de exemplo poderia emit (emitir) os valores armazenados em cache:

class NewsRepository(...) {
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            .map { news -> news.filter { userData.isFavoriteTopic(it) } }
            .onEach { news -> saveInCache(news) }
            // If an error happens, emit the last cached values
            .catch { exception -> emit(lastCachedNews()) }
}

Nesse exemplo, a lambda collect é chamada quando ocorre uma exceção, já que um novo item foi emitido para o stream por causa da exceção.

Como executar em um CoroutineContext diferente

Por padrão, o produtor de um builder flow é executado no CoroutineContext da corrotina que coleta dele e, como já mencionado, não pode emit (emitir) valores de um CoroutineContext diferente. Esse comportamento pode ser indesejável em alguns casos. Nos exemplos usados neste tópico, a camada do repositório não deve executar operações no Dispatchers.Main que é usado pelo viewModelScope.

Para mudar o CoroutineContext de um fluxo, use o operador intermediário flowOn (link em inglês). flowOn muda o CoroutineContext do fluxo upstream, ou seja, o produtor e os operadores intermediários aplicados.antes (ou acima) de flowOn. O fluxo downstream (os operadores intermediários depois de flowOn com o consumidor) não é afetado e é executado no CoroutineContext usado para collect (coletar) do fluxo. Se houver vários operadores flowOn, cada um deles mudará o upstream do local atual.

class NewsRepository(
    private val newsRemoteDataSource: NewsRemoteDataSource,
    private val userData: UserData,
    private val defaultDispatcher: CoroutineDispatcher
) {
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            .map { news -> // Executes on the default dispatcher
                news.filter { userData.isFavoriteTopic(it) }
            }
            .onEach { news -> // Executes on the default dispatcher
                saveInCache(news)
            }
            // flowOn affects the upstream flow ↑
            .flowOn(defaultDispatcher)
            // the downstream flow ↓ is not affected
            .catch { exception -> // Executes in the consumer's context
                emit(lastCachedNews())
            }
}

Com esse código, os operadores onEach e map usam o defaultDispatcher, enquanto o operador catch e o consumidor são executados no Dispatchers.Main usado pelo viewModelScope.

Como a camada de origem de dados está fazendo o trabalho de E/S, você precisa usar um agente otimizado para operações de E/S:

class NewsRemoteDataSource(
    ...,
    private val ioDispatcher: CoroutineDispatcher
) {
    val latestNews: Flow<List<ArticleHeadline>> = flow {
        // Executes on the IO dispatcher
        ...
    }
        .flowOn(ioDispatcher)
}

Fluxos em bibliotecas Jetpack

O fluxo é integrado em várias bibliotecas Jetpack e é comum em bibliotecas do Android de terceiros. Um fluxo é uma ótima opção para atualizações de dados em tempo real e streams infinitos de dados.

É possível usar fluxos com o Room (link em inglês) para receber notificações sobre mudanças em um banco de dados. Ao usar objetos de acesso a dados (DAO, na sigla em inglês), retorne um tipo Flow para receber atualizações em tempo real.

@Dao
abstract class ExampleDao {
    @Query("SELECT * FROM Example")
    abstract fun getExamples(): Flow<List<Example>>
}

Cada vez que há uma mudança na tabela Example, uma nova lista é emitida com os novos itens no banco de dados.

Converter APIs baseadas em callback em fluxos

O callbackFlow (link em inglês) é um builder flow que permite converter APIs baseadas em callback em fluxos. Por exemplo, as APIs do Firestore do Firebase para Android usam callbacks.

Para converter essas APIs em fluxos e detectar atualizações do banco de dados do Firestore, você pode usar este código:

class FirestoreUserEventsDataSource(
    private val firestore: FirebaseFirestore
) {
    // Method to get user events from the Firestore database
    fun getUserEvents(): Flow<UserEvents> = callbackFlow {

        // Reference to use in Firestore
        var eventsCollection: CollectionReference? = null
        try {
            eventsCollection = FirebaseFirestore.getInstance()
                .collection("collection")
                .document("app")
        } catch (e: Throwable) {
            // If Firebase cannot be initialized, close the stream of data
            // flow consumers will stop collecting and the coroutine will resume
            close(e)
        }

        // Registers callback to firestore, which will be called on new events
        val subscription = eventsCollection?.addSnapshotListener { snapshot, _ ->
            if (snapshot == null) { return@addSnapshotListener }
            // Sends events to the flow! Consumers will get the new events
            try {
                trySend(snapshot.getEvents())
            } catch (e: Throwable) {
                // Event couldn't be sent to the flow
            }
        }

        // The callback inside awaitClose will be executed when the flow is
        // either closed or cancelled.
        // In this case, remove the callback from Firestore
        awaitClose { subscription?.remove() }
    }
}

Ao contrário do builder flow, callbackFlow permite que os valores sejam emitidos de um CoroutineContext diferente com a função send ou fora de uma corrotina com a função trySend (links em inglês).

Internamente, callbackFlow usa um canal, que é conceitualmente muito semelhante a uma fila de bloqueio (links em inglês). Um canal é configurado com uma capacidade, o número máximo de elementos que podem ser armazenados em buffer. O canal criado no callbackFlow tem uma capacidade padrão de 64 elementos. Ao tentar adicionar um novo elemento a um canal cheio, send suspende o produtor até que haja espaço para o novo elemento, enquanto trySend não adiciona o elemento ao canal e retorna o valor false imediatamente.

trySend adiciona imediatamente o elemento especificado ao canal, somente se não violar as restrições de capacidade e, em seguida, retornar o um resultado bem-sucedido.

Recursos de fluxo adicionais