Android'de Kotlin akışı

Eş yordamlarda, yalnızca tek bir değer döndüren askıya alma işlevlerinin aksine, akış, sırayla birden fazla değer iletebilen bir türdür. Örneğin, bir veritabanından canlı güncellemeler almak için bir akış kullanabilirsiniz.

Akışlar eş yordamlar temel alınarak oluşturulur ve birden fazla değer sağlayabilir. Akış, kavram olarak eşzamansız olarak hesaplanabilen bir veri akışıdır. Yayınlanan değerler aynı türde olmalıdır. Örneğin Flow<Int>, tam sayı değerleri yayan bir akıştır.

Akış, değer dizisi oluşturan bir Iterator öğesine çok benzer ancak değerleri eşzamansız olarak üretmek ve tüketmek için askıya alma işlevlerini kullanır. Bu, örneğin, akışın ana iş parçacığını engellemeden sonraki değeri üretmek için güvenli bir şekilde ağ isteğinde bulunabileceği anlamına gelir.

Veri akışlarına dahil olan üç varlık vardır:

  • Üretici, akışa eklenen veriler üretir. İlişkiler sayesinde akışlar eşzamansız olarak veri de üretebilir.
  • (İsteğe bağlı) Aracılar, akışa veya akışın kendisine yayınlanan her değeri değiştirebilir.
  • Tüketici, akıştaki değerleri tüketir.

Veri akışlarında yer alan varlıklar; tüketici, isteğe bağlı aracılar ve üretici
Şekil 1. Veri akışlarına dahil olan varlıklar: tüketici, isteğe bağlı aracılar ve üretici.

Android'de depo, genellikle verileri görüntüleyen tüketici olarak kullanıcı arayüzüne (UI) sahip olan kullanıcı arayüzü verilerinin üreticisidir. Bazı durumlarda ise kullanıcı arayüzü katmanı, kullanıcı girişi etkinliklerinin üreticisidir ve hiyerarşinin diğer katmanları bunları tüketir. Üretici ile tüketici arasındaki katmanlar genellikle veri akışını aşağıdaki katmanın gereksinimlerine göre düzenlemek için değiştiren aracılar olarak görev yapar.

Akış oluşturma

Akışlar oluşturmak için akış oluşturucu API'lerini kullanın. flow oluşturucu işlevi, emit işlevini kullanarak veri akışına manuel olarak yeni değerler yayınlayabileceğiniz yeni bir akış oluşturur.

Aşağıdaki örnekte bir veri kaynağı en son haberleri sabit aralıklarla otomatik olarak getirmektedir. Askıya alma işlevi ardışık birden çok değer döndüremediğinden, veri kaynağı bu gereksinimi karşılamak için bir akış oluşturur ve döndürür. Bu durumda veri kaynağı, üretici rolünü üstlenir.

class NewsRemoteDataSource(
    private val newsApi: NewsApi,
    private val refreshIntervalMs: Long = 5000
) {
    val latestNews: Flow<List<ArticleHeadline>> = flow {
        while(true) {
            val latestNews = newsApi.fetchLatestNews()
            emit(latestNews) // Emits the result of the request to the flow
            delay(refreshIntervalMs) // Suspends the coroutine for some time
        }
    }
}

// Interface that provides a way to make network requests with suspend functions
interface NewsApi {
    suspend fun fetchLatestNews(): List<ArticleHeadline>
}

flow oluşturucu bir eş yordam içinde yürütülür. Bu nedenle, aynı eşzamansız API'lerden yararlanır ancak bazı kısıtlamalar geçerlidir:

  • Akışlar sıralıdır. Üretici bir eş yordamda olduğundan, askıya alma işlevi çağrılırken askıya alma işlevi döndürülene kadar üretici askıya alır. Bu örnekte, üretici fetchLatestNews ağ isteği tamamlanana kadar askıya alır. Ancak bu şekilde sonuç akışa gönderilir.
  • flow oluşturucu ile üretici, farklı bir CoroutineContext öğesinden emit değer alamaz. Bu nedenle, yeni eş yordamlar oluşturarak veya withContext kod blokları kullanarak emit öğesini farklı bir CoroutineContext içinde çağırmayın. Bu durumlarda, callbackFlow gibi diğer akış oluşturucuları kullanabilirsiniz.

Akışta değişiklik yapma

Aracılar, değerleri kullanmadan veri akışını değiştirmek için ara operatörleri kullanabilir. Bu operatörler, bir veri akışına uygulandığında değerler gelecekte tüketilene kadar çalıştırılmayan bir işlem zinciri oluşturan işlevlerdir. Akış referans belgelerinde ara operatörler hakkında daha fazla bilgi edinebilirsiniz.

Aşağıdaki örnekte depo katmanı, View üzerinde görüntülenecek verileri dönüştürmek için ara operatörü map kullanmaktadır:

class NewsRepository(
    private val newsRemoteDataSource: NewsRemoteDataSource,
    private val userData: UserData
) {
    /**
     * Returns the favorite latest news applying transformations on the flow.
     * These operations are lazy and don't trigger the flow. They just transform
     * the current value emitted by the flow at that point in time.
     */
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            // Intermediate operation to filter the list of favorite topics
            .map { news -> news.filter { userData.isFavoriteTopic(it) } }
            // Intermediate operation to save the latest news in the cache
            .onEach { news -> saveInCache(news) }
}

Ara operatörler art arda uygulanarak akışa bir öğe yayınlandığında geç yürütülen bir işlem zinciri oluşturabilir. Akışa yalnızca ara operatör uygulanmasının akış toplamayı başlatmadığını unutmayın.

Akıştan veri toplama

Değerleri dinlemeye başlamak üzere akışı tetiklemek için bir terminal operatörü kullanın. Akıştaki tüm değerleri yayınlandıkça almak için collect işlevini kullanın. Terminal operatörleri hakkında daha fazla bilgiyi resmi akış belgelerinde bulabilirsiniz.

collect bir askıya alma işlevi olduğundan, bir eş yordam içinde yürütülmelidir. Her yeni değerde çağrılan bir lambdayı parametre olarak alır. Bu bir askıya alma işlevi olduğundan collect çağıran eş yordam, akış kapatılana kadar askıya alabilir.

Önceki örnekten devam edersek, depo katmanındaki verileri tüketen bir ViewModel basit uygulaması şu şekildedir:

class LatestNewsViewModel(
    private val newsRepository: NewsRepository
) : ViewModel() {

    init {
        viewModelScope.launch {
            // Trigger the flow and consume its elements using collect
            newsRepository.favoriteLatestNews.collect { favoriteNews ->
                // Update View with the latest favorite news
            }
        }
    }
}

Akışın toplanması, en son haberleri yenileyen ve ağ isteğinin sonucunu sabit aralıklarla yayınlayan yapımcıyı tetikler. Üretici, while(true) döngüsüyle her zaman etkin kaldığı için ViewModel temizlendiğinde ve viewModelScope iptal edildiğinde veri akışı kapatılır.

Akış toplama aşağıdaki nedenlerle durdurulabilir:

  • Toplanan eş yordam, önceki örnekte gösterildiği gibi iptal edilir. Bu işlem, temel yapımcıyı da durdurur.
  • Üretici ürün yayınlamayı bitirir. Bu durumda veri akışı kapatılır ve collect adlı eş yordam yürütmeye devam eder.

Akışlar, diğer ara operatörlerle belirtilmedikçe soğuk ve geçen niteliktedir. Bu, akışta bir terminal operatörü her çağrıldığında üretici kodunun yürütüleceği anlamına gelir. Önceki örnekte, birden fazla akış toplayıcınızın olması, veri kaynağının en son haberleri farklı sabit aralıklarla birden çok kez getirmesine neden olur. Birden fazla tüketici aynı anda veri topladığında bir akışı optimize etmek ve paylaşmak için shareIn operatörünü kullanın.

Beklenmedik istisnaları yakalama

Yapımcının uygulaması üçüncü taraf bir kitaplıktan gelebilir. Bu da beklenmedik istisnalar atabileceği anlamına gelir. Bu istisnaları yönetmek için catch ara operatörünü kullanın.

class LatestNewsViewModel(
    private val newsRepository: NewsRepository
) : ViewModel() {

    init {
        viewModelScope.launch {
            newsRepository.favoriteLatestNews
                // Intermediate catch operator. If an exception is thrown,
                // catch and update the UI
                .catch { exception -> notifyError(exception) }
                .collect { favoriteNews ->
                    // Update View with the latest favorite news
                }
        }
    }
}

Önceki örnekte, bir istisna oluştuğunda yeni bir öğe alınmadığı için collect lambda çağrılmaz.

Ayrıca catch, akışa emit öğe de ekleyebilir. Örnek depo katmanı, bunun yerine önbelleğe alınan değerleri emit sağlayabilir:

class NewsRepository(...) {
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            .map { news -> news.filter { userData.isFavoriteTopic(it) } }
            .onEach { news -> saveInCache(news) }
            // If an error happens, emit the last cached values
            .catch { exception -> emit(lastCachedNews()) }
}

Bu örnekte, bir istisna oluştuğunda istisna nedeniyle akışa yeni bir öğe dağıtıldığı için collect lambda çağrılır.

Farklı bir CoroutineContext'te yürütme

Varsayılan olarak flow oluşturucunun üreticisi, kendisinden toplanan eşin CoroutineContext içinde yürütülür ve daha önce de belirtildiği gibi, farklı bir CoroutineContext öğesinden değerler emit alamaz. Bazı durumlarda bu istenmeyen bir davranış olabilir. Örneğin, bu konu boyunca kullanılan örneklerde depo katmanı, viewModelScope tarafından kullanılan Dispatchers.Main üzerinde işlem yapmamalıdır.

Bir akışın CoroutineContext değerini değiştirmek için flowOn ara operatörünü kullanın. flowOn, yukarı akış akışının CoroutineContext değerini değiştirir. Bu değer, flowOn öncesinde (veya sonrasında) uygulanan üretici ve ara operatörlerin olduğu anlamına gelir. Aşağı akış (tüketiciyle birlikte flowOn'dan sonra ara operatörler) etkilenmez ve akıştan collect için kullanılan CoroutineContext üzerinde yürütülür. Birden fazla flowOn operatörü varsa her biri yukarı akışı geçerli konumundan değiştirir.

class NewsRepository(
    private val newsRemoteDataSource: NewsRemoteDataSource,
    private val userData: UserData,
    private val defaultDispatcher: CoroutineDispatcher
) {
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            .map { news -> // Executes on the default dispatcher
                news.filter { userData.isFavoriteTopic(it) }
            }
            .onEach { news -> // Executes on the default dispatcher
                saveInCache(news)
            }
            // flowOn affects the upstream flow ↑
            .flowOn(defaultDispatcher)
            // the downstream flow ↓ is not affected
            .catch { exception -> // Executes in the consumer's context
                emit(lastCachedNews())
            }
}

Bu kodla onEach ve map operatörleri defaultDispatcher kullanırken catch operatörü ve tüketici, viewModelScope tarafından kullanılan Dispatchers.Main üzerinde yürütülür.

Veri kaynağı katmanı G/Ç işlemleri yaparken, G/Ç işlemleri için optimize edilmiş bir görev dağıtıcı kullanmanız gerekir:

class NewsRemoteDataSource(
    ...,
    private val ioDispatcher: CoroutineDispatcher
) {
    val latestNews: Flow<List<ArticleHeadline>> = flow {
        // Executes on the IO dispatcher
        ...
    }
        .flowOn(ioDispatcher)
}

Jetpack kitaplıklarındaki akışlar

Flow, birçok Jetpack kitaplığına entegredir ve Android üçüncü taraf kitaplıkları arasında popülerdir. Akış, canlı veri güncellemeleri ve sonsuz veri akışı için idealdir.

Bir veritabanındaki değişikliklerle ilgili bildirim almak için Oda ile Akış'ı kullanabilirsiniz. Veri erişim nesnelerini (DAO) kullanırken canlı güncellemeleri almak için bir Flow türü döndürün.

@Dao
abstract class ExampleDao {
    @Query("SELECT * FROM Example")
    abstract fun getExamples(): Flow<List<Example>>
}

Example tablosunda her değişiklik olduğunda, veritabanındaki yeni öğelerle yeni bir liste yayınlanır.

Geri çağırmaya dayalı API'leri akışlara dönüştürün

callbackFlow, geri çağırmaya dayalı API'leri akışlara dönüştürmenizi sağlayan bir akış oluşturucudur. Örneğin, Firebase Firestore Android API'leri geri çağırma kullanır.

Bu API'leri akışlara dönüştürmek ve Firestore veritabanı güncellemelerini dinlemek için aşağıdaki kodu kullanabilirsiniz:

class FirestoreUserEventsDataSource(
    private val firestore: FirebaseFirestore
) {
    // Method to get user events from the Firestore database
    fun getUserEvents(): Flow<UserEvents> = callbackFlow {

        // Reference to use in Firestore
        var eventsCollection: CollectionReference? = null
        try {
            eventsCollection = FirebaseFirestore.getInstance()
                .collection("collection")
                .document("app")
        } catch (e: Throwable) {
            // If Firebase cannot be initialized, close the stream of data
            // flow consumers will stop collecting and the coroutine will resume
            close(e)
        }

        // Registers callback to firestore, which will be called on new events
        val subscription = eventsCollection?.addSnapshotListener { snapshot, _ ->
            if (snapshot == null) { return@addSnapshotListener }
            // Sends events to the flow! Consumers will get the new events
            try {
                offer(snapshot.getEvents())
            } catch (e: Throwable) {
                // Event couldn't be sent to the flow
            }
        }

        // The callback inside awaitClose will be executed when the flow is
        // either closed or cancelled.
        // In this case, remove the callback from Firestore
        awaitClose { subscription?.remove() }
    }
}

flow oluşturucunun aksine callbackFlow, değerlerin send işlevine sahip farklı bir CoroutineContext'den veya trySend işlevine sahip bir eş değer dışında dağıtılmasına izin verir.

callbackFlow şirket içinde, kavramsal olarak engelleme sırasına çok benzeyen bir kanal kullanır. Bir kanal, arabelleğe alınabilecek maksimum öğe sayısı olan kapasite ile yapılandırılır. callbackFlow ürününde oluşturulan kanalın varsayılan kapasitesi 64 öğedir. Tam kanala yeni bir öğe eklemeye çalıştığınızda send, yeni öğe için alan açılıncaya kadar yapımcıyı askıya alır. Öte yandan, offer öğeyi kanala eklemez ve hemen false değerini döndürür.

Ek akış kaynakları