Dalam coroutine, flow adalah jenis yang dapat memunculkan beberapa nilai secara berurutan, bukan fungsi penangguhan yang hanya menampilkan satu nilai. Misalnya, Anda dapat menggunakan flow untuk menerima update langsung dari database.
Flow dibuat berdasarkan coroutine dan dapat memberikan beberapa nilai.
Secara konseptual, flow adalah aliran data yang dapat dikomputasi secara asinkron. Nilai yang dimunculkan harus memiliki jenis yang sama. Misalnya,
Flow<Int>
adalah flow yang memunculkan nilai bilangan bulat.
Flow sangat mirip dengan Iterator
yang menghasilkan urutan
nilai, tetapi menggunakan fungsi penangguhan untuk menghasilkan dan memakai nilai
secara asinkron. Ini berarti, misalnya, flow dapat dengan aman membuat
permintaan jaringan untuk menghasilkan nilai berikutnya tanpa memblokir thread
utama.
Ada tiga entity yang terlibat dalam aliran data:
- Produser menghasilkan data yang ditambahkan ke aliran data. Berkat coroutine, flow juga dapat menghasilkan data secara asinkron.
- (Opsional) Perantara dapat mengubah setiap nilai yang dimunculkan ke dalam aliran data atau aliran data itu sendiri.
- Konsumen memakai nilai dari aliran data.
Di Android, repositori biasanya merupakan produser data UI yang memiliki antarmuka pengguna (UI) sebagai konsumen yang pada akhirnya menampilkan data. Pada waktu lain, lapisan UI merupakan produser peristiwa input pengguna dan lapisan lain dari hierarki memakainya. Lapisan di antara produser dan konsumen biasanya berfungsi sebagai perantara yang mengubah aliran data untuk menyesuaikannya dengan persyaratan lapisan berikutnya.
Membuat flow
Untuk membuat flow, gunakan
API flow builder
Fungsi builder flow
membuat flow baru tempat Anda dapat
memunculkan nilai baru ke dalam aliran data secara manual menggunakan
fungsi
emit
.
Pada contoh berikut, sumber data mengambil berita terbaru secara otomatis pada interval tetap. Karena fungsi penangguhan tidak dapat menampilkan beberapa nilai berturut-turut, sumber data akan membuat dan menampilkan flow untuk memenuhi persyaratan ini. Dalam hal ini, sumber data berfungsi sebagai produser.
class NewsRemoteDataSource(
private val newsApi: NewsApi,
private val refreshIntervalMs: Long = 5000
) {
val latestNews: Flow<List<ArticleHeadline>> = flow {
while(true) {
val latestNews = newsApi.fetchLatestNews()
emit(latestNews) // Emits the result of the request to the flow
delay(refreshIntervalMs) // Suspends the coroutine for some time
}
}
}
// Interface that provides a way to make network requests with suspend functions
interface NewsApi {
suspend fun fetchLatestNews(): List<ArticleHeadline>
}
Builder flow
dieksekusi dalam coroutine. Dengan demikian, builder tersebut menerima manfaat
dari API asinkron yang sama,
tetapi beberapa pembatasan berlaku:
- Flow bersifat berurutan. Karena produser berada dalam coroutine, saat memanggil
fungsi penangguhan, produser akan ditangguhkan hingga fungsi penangguhan
ditampilkan. Pada contoh tersebut, produser ditangguhkan hingga permintaan
jaringan
fetchLatestNews
selesai. Baru setelah itu hasilnya akan dimunculkan ke aliran data. - Dengan builder
flow
, produser tidak dapat melakukanemit
nilai dariCoroutineContext
berbeda. Karena itu, jangan panggilemit
dalamCoroutineContext
yang berbeda dengan membuat coroutine baru atau dengan menggunakan blok kodewithContext
. Dalam kasus ini, Anda dapat menggunakan builder flow lain seperticallbackFlow
.
Mengubah aliran data
Perantara dapat menggunakan operator perantara untuk mengubah aliran data tanpa memakai nilainya. Operator ini adalah fungsi yang, saat diterapkan ke aliran data, menyiapkan rantai operasi yang tidak dieksekusi hingga nilai dipakai di masa mendatang. Pelajari lebih lanjut operator perantara dalam Dokumentasi referensi flow.
Pada contoh di bawah ini, lapisan repositori menggunakan operator perantara
map
untuk mengubah data yang akan ditampilkan di View
:
class NewsRepository(
private val newsRemoteDataSource: NewsRemoteDataSource,
private val userData: UserData
) {
/**
* Returns the favorite latest news applying transformations on the flow.
* These operations are lazy and don't trigger the flow. They just transform
* the current value emitted by the flow at that point in time.
*/
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
// Intermediate operation to filter the list of favorite topics
.map { news -> news.filter { userData.isFavoriteTopic(it) } }
// Intermediate operation to save the latest news in the cache
.onEach { news -> saveInCache(news) }
}
Operator perantara dapat diterapkan satu per satu, dengan membentuk rantai operasi yang dieksekusi dengan lambat saat item dimunculkan ke dalam flow. Perlu diperhatikan bahwa hanya menerapkan operator perantara ke aliran data tidak akan memulai pengumpulan flow.
Mengumpulkan dari flow
Gunakan operator terminal untuk memicu flow guna mulai memproses nilai. Untuk mendapatkan semua nilai dalam aliran data saat dimunculkan, gunakan
collect
.
Anda dapat mempelajari operator terminal lebih lanjut di
dokumentasi flow resmi.
Sebagai fungsi penangguhan, collect
perlu dieksekusi dalam
coroutine. Diperlukan lambda sebagai parameter yang dipanggil pada
setiap nilai baru. Karena merupakan fungsi penangguhan, coroutine yang
memanggil collect
dapat ditangguhkan hingga flow ditutup.
Melanjutkan contoh sebelumnya, berikut implementasi sederhana
ViewModel
yang memakai data dari lapisan repositori:
class LatestNewsViewModel(
private val newsRepository: NewsRepository
) : ViewModel() {
init {
viewModelScope.launch {
// Trigger the flow and consume its elements using collect
newsRepository.favoriteLatestNews.collect { favoriteNews ->
// Update View with the latest favorite news
}
}
}
}
Pengumpulan flow akan memicu produser yang memperbarui berita terkini
dan memunculkan hasil permintaan jaringan pada interval tetap. Karena
produser selalu aktif dengan loop while(true)
, aliran
data akan ditutup saat ViewModel dihapus dan
viewModelScope
dibatalkan.
Pengumpulan flow dapat dihentikan karena alasan berikut:
- Coroutine yang mengumpulkan dibatalkan, seperti ditunjukkan pada contoh sebelumnya. Kondisi ini juga akan menghentikan produser yang mendasari.
- Produser selesai memunculkan item. Dalam hal ini, aliran data
ditutup dan coroutine yang memanggil
collect
melanjutkan eksekusi.
Flow bersifat cold dan lambat kecuali jika ditentukan dengan operator perantara
lainnya. Ini berarti kode produser dieksekusi setiap kali
operator terminal dipanggil pada flow. Pada contoh sebelumnya,
memiliki beberapa pengumpul flow menyebabkan sumber data mengambil
berita terkini beberapa kali pada interval tetap yang berbeda. Untuk mengoptimalkan dan
berbagi flow jika beberapa konsumen mengumpulkan secara bersamaan, gunakan
operator shareIn
.
Menangkap pengecualian yang tidak diharapkan
Implementasi produser dapat berasal dari library pihak ketiga.
Hal ini berarti implementasi dapat menampilkan pengecualian yang tidak diharapkan. Untuk menangani
pengecualian ini, gunakan
operator perantara
catch
.
class LatestNewsViewModel(
private val newsRepository: NewsRepository
) : ViewModel() {
init {
viewModelScope.launch {
newsRepository.favoriteLatestNews
// Intermediate catch operator. If an exception is thrown,
// catch and update the UI
.catch { exception -> notifyError(exception) }
.collect { favoriteNews ->
// Update View with the latest favorite news
}
}
}
}
Pada contoh sebelumnya, saat pengecualian terjadi, lambda
collect
tidak dipanggil karena item baru belum diterima.
catch
juga dapat melakukan emit
item ke flow. Lapisan repositori contoh dapat melakukan emit
nilai yang di-cache:
class NewsRepository(...) {
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
.map { news -> news.filter { userData.isFavoriteTopic(it) } }
.onEach { news -> saveInCache(news) }
// If an error happens, emit the last cached values
.catch { exception -> emit(lastCachedNews()) }
}
Dalam contoh ini, jika pengecualian terjadi, lambda collect
akan dipanggil saat item baru telah dimunculkan ke aliran data karena pengecualian.
Mengeksekusi dalam CoroutineContext yang berbeda
Secara default, produser builder flow
dieksekusi dalam CoroutineContext
coroutine yang mengumpulkan darinya, dan seperti disebutkan sebelumnya, produser ini tidak dapat melakukan emit
nilai dari CoroutineContext
yang berbeda. Perilaku ini mungkin tidak diinginkan dalam beberapa kasus.
Misalnya, dalam contoh yang digunakan di seluruh topik ini, lapisan
repositori tidak boleh menjalankan operasi pada Dispatchers.Main
yang digunakan oleh viewModelScope
.
Untuk mengubah CoroutineContext
flow, gunakan operator perantara
flowOn
.
flowOn
mengubah CoroutineContext
flow upstream, yang berarti
produser dan operator perantara mana pun yang diterapkan sebelum (atau di atas)
flowOn
. Flow downstream (operator perantara setelah flowOn
bersama konsumen) tidak terpengaruh dan dieksekusi di
CoroutineContext
yang digunakan untuk collect
dari flow. Jika ada
beberapa operator flowOn
, masing-masing operator akan mengubah upstream dari
lokasinya saat ini.
class NewsRepository(
private val newsRemoteDataSource: NewsRemoteDataSource,
private val userData: UserData,
private val defaultDispatcher: CoroutineDispatcher
) {
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
.map { news -> // Executes on the default dispatcher
news.filter { userData.isFavoriteTopic(it) }
}
.onEach { news -> // Executes on the default dispatcher
saveInCache(news)
}
// flowOn affects the upstream flow ↑
.flowOn(defaultDispatcher)
// the downstream flow ↓ is not affected
.catch { exception -> // Executes in the consumer's context
emit(lastCachedNews())
}
}
Dengan kode ini, operator onEach
dan map
menggunakan defaultDispatcher
,
sedangkan operator catch
dan konsumen dieksekusi pada
Dispatchers.Main
yang digunakan oleh viewModelScope
.
Karena lapisan sumber data menjalankan fungsi I/O, Anda harus menggunakan dispatcher yang dioptimalkan untuk operasi I/O:
class NewsRemoteDataSource(
...,
private val ioDispatcher: CoroutineDispatcher
) {
val latestNews: Flow<List<ArticleHeadline>> = flow {
// Executes on the IO dispatcher
...
}
.flowOn(ioDispatcher)
}
Flow di library Jetpack
Flow diintegrasikan ke banyak library Jetpack, dan populer di antara library pihak ketiga Android. Flow sangat cocok untuk update data langsung dan aliran data tanpa batas.
Anda dapat menggunakan
Flow dengan Room
untuk menerima notifikasi perubahan dalam database. Saat menggunakan objek akses data (DAO), tampilkan jenis Flow
untuk mendapatkan update langsung.
@Dao
abstract class ExampleDao {
@Query("SELECT * FROM Example")
abstract fun getExamples(): Flow<List<Example>>
}
Setiap kali ada perubahan di tabel Example
, daftar baru akan dimunculkan
dengan item baru dalam database.
Mengonversi API berbasis callback menjadi flow
callbackFlow
adalah flow builder yang memungkinkan Anda mengonversi API berbasis callback menjadi flow.
Sebagai contoh, Firebase Firestore Android API menggunakan callback.
Untuk mengonversi API ini menjadi flow dan memproses update database Firestore, Anda dapat menggunakan kode berikut:
class FirestoreUserEventsDataSource(
private val firestore: FirebaseFirestore
) {
// Method to get user events from the Firestore database
fun getUserEvents(): Flow<UserEvents> = callbackFlow {
// Reference to use in Firestore
var eventsCollection: CollectionReference? = null
try {
eventsCollection = FirebaseFirestore.getInstance()
.collection("collection")
.document("app")
} catch (e: Throwable) {
// If Firebase cannot be initialized, close the stream of data
// flow consumers will stop collecting and the coroutine will resume
close(e)
}
// Registers callback to firestore, which will be called on new events
val subscription = eventsCollection?.addSnapshotListener { snapshot, _ ->
if (snapshot == null) { return@addSnapshotListener }
// Sends events to the flow! Consumers will get the new events
try {
offer(snapshot.getEvents())
} catch (e: Throwable) {
// Event couldn't be sent to the flow
}
}
// The callback inside awaitClose will be executed when the flow is
// either closed or cancelled.
// In this case, remove the callback from Firestore
awaitClose { subscription?.remove() }
}
}
Berbeda dengan builder flow
, callbackFlow
memungkinkan nilai dimunculkan dari CoroutineContext
yang berbeda dengan
fungsi send
atau di luar coroutine dengan
fungsi
trySend
.
Secara internal, callbackFlow
menggunakan
saluran,
yang secara konseptual sangat mirip dengan antrean
pemblokiran.
Saluran dikonfigurasi dengan kapasitas, yaitu jumlah maksimum elemen
yang dapat di-buffer. Saluran yang dibuat di callbackFlow
memiliki kapasitas
default 64 elemen. Saat mencoba menambahkan elemen baru ke saluran
lengkap, send
akan menangguhkan produser hingga ada ruang untuk elemen
baru, sementara offer
tidak menambahkan elemen ke saluran dan segera menampilkan
false
.
Referensi flow lainnya
- Menguji flow Kotlin di Android
StateFlow
danSharedFlow
- Referensi lainnya untuk coroutine dan flow Kotlin