Android'de Kotlin akışı

Eş yordamlarda, yalnızca tek bir değer döndüren askıya alma işlevlerinin aksine akış, birden fazla değeri sırayla yayınlayabilen bir türdür. Örneğin, bir veritabanından canlı güncellemeler almak için bir akış kullanabilirsiniz.

Akışlar eş yordamların üzerine kuruludur ve birden fazla değer sağlayabilir. Akış, kavramsal olarak eşzamansız olarak hesaplanabilen bir veri akışıdır. Yayınlanan değerler aynı türde olmalıdır. Örneğin Flow<Int>, tam sayı değerleri yayan bir akıştır.

Akış, bir değer dizisi oluşturan Iterator işlevine çok benzer ancak eşzamansız olarak değer üretmek ve tüketmek için askıya alma işlevlerini kullanır. Bu, örneğin akışın ana iş parçacığını engellemeden bir sonraki değeri üretmek için güvenli bir şekilde ağ isteğinde bulunabileceği anlamına gelir.

Veri akışlarına dahil olan üç varlık vardır:

  • Üretici, akışa eklenen veriler oluşturur. Eş yordamlar sayesinde akışlar, verileri eşzamansız olarak da üretebilir.
  • (İsteğe bağlı) Aracılar, akışa veya akışın kendisine yayınlanan her bir değeri değiştirebilir.
  • Tüketiciler, akıştaki değerleri tüketir.

Veri akışlarına dahil olan tüzel kişiler; tüketici, isteğe bağlı aracılar ve üreticiler
Şekil 1. Veri akışlarında yer alan tüzel kişiler: tüketici, isteğe bağlı aracılar ve üretici.

Android'de depo, genellikle verileri görüntüleyen tüketici olarak kullanıcı arayüzüne sahip bir kullanıcı arayüzü verileri üreticisidir. Bazı durumlarda ise kullanıcı arayüzü katmanı, kullanıcı giriş etkinliklerinin üreticisidir ve hiyerarşinin diğer katmanları bunları tüketir. Üretici ve tüketici arasındaki katmanlar genellikle veri akışını değiştirerek bir sonraki katmanın gereksinimlerine göre düzenleyen aracı görevi görür.

Akış oluşturma

Akış oluşturmak için akış oluşturucu API'lerini kullanın. flow oluşturucu işlevi, emit işlevini kullanarak veri akışına manuel olarak yeni değerler yayınlayabileceğiniz yeni bir akış oluşturur.

Aşağıdaki örnekte bir veri kaynağı en son haberleri sabit bir aralıklarla otomatik olarak getirir. Askıya alma işlevi birden fazla ardışık değer döndüremediğinden veri kaynağı bu koşulu karşılamak için bir akış oluşturur ve döndürür. Bu durumda veri kaynağı, üretici rolünü üstlenir.

class NewsRemoteDataSource(
    private val newsApi: NewsApi,
    private val refreshIntervalMs: Long = 5000
) {
    val latestNews: Flow<List<ArticleHeadline>> = flow {
        while(true) {
            val latestNews = newsApi.fetchLatestNews()
            emit(latestNews) // Emits the result of the request to the flow
            delay(refreshIntervalMs) // Suspends the coroutine for some time
        }
    }
}

// Interface that provides a way to make network requests with suspend functions
interface NewsApi {
    suspend fun fetchLatestNews(): List<ArticleHeadline>
}

flow oluşturucu, bir eş yordam içinde yürütüldü. Dolayısıyla, aynı eşzamansız API'lerden yararlanır ancak bazı kısıtlamalar geçerlidir:

  • Akışlar sıralıdır. Üretici bir eş yordam içinde olduğundan, askıya alma işlevi çağrılırken askıya alma işlevi geri dönene kadar cihazı askıya alır. Örnekte üretici, fetchLatestNews ağ isteği tamamlanana kadar askıya alınır. Sonuç, ancak sonrasında akışa iletilir.
  • flow oluşturucuyla, üretici farklı bir CoroutineContext'den emit değerleri alamaz. Bu nedenle, yeni eş yordamlar oluşturarak veya withContext kod blokları kullanarak emit öğesini farklı bir CoroutineContext içinde çağırmayın. Bu durumlarda callbackFlow gibi diğer akış oluşturucuları kullanabilirsiniz.

Akışı değiştirme

Aracılar, değerleri tüketmeden veri akışını değiştirmek için ara operatörler kullanabilir. Bu operatörler, bir veri akışına uygulandığında değerler gelecekte tüketilene kadar yürütülmeyen bir işlem zinciri oluşturan işlevlerdir. Akış referans belgelerinden ara operatörler hakkında daha fazla bilgi edinebilirsiniz.

Aşağıdaki örnekte, depo katmanı View üzerinde görüntülenecek verileri dönüştürmek için map ara operatörünü kullanır:

class NewsRepository(
    private val newsRemoteDataSource: NewsRemoteDataSource,
    private val userData: UserData
) {
    /**
     * Returns the favorite latest news applying transformations on the flow.
     * These operations are lazy and don't trigger the flow. They just transform
     * the current value emitted by the flow at that point in time.
     */
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            // Intermediate operation to filter the list of favorite topics
            .map { news -> news.filter { userData.isFavoriteTopic(it) } }
            // Intermediate operation to save the latest news in the cache
            .onEach { news -> saveInCache(news) }
}

Ara operatörler birbiri ardına uygulanabilir. Böylece, bir öğe akışa yayınlandığında geç gerçekleştirilen bir işlem zinciri oluşturabilir. Bir akışa sadece ara operatör uygulamanın akış toplama işlemini başlatmadığını unutmayın.

Akıştan veri toplama

Değerleri dinlemeye başlamak üzere akışı tetiklemek için bir terminal operatörü kullanın. Yayınlandığı şekilde akıştaki tüm değerleri almak için collect işlevini kullanın. Resmi akış belgelerinde terminal operatörleri hakkında daha fazla bilgi edinebilirsiniz.

collect bir askıya alma işlevi olduğundan, eş yordam içinde yürütülmesi gerekir. Her yeni değerde çağrılan parametre olarak lambda kullanılır. Bu bir askıya alma işlevi olduğundan, collect çağrısı yapan eş düzey çalışan, akış kapatılana kadar askıya alınabilir.

Önceki örnekten devam edelim. Depo katmanındaki verileri tüketen bir ViewModel şöyle basit bir şekilde uygulanabilir:

class LatestNewsViewModel(
    private val newsRepository: NewsRepository
) : ViewModel() {

    init {
        viewModelScope.launch {
            // Trigger the flow and consume its elements using collect
            newsRepository.favoriteLatestNews.collect { favoriteNews ->
                // Update View with the latest favorite news
            }
        }
    }
}

Akışın toplanması, en son haberleri yenileyen ve ağ isteğinin sonucunu sabit aralıklarla yayan üreticiyi tetikler. Üretici while(true) döngüsüyle her zaman etkin olduğundan, ViewModel temizlendiğinde ve viewModelScope iptal edildiğinde veri akışı kapatılır.

Akış toplama aşağıdaki nedenlerle durdurulabilir:

  • Bir önceki örnekte gösterildiği gibi, toplanan eş yordam iptal edildi. Bu işlem, ana yapımcıyı da durdurur.
  • Üretici, öğeleri yayınlamayı bitirir. Bu durumda veri akışı kapatılır ve collect adlı eş yordam yürütme devam eder.

Diğer ara operatörlerle belirtilmediği sürece akışlar soğuk ve geç olur. Bu, akışta bir terminal operatörü her çağrıldığında üretici kodunun yürütülmesi anlamına gelir. Önceki örnekte, birden fazla akış toplayıcının olması, veri kaynağının en son haberleri farklı sabit aralıklarla birden çok kez getirmesine neden olur. Birden fazla tüketici aynı anda toplama yaptığında akışı optimize etmek ve paylaşmak için shareIn operatörünü kullanın.

Beklenmeyen istisnaları yakalama

Üretici uygulaması, üçüncü taraf kitaplığından gelebilir. Bu, beklenmedik istisnalar oluşturabileceği anlamına gelir. Bu istisnaları işlemek için catch ara operatörünü kullanın.

class LatestNewsViewModel(
    private val newsRepository: NewsRepository
) : ViewModel() {

    init {
        viewModelScope.launch {
            newsRepository.favoriteLatestNews
                // Intermediate catch operator. If an exception is thrown,
                // catch and update the UI
                .catch { exception -> notifyError(exception) }
                .collect { favoriteNews ->
                    // Update View with the latest favorite news
                }
        }
    }
}

Yukarıdaki örnekte bir istisna oluştuğunda, yeni bir öğe alınmadığından collect lambda çağrılmaz.

catch, akışa emit öğe de ekleyebilir. Örnek depo katmanı, bunun yerine önbelleğe alınan değerleri emit:

class NewsRepository(...) {
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            .map { news -> news.filter { userData.isFavoriteTopic(it) } }
            .onEach { news -> saveInCache(news) }
            // If an error happens, emit the last cached values
            .catch { exception -> emit(lastCachedNews()) }
}

Bu örnekte, bir istisna oluştuğunda istisna nedeniyle akışa yeni bir öğe yayınlandığı için collect lambdası çağrılır.

Farklı bir CoroutineContext'te yürütme

Varsayılan olarak, flow oluşturucunun üreticisi, kendisinden toplanan eş yordamın CoroutineContext içinde yürütülür ve daha önce de belirtildiği gibi farklı bir CoroutineContext öğesinden emit değerleri alamaz. Bazı durumlarda bu istenmeyen bir davranış olabilir. Örneğin, bu konu genelinde kullanılan örneklerde depo katmanı, viewModelScope tarafından kullanılan Dispatchers.Main üzerinde işlem yapmamalıdır.

Bir akışın CoroutineContext değerini değiştirmek için flowOn ara operatörünü kullanın. flowOn, yukarı akış akışının CoroutineContext değerini değiştirir; yani üretici ve önce (veya üstü) flowOn uygulanan tüm ara operatörler. Aşağı akış akışı (tüketiciyle birlikte flowOn tarihinden sonra ara operatörler) etkilenmez ve akıştan collect için kullanılan CoroutineContext üzerinde yürütülür. Birden fazla flowOn operatörü varsa her biri yukarı akışı mevcut konumundan değiştirir.

class NewsRepository(
    private val newsRemoteDataSource: NewsRemoteDataSource,
    private val userData: UserData,
    private val defaultDispatcher: CoroutineDispatcher
) {
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            .map { news -> // Executes on the default dispatcher
                news.filter { userData.isFavoriteTopic(it) }
            }
            .onEach { news -> // Executes on the default dispatcher
                saveInCache(news)
            }
            // flowOn affects the upstream flow ↑
            .flowOn(defaultDispatcher)
            // the downstream flow ↓ is not affected
            .catch { exception -> // Executes in the consumer's context
                emit(lastCachedNews())
            }
}

Bu kodla, onEach ve map operatörleri defaultDispatcher'ı kullanırken catch operatörü ve tüketici, viewModelScope tarafından kullanılan Dispatchers.Main üzerinde yürütülür.

Veri kaynağı katmanı G/Ç çalışması yaptığından, G/Ç işlemleri için optimize edilmiş bir görev dağıtıcı kullanmanız gerekir:

class NewsRemoteDataSource(
    ...,
    private val ioDispatcher: CoroutineDispatcher
) {
    val latestNews: Flow<List<ArticleHeadline>> = flow {
        // Executes on the IO dispatcher
        ...
    }
        .flowOn(ioDispatcher)
}

Jetpack kitaplıklarındaki akışlar

Akış birçok Jetpack kitaplığına entegre edilmiştir ve Android üçüncü taraf kitaplıkları arasında popülerdir. Akış, canlı veri güncellemeleri ve sonsuz veri akışı için idealdir.

Veritabanındaki değişikliklerle ilgili bilgi almak için Flow with Room'u kullanabilirsiniz. Veri erişim nesnelerini (DAO) kullanırken canlı güncellemeler almak için bir Flow türü döndürün.

@Dao
abstract class ExampleDao {
    @Query("SELECT * FROM Example")
    abstract fun getExamples(): Flow<List<Example>>
}

Example tablosunda her değişiklik olduğunda, veritabanındaki yeni öğeleri içeren yeni bir liste yayınlanır.

Geri çağırmaya dayalı API'leri akışlara dönüştürme

callbackFlow, geri çağırmaya dayalı API'leri akışlara dönüştürmenizi sağlayan bir akış oluşturucudur. Örneğin, Firebase Firestore Android API'lerinde geri çağırma işlevleri kullanılır.

Bu API'leri akışlara dönüştürmek ve Firestore veritabanı güncellemelerini dinlemek için aşağıdaki kodu kullanabilirsiniz:

class FirestoreUserEventsDataSource(
    private val firestore: FirebaseFirestore
) {
    // Method to get user events from the Firestore database
    fun getUserEvents(): Flow<UserEvents> = callbackFlow {

        // Reference to use in Firestore
        var eventsCollection: CollectionReference? = null
        try {
            eventsCollection = FirebaseFirestore.getInstance()
                .collection("collection")
                .document("app")
        } catch (e: Throwable) {
            // If Firebase cannot be initialized, close the stream of data
            // flow consumers will stop collecting and the coroutine will resume
            close(e)
        }

        // Registers callback to firestore, which will be called on new events
        val subscription = eventsCollection?.addSnapshotListener { snapshot, _ ->
            if (snapshot == null) { return@addSnapshotListener }
            // Sends events to the flow! Consumers will get the new events
            try {
                trySend(snapshot.getEvents())
            } catch (e: Throwable) {
                // Event couldn't be sent to the flow
            }
        }

        // The callback inside awaitClose will be executed when the flow is
        // either closed or cancelled.
        // In this case, remove the callback from Firestore
        awaitClose { subscription?.remove() }
    }
}

flow oluşturucudan farklı olarak callbackFlow, değerlerin send işleviyle farklı bir CoroutineContext'den veya trySend işleviyle bir eş yordamın dışında yayınlanmasına izin verir.

callbackFlow, dahili olarak bir kanal kullanır. Bu kanal, kavramsal olarak engelleme sırasına çok benzer. Bir kanal, arabelleğe alınabilecek maksimum öğe sayısı olan bir kapasite ile yapılandırılır. callbackFlow ürününde oluşturulan kanalın varsayılan kapasitesi 64 öğedir. Tam kanala yeni bir öğe eklemeye çalıştığınızda, send yeni öğe için yer ayrılana kadar yapımcıyı askıya alır. trySend ise öğeyi kanala eklemez ve false değerini hemen döndürür.

trySend, yalnızca kapasite kısıtlamalarını ihlal etmediği sürece belirtilen öğeyi kanala hemen ekler ve daha sonra başarılı sonucu döndürür.

Ek akış kaynakları