تدفق Kotlin على نظام Android

في الكوروتينات، يكون التدفق نوعًا يمكنه إصدار قيم متعدّدة بشكل تسلسلي، على عكس دوال التعليق التي تعرض قيمة واحدة فقط. على سبيل المثال، يمكنك استخدام التدفق لتلقي تحديثات مباشرة من قاعدة بيانات.

يتم إنشاء التدفقات على الكوروتينات ويمكن أن تقدم قيمًا متعددة. التدفق هو من الناحية النظرية مصدر بيانات يمكن حسابه بشكل غير متزامن. يجب أن تكون القيم الصادرة من النوع نفسه. على سبيل المثال، Flow<Int> هو تدفق يُصدر قيمًا صحيحة.

يشبه التدفق إلى حدّ كبير دالة Iterator التي تُنتج تسلسلاً من القيم، ولكنّها تستخدم دوال التعليق لإنتاج القيم واستهلاكها بشكل غير متزامن. وهذا يعني، على سبيل المثال، أنّ التدفق يمكن أن يقدّم بأمان طلب شبكة لإنتاج القيمة التالية بدون حظر سلسلة التعليمات الرئيسية.

هناك ثلاثة كيانات معنية بمصادر البيانات:

  • يُنتج المنتج البيانات التي تتم إضافتها إلى مصدر البيانات. بفضل الكوروتينات، يمكن للتدفقات أيضًا إنتاج بيانات بشكل غير متزامن.
  • (اختياري) يمكن للوسطاء تعديل كل قيمة يتم إطلاقها في البث أو البث نفسه.
  • ويستهلك المستهلك القيم من ساحة المشاركات.

الكيانات المشاركة في مصادر البيانات، والمستهلك، والوسطاء الاختياريين، والمنتج
الشكل 1. الكيانات المشتركة في عمليات بث البيانات: المستهلك والوسطاء الاختياريون والمنتجون.

في نظام التشغيل Android، يكون المستودع عادةً منتِجًا لبيانات واجهة المستخدم وفيه واجهة المستخدم (UI) التي تعرض البيانات في النهاية. في أوقات أخرى، تكون طبقة واجهة المستخدم منتجًا لأحداث إدخال المستخدم وتستخدمها طبقات أخرى من التسلسل الهرمي. عادةً ما تعمل الطبقات بين المنتج والمستهلك كوسطاء يعدّلون بث البيانات لضبطه وفقًا لمتطلبات الطبقة التالية.

إنشاء تدفق

لإنشاء التدفقات، استخدِم واجهات برمجة التطبيقات في أداة إنشاء التدفقات. تنشئ دالة الإنشاء flow تدفقًا جديدًا يمكنك من خلاله إصدار قيم جديدة يدويًا إلى مصدر البيانات باستخدام الدالة emit.

في المثال التالي، يجلب مصدر البيانات آخر الأخبار تلقائيًا على فترة زمنية ثابتة. ونظرًا لأن دالة التعليق لا يمكنها عرض قيم متعددة متتالية، فإن مصدر البيانات ينشئ ويعيد تدفقًا للوفاء بهذا المطلب. في هذه الحالة، يعمل مصدر البيانات كالمنتج.

class NewsRemoteDataSource(
    private val newsApi: NewsApi,
    private val refreshIntervalMs: Long = 5000
) {
    val latestNews: Flow<List<ArticleHeadline>> = flow {
        while(true) {
            val latestNews = newsApi.fetchLatestNews()
            emit(latestNews) // Emits the result of the request to the flow
            delay(refreshIntervalMs) // Suspends the coroutine for some time
        }
    }
}

// Interface that provides a way to make network requests with suspend functions
interface NewsApi {
    suspend fun fetchLatestNews(): List<ArticleHeadline>
}

يتم تنفيذ أداة إنشاء flow داخل الكوروتين. وبالتالي، تستفيد من نفس واجهات برمجة التطبيقات غير المتزامنة، ولكن تنطبق بعض القيود:

  • تكون التدفقات متسلسلة. بما أنّ المنتج في الكوروتين، عند استدعاء دالة تعليق، يتم تعليق المنتج إلى أن يتم إرجاع دالة التعليق. في المثال، يعلّق المنتج إلى أن يكتمل طلب شبكة fetchLatestNews. عندها فقط يتم نشر النتيجة إلى مجموعة البث.
  • باستخدام أداة إنشاء flow، لا يمكن للأداة emit إنشاء قيم من CoroutineContext مختلف. لذلك، يجب عدم استدعاء emit باستخدام سمة CoroutineContext مختلفة من خلال إنشاء الكوروتينات الجديدة أو باستخدام withContext مجموعات من الرموز. يمكنك استخدام أدوات إنشاء التدفقات الأخرى، مثل callbackFlow في هذه الحالات.

تعديل ساحة المشاركات

يمكن للوسطاء استخدام عوامل التشغيل الوسيطة لتعديل تدفق البيانات بدون استهلاك القيم. هذه المشغلات هي دوال، عند تطبيقها على تدفق من البيانات، تقوم بإعداد سلسلة من العمليات التي لا يتم تنفيذها حتى يتم استهلاك القيم في المستقبل. اطّلِع على مزيد من المعلومات عن العوامل الوسيطة في المستندات المرجعية للتدفق.

في المثال أدناه، تستخدم طبقة المستودع عامل التشغيل الوسيط map لتحويل البيانات التي سيتم عرضها في View:

class NewsRepository(
    private val newsRemoteDataSource: NewsRemoteDataSource,
    private val userData: UserData
) {
    /**
     * Returns the favorite latest news applying transformations on the flow.
     * These operations are lazy and don't trigger the flow. They just transform
     * the current value emitted by the flow at that point in time.
     */
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            // Intermediate operation to filter the list of favorite topics
            .map { news -> news.filter { userData.isFavoriteTopic(it) } }
            // Intermediate operation to save the latest news in the cache
            .onEach { news -> saveInCache(news) }
}

يمكن تطبيق العوامل الوسيطة واحدة تلو الأخرى، لتشكيل سلسلة من العمليات التي يتم تنفيذها ببطء عند انبعاث عنصر ما في التدفق. تجدر الإشارة إلى أنّ مجرد تطبيق عامل تشغيل وسيط على مجموعة بث لا يؤدي إلى بدء جمع التدفق.

الجمع من تدفق

استخدِم عامل تشغيل طرفي لتشغيل التدفق لبدء رصد القيم. للحصول على جميع القيم في ساحة المشاركات عند إطلاقها، استخدِم collect. يمكنك الاطّلاع على المزيد من المعلومات حول مشغّلات محطة الدفع في مستندات التدفق الرسمية.

بما أنّ collect هي دالة تعليق، يجب تنفيذها داخل الكوروتين. يأخذ دالة lambda كمعلمة يتم استدعاءها على كل قيمة جديدة. ولأنها دالة تعليق، فإن الكوروتين الذي يستدعي collect قد يعلّق حتى يتم إغلاق التدفق.

لنتابع المثال السابق، إليك عملية تنفيذ بسيطة لـ ViewModel تستهلك البيانات من طبقة المستودع:

class LatestNewsViewModel(
    private val newsRepository: NewsRepository
) : ViewModel() {

    init {
        viewModelScope.launch {
            // Trigger the flow and consume its elements using collect
            newsRepository.favoriteLatestNews.collect { favoriteNews ->
                // Update View with the latest favorite news
            }
        }
    }
}

يؤدّي جمع التدفق إلى تشغيل المنتِج الذي يعدّل آخر الأخبار ويصدر نتيجة طلب الشبكة على فترات ثابتة. بما أنّ المنتج يظل نشطًا دائمًا في حلقة التكرار while(true)، سيتم إغلاق بث البيانات عند محو ViewModel وإلغاء viewModelScope.

يمكن أن يتوقف جمع التدفق للأسباب التالية:

  • يتم إلغاء الكوروتين الذي يجمعه، كما هو موضّح في المثال السابق. يؤدي هذا أيضًا إلى إيقاف المنتج الأساسي.
  • ينتهي المنتج من إصدار السلع. في هذه الحالة، يتم إغلاق تدفّق البيانات ويستأنف تنفيذ الكوروتين الذي يُسمى collect عملية التنفيذ.

تكون التدفقات باردة وكسولة ما لم يتم تحديدها مع عوامل تشغيل وسيطة أخرى. هذا يعني أنه يتم تنفيذ التعليمة البرمجية للمنتج في كل مرة يتم استدعاء عامل تشغيل طرفي في التدفق. في المثال السابق، يؤدي استخدام عدة أدوات لتجميع التدفق إلى جلب مصدر البيانات لآخر الأخبار عدة مرات على فترات زمنية ثابتة مختلفة. لتحسين العملية ومشاركتها عندما يجمع عدة مستهلكين في الوقت نفسه، استخدِم عامل التشغيل shareIn.

رصد استثناءات غير متوقعة

يمكن تحميل المنتج من مكتبة تابعة لجهة خارجية. وهذا يعني أنّه قد يؤدي إلى حدوث استثناءات غير متوقعة. للتعامل مع هذه الاستثناءات، استخدِم عامل التشغيل الوسيط catch.

class LatestNewsViewModel(
    private val newsRepository: NewsRepository
) : ViewModel() {

    init {
        viewModelScope.launch {
            newsRepository.favoriteLatestNews
                // Intermediate catch operator. If an exception is thrown,
                // catch and update the UI
                .catch { exception -> notifyError(exception) }
                .collect { favoriteNews ->
                    // Update View with the latest favorite news
                }
        }
    }
}

في المثال السابق، عند حدوث استثناء، لا يتم استدعاء دالة collect lambda، لأنّه لم يتم استلام عنصر جديد.

يمكن لـ "catch" أيضًا emit عنصر في المسار. يمكن لطبقة المستودع النموذجية emit القيم المخزّنة مؤقتًا بدلاً من ذلك:

class NewsRepository(...) {
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            .map { news -> news.filter { userData.isFavoriteTopic(it) } }
            .onEach { news -> saveInCache(news) }
            // If an error happens, emit the last cached values
            .catch { exception -> emit(lastCachedNews()) }
}

في هذا المثال، عند حدوث استثناء، يتم استدعاء دالة collect lambda لأنّه تم إرسال عنصر جديد إلى مجموعة البث لهذا الاستثناء.

التنفيذ في سياق CoroutineContext مختلف

ينفّذ منتج أداة إنشاء flow تلقائيًا CoroutineContext في الكوروتين الذي يجمعه، وكما ذكرنا سابقًا، لا يمكنه emit استخدام قيم من CoroutineContext مختلف. قد يكون هذا السلوك غير مرغوب فيه في بعض الحالات. على سبيل المثال، في الأمثلة المستخدمة في هذا الموضوع، ينبغي ألا تنفّذ طبقة المستودع العمليات على Dispatchers.Main التي يستخدمها viewModelScope.

لتغيير CoroutineContext للتدفق، استخدِم عامل التشغيل الوسيط flowOn. تغيّر flowOn CoroutineContext من التدفق العلوي، أي المنتج وأي عوامل تشغيل وسيطة يتم تطبيقها قبل (أو أعلاه) flowOn. لا يتأثر تدفق الموقع الإلكتروني (عوامل التشغيل الوسيطة بعد flowOn مع المستهلك) ويتم تنفيذه على CoroutineContext المستخدَم في collect من التدفق. في حال توفُّر عدة عوامل تشغيل flowOn، سيغيّر كل منها مصدر البيانات من موقعه الحالي.

class NewsRepository(
    private val newsRemoteDataSource: NewsRemoteDataSource,
    private val userData: UserData,
    private val defaultDispatcher: CoroutineDispatcher
) {
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            .map { news -> // Executes on the default dispatcher
                news.filter { userData.isFavoriteTopic(it) }
            }
            .onEach { news -> // Executes on the default dispatcher
                saveInCache(news)
            }
            // flowOn affects the upstream flow ↑
            .flowOn(defaultDispatcher)
            // the downstream flow ↓ is not affected
            .catch { exception -> // Executes in the consumer's context
                emit(lastCachedNews())
            }
}

باستخدام هذا الرمز، يستخدم العاملان onEach وmap عاملَي التشغيل defaultDispatcher، بينما يتم تنفيذ عامل التشغيل catch والمستهلك على Dispatchers.Main المستخدَم في viewModelScope.

وبينما تقوم طبقة مصدر البيانات بعمل وحدات الإدخال والإخراج، ينبغي لك استخدام وحدة إرسال محسنة لعمليات الإدخال والإخراج:

class NewsRemoteDataSource(
    ...,
    private val ioDispatcher: CoroutineDispatcher
) {
    val latestNews: Flow<List<ArticleHeadline>> = flow {
        // Executes on the IO dispatcher
        ...
    }
        .flowOn(ioDispatcher)
}

التدفقات في مكتبات Jetpack

تم دمج التدفق في العديد من مكتبات Jetpack، وهو رائج بين مكتبات Android التابعة لجهات خارجية. يعد التدفق مناسبًا جدًا لتحديثات البيانات المباشرة وتدفقات البيانات التي لا نهاية لها.

يمكنك استخدام Flow with Room لتلقّي إشعارات بالتغييرات في قاعدة البيانات. عند استخدام كائنات الوصول إلى البيانات (DAO)، يمكنك عرض النوع Flow للحصول على تحديثات مباشرة.

@Dao
abstract class ExampleDao {
    @Query("SELECT * FROM Example")
    abstract fun getExamples(): Flow<List<Example>>
}

في كل مرة يحدث تغيير في جدول Example، يتم إطلاق قائمة جديدة بالعناصر الجديدة في قاعدة البيانات.

تحويل واجهات برمجة التطبيقات المستندة إلى معاودة الاتصال إلى تدفقات

callbackFlow هي أداة إنشاء تدفقات تتيح لك تحويل واجهات برمجة التطبيقات المستندة إلى معاودة الاتصال إلى مسارات. على سبيل المثال، تستخدم واجهات برمجة تطبيقات Android لـ Firebase Firestore عمليات معاودة الاتصال.

لتحويل واجهات برمجة التطبيقات هذه إلى تدفقات والاستماع إلى تحديثات قاعدة بيانات Firestore، يمكنك استخدام التعليمة البرمجية التالية:

class FirestoreUserEventsDataSource(
    private val firestore: FirebaseFirestore
) {
    // Method to get user events from the Firestore database
    fun getUserEvents(): Flow<UserEvents> = callbackFlow {

        // Reference to use in Firestore
        var eventsCollection: CollectionReference? = null
        try {
            eventsCollection = FirebaseFirestore.getInstance()
                .collection("collection")
                .document("app")
        } catch (e: Throwable) {
            // If Firebase cannot be initialized, close the stream of data
            // flow consumers will stop collecting and the coroutine will resume
            close(e)
        }

        // Registers callback to firestore, which will be called on new events
        val subscription = eventsCollection?.addSnapshotListener { snapshot, _ ->
            if (snapshot == null) { return@addSnapshotListener }
            // Sends events to the flow! Consumers will get the new events
            try {
                offer(snapshot.getEvents())
            } catch (e: Throwable) {
                // Event couldn't be sent to the flow
            }
        }

        // The callback inside awaitClose will be executed when the flow is
        // either closed or cancelled.
        // In this case, remove the callback from Firestore
        awaitClose { subscription?.remove() }
    }
}

على عكس أداة إنشاء flow، تسمح callbackFlow بإطلاق القيم من CoroutineContext مختلف باستخدام دالة send أو خارجه باستخدام دالة trySend.

داخليًا، تستخدم callbackFlow قناة، التي تتشابه من الناحية النظرية إلى حد كبير مع قائمة انتظار الحظر. يتمّ إعداد القناة باستخدام السعة، وهي الحدّ الأقصى لعدد العناصر التي يمكن تخزينها مؤقتًا. تحتوي القناة التي تم إنشاؤها في callbackFlow على سعة تلقائية من 64 عنصرًا. عند محاولة إضافة عنصر جديد إلى قناة كاملة، يعلّق send المنتج إلى أن تتوفر مساحة للعنصر الجديد، في حين أنّ offer لا يضيف العنصر إلى القناة ويعرض علامة false على الفور.

موارد التدفق الإضافية