في الكوروتينات، يكون التدفق هو نوع يمكنه إرسال قيم متعددة بشكل تسلسلي، على عكس دوال التعليق التي تعرض قيمة واحدة فقط. على سبيل المثال، يمكنك استخدام تدفق لتلقي تحديثات مباشرة من قاعدة بيانات.
تعتمد التدفقات على الكوروتينات ويمكن أن توفر قيمًا متعددة.
من الناحية النظرية، إنّ التدفق هو مصدر بيانات يمكن احتسابه بشكل غير متزامن. يجب أن تكون القيم المنبعثة من النوع نفسه. على سبيل المثال، Flow<Int>
هو تدفق يُصدر قيمًا صحيحة.
يتشابه التدفق إلى حد كبير مع Iterator
الذي ينتج سلسلة من القيم، ولكنه يستخدم دوال التعليق لإنتاج القيم واستهلاكها بشكل غير متزامن. وهذا يعني أنّ المسار يمكنه مثلاً إرسال طلب شبكة بأمان لإنتاج القيمة التالية بدون حظر سلسلة التعليمات الرئيسية.
هناك ثلاثة كيانات متضمنة في تدفقات البيانات:
- ينتج المنتج البيانات التي تتم إضافتها إلى مصدر البيانات. بفضل الكوروتينات، يمكن للتدفقات أيضًا إنتاج البيانات بشكل غير متزامن.
- (اختياري) يمكن للوسطاء تعديل كل قيمة يتم إطلاقها في ساحة المشاركات أو البث نفسه.
- يستهلك المستهلك القيم من مصدر البيانات.
في نظام Android، يكون المستودع عادةً منتجًا لبيانات واجهة المستخدم التي تحتوي على واجهة المستخدم (UI) التي تعرض البيانات في النهاية. وفي أحيان أخرى، تكون طبقة واجهة المستخدم منتجة لأحداث إدخال المستخدم، وطبقات أخرى من التسلسل الهرمي تستهلكها. تعمل الطبقات الواقعة بين المنتج والمستهلك عادةً كوسطاء يعدّلون تدفق البيانات لضبطه وفقًا لمتطلبات الطبقة التالية.
إنشاء تدفق
لإنشاء التدفقات، استخدم واجهات برمجة التطبيقات
لأداة إنشاء التدفق. تنشئ دالة إنشاء flow
مسارًا جديدًا يمكنك من خلاله إرسال قيم جديدة يدويًا إلى تدفق البيانات باستخدام الدالة
emit
.
في المثال التالي، يجلب مصدر البيانات آخر الأخبار تلقائيًا على فترات زمنية ثابتة. نظرًا لأن دالة التعليق لا يمكن أن ترجع عدة قيم متتالية، ينشئ مصدر البيانات تدفقًا ويعيده لتحقيق هذا المطلب. في هذه الحالة، يعمل مصدر البيانات كمنتج.
class NewsRemoteDataSource(
private val newsApi: NewsApi,
private val refreshIntervalMs: Long = 5000
) {
val latestNews: Flow<List<ArticleHeadline>> = flow {
while(true) {
val latestNews = newsApi.fetchLatestNews()
emit(latestNews) // Emits the result of the request to the flow
delay(refreshIntervalMs) // Suspends the coroutine for some time
}
}
}
// Interface that provides a way to make network requests with suspend functions
interface NewsApi {
suspend fun fetchLatestNews(): List<ArticleHeadline>
}
يتم تنفيذ أداة إنشاء flow
داخل الكوروتين. وبالتالي، تستفيد من واجهات برمجة التطبيقات غير المتزامنة نفسها، ولكن تسري بعض القيود:
- تكون التدفقات متسلسلة. بما أن المنتِج موجود في الكوروتين، عند استدعاء دالة تعليق، يعلّق المنتج مؤقتًا حتى تُرجع دالة التعليق. في المثال، يتم تعليق المنتج إلى أن يكتمل طلب الشبكة
fetchLatestNews
. وحينئذٍ فقط يتم إصدار النتيجة إلى ساحة المشاركات. - باستخدام أداة إنشاء
flow
، لا يمكن للناتجemit
القيم منCoroutineContext
مختلفة. لذلك، لا تطلبemit
ضمنCoroutineContext
مختلف من خلال إنشاء كوروتين جديد أو استخدامwithContext
كتلة من الرموز البرمجية. يمكنك في هذه الحالات استخدام أدوات إنشاء تدفقات أخرى مثلcallbackFlow
.
تعديل ساحة المشاركات
يمكن للوسطاء استخدام عوامل تشغيل متوسطة لتعديل تدفق البيانات بدون استهلاك القيم. هذه العوامل هي دوال، عند تطبيقها على تدفق من البيانات، تعد سلسلة من العمليات التي لا يتم تنفيذها حتى يتم استهلاك القيم في المستقبل. لمزيد من المعلومات حول المشغلات المتوسطة، يُرجى الاطّلاع على المستندات المرجعية للتدفق.
في المثال أدناه، تستخدم طبقة المستودع عامل التشغيل المتوسط
map
لتحويل البيانات التي سيتم عرضها على View
:
class NewsRepository(
private val newsRemoteDataSource: NewsRemoteDataSource,
private val userData: UserData
) {
/**
* Returns the favorite latest news applying transformations on the flow.
* These operations are lazy and don't trigger the flow. They just transform
* the current value emitted by the flow at that point in time.
*/
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
// Intermediate operation to filter the list of favorite topics
.map { news -> news.filter { userData.isFavoriteTopic(it) } }
// Intermediate operation to save the latest news in the cache
.onEach { news -> saveInCache(news) }
}
يمكن تطبيق المشغلات المتوسطة واحدًا تلو الآخر، لتشكيل سلسلة من العمليات التي يتم تنفيذها ببطء عند إطلاق عنصر في التدفق. يُرجى العلم أنّ تطبيق عامل تشغيل متوسط على التدفق لا يؤدي إلى بدء جمع البيانات.
الجمع من التدفق
استخدِم عامل تشغيل طرفي لتشغيل التدفق لبدء الاستماع إلى القيم. للحصول على جميع القيم في ساحة المشاركات بالشكل المنبعث، استخدِم collect
.
يمكنك معرفة المزيد من المعلومات حول مشغّلي الوحدات الطرفية في المستندات الرسمية حول التدفق.
بما أنّ collect
هي دالة تعليق، يجب تنفيذها داخل كوروتين. تأخذ lambda كمعلمة يتم استدعاؤها على كل قيمة جديدة. ونظرًا لأنها دالة تعليق، قد يتم تعليق الكوروتين
الذي يستدعي collect
إلى أن يتم إغلاق التدفق.
استكمالاً للمثال السابق، إليك تنفيذًا بسيطًا لـ ViewModel
يستخدم البيانات من طبقة المستودع:
class LatestNewsViewModel(
private val newsRepository: NewsRepository
) : ViewModel() {
init {
viewModelScope.launch {
// Trigger the flow and consume its elements using collect
newsRepository.favoriteLatestNews.collect { favoriteNews ->
// Update View with the latest favorite news
}
}
}
}
وتؤدي عملية جمع البيانات إلى تشغيل المنتج الذي يحدّث آخر الأخبار
ويصدر نتيجة طلب الشبكة على فاصل ثابت. وبما أنّ المنتج
يظل نشطًا دائمًا في حلقة while(true)
، سيتم إغلاق مصدر البيانات عند محو
ViewModel ويتم إلغاء viewModelScope
.
يمكن أن يتوقف جمع البيانات للأسباب التالية:
- يتم إلغاء الكوروتين الذي يجمع، كما هو موضّح في المثال السابق. يؤدي هذا أيضًا إلى إيقاف المنتج الأساسي.
- ينتهي المنتج من إطلاق العناصر. في هذه الحالة، يتم إغلاق تدفق البيانات
ويستأنف الكوروتين الذي يسمى
collect
التنفيذ.
تكون التدفقات باردة وكسولة ما لم يتم تحديدها مع عوامل تشغيل وسيطة أخرى. هذا يعني أنه يتم تنفيذ كود المنتج في كل مرة
يتم استدعاء عامل تشغيل نهائي في التدفق. في المثال السابق، يؤدي وجود عدّة أدوات تجميع للتدفق إلى استرجاع مصدر البيانات لآخر الأخبار عدة مرات على فترات زمنية ثابتة مختلفة. لتحسين التدفق ومشاركته عندما يجمع عدة مستهلكين في الوقت نفسه، استخدِم عامل التشغيل shareIn
.
رصد الاستثناءات غير المتوقعة
يمكن أن تأتي عملية تنفيذ المنتج من مكتبة تابعة لجهة خارجية.
وهذا يعني أنه قد ينتج عنه استثناءات غير متوقعة. للتعامل مع هذه الاستثناءات، استخدِم عامل التشغيل المتوسط
catch
.
class LatestNewsViewModel(
private val newsRepository: NewsRepository
) : ViewModel() {
init {
viewModelScope.launch {
newsRepository.favoriteLatestNews
// Intermediate catch operator. If an exception is thrown,
// catch and update the UI
.catch { exception -> notifyError(exception) }
.collect { favoriteNews ->
// Update View with the latest favorite news
}
}
}
}
في المثال السابق، عند حدوث استثناء، لا يتم استدعاء lambda
collect
لعدم تلقّي عنصر جديد.
يمكن لـ "catch
" أيضًا إضافة emit
عنصر إلى المسار. يمكن لطبقة المستودع النموذجية emit
القيم المخزنة مؤقتًا بدلاً من ذلك:
class NewsRepository(...) {
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
.map { news -> news.filter { userData.isFavoriteTopic(it) } }
.onEach { news -> saveInCache(news) }
// If an error happens, emit the last cached values
.catch { exception -> emit(lastCachedNews()) }
}
في هذا المثال، عند حدوث استثناء، يتم استدعاء دالة lambda collect
لأنّه تم إرسال عنصر جديد إلى البث بسبب
الاستثناء.
التنفيذ في سياق CoroutineContext مختلف
بشكل تلقائي، ينفّذ المنتج الذي أنشأ أداة إنشاء flow
في CoroutineContext
من الكوروتين الذي يجمع منه، وكما ذكرنا سابقًا، لا يمكنه emit
استخدام قيم من CoroutineContext
مختلف. وقد يكون هذا السلوك غير مرغوب فيه في بعض الحالات.
على سبيل المثال، في الأمثلة المستخدَمة في هذا الموضوع، يجب ألا تُجري طبقة المستودع أي عمليات على Dispatchers.Main
التي تستخدمها viewModelScope
.
لتغيير CoroutineContext
للتدفق، استخدِم عامل التشغيل الوسيط
flowOn
.
تؤدي السمة flowOn
إلى تغيير CoroutineContext
في المسار الرئيسي، ما يعني أنّ المنتج وأي عوامل تشغيل وسيطة تم تطبيقها قبل (أو أعلى)
flowOn
. لا يتأثر تدفق عملية نقل البيانات (عوامل التشغيل الوسيطة بعد flowOn
بالإضافة إلى المستهلك) ويتم تنفيذه على
CoroutineContext
المستخدَم في collect
من المسار. إذا كان هناك العديد من عوامل تشغيل flowOn
، سيغيّر كل عامل تشغيل من عوامل تشغيل الصفحة الرئيسية من موقعها الحالي.
class NewsRepository(
private val newsRemoteDataSource: NewsRemoteDataSource,
private val userData: UserData,
private val defaultDispatcher: CoroutineDispatcher
) {
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
.map { news -> // Executes on the default dispatcher
news.filter { userData.isFavoriteTopic(it) }
}
.onEach { news -> // Executes on the default dispatcher
saveInCache(news)
}
// flowOn affects the upstream flow ↑
.flowOn(defaultDispatcher)
// the downstream flow ↓ is not affected
.catch { exception -> // Executes in the consumer's context
emit(lastCachedNews())
}
}
من خلال هذا الرمز، يستخدم عاملا التشغيل onEach
وmap
العاملَين defaultDispatcher
،
في حين يتم تنفيذ عامل التشغيل catch
والمستهلك على
Dispatchers.Main
ويتم استخدامه من خلال viewModelScope
.
بما أنّ طبقة مصدر البيانات هي التي تنفّذ أعمال وحدات الإدخال والإخراج، عليك استخدام مُرسِل محسَّن لعمليات الإدخال والإخراج:
class NewsRemoteDataSource(
...,
private val ioDispatcher: CoroutineDispatcher
) {
val latestNews: Flow<List<ArticleHeadline>> = flow {
// Executes on the IO dispatcher
...
}
.flowOn(ioDispatcher)
}
مسارات في مكتبات Jetpack
تم دمج Floodlight في العديد من مكتبات Jetpack، وهو شائع بين مكتبات الجهات الخارجية على Android. يُعد التدفق مناسبًا جدًا لتحديثات البيانات المباشرة وتدفقات البيانات اللانهائية.
يمكنك استخدام
التدفق مع الغرفة
للإبلاغ بالتغييرات في قاعدة البيانات. عند استخدام
كائنات الوصول إلى البيانات (DAO)،
يمكنك إرجاع النوع Flow
للحصول على تحديثات مباشرة.
@Dao
abstract class ExampleDao {
@Query("SELECT * FROM Example")
abstract fun getExamples(): Flow<List<Example>>
}
عندما يحدث تغيير في الجدول Example
، يتم إصدار قائمة جديدة بالعناصر الجديدة في قاعدة البيانات.
تحويل واجهات برمجة التطبيقات المستندة إلى معاودة الاتصال إلى مسارات
callbackFlow
هي أداة إنشاء مسارات تتيح لك تحويل واجهات برمجة التطبيقات المستندة إلى معاودة الاتصال إلى مسارات.
على سبيل المثال، تستخدم واجهات برمجة تطبيقات Firebase Firestore
في Android عمليات معاودة الاتصال.
لتحويل واجهات برمجة التطبيقات هذه إلى تدفقات والاستماع إلى تحديثات قاعدة بيانات Firestore، يمكنك استخدام التعليمة البرمجية التالية:
class FirestoreUserEventsDataSource(
private val firestore: FirebaseFirestore
) {
// Method to get user events from the Firestore database
fun getUserEvents(): Flow<UserEvents> = callbackFlow {
// Reference to use in Firestore
var eventsCollection: CollectionReference? = null
try {
eventsCollection = FirebaseFirestore.getInstance()
.collection("collection")
.document("app")
} catch (e: Throwable) {
// If Firebase cannot be initialized, close the stream of data
// flow consumers will stop collecting and the coroutine will resume
close(e)
}
// Registers callback to firestore, which will be called on new events
val subscription = eventsCollection?.addSnapshotListener { snapshot, _ ->
if (snapshot == null) { return@addSnapshotListener }
// Sends events to the flow! Consumers will get the new events
try {
trySend(snapshot.getEvents())
} catch (e: Throwable) {
// Event couldn't be sent to the flow
}
}
// The callback inside awaitClose will be executed when the flow is
// either closed or cancelled.
// In this case, remove the callback from Firestore
awaitClose { subscription?.remove() }
}
}
بخلاف أداة إنشاء flow
، تسمح السمة callbackFlow
بإطلاق القيم من CoroutineContext
مختلفة باستخدام الدالة send
أو خارج الكوروتين من خلال الدالة trySend
.
داخليًا، يستخدم callbackFlow
قناة،
وهي تشبه إلى حدّ كبير قائمة انتظار الحظر.
يتم إعداد القناة باستخدام السعة، وهي أقصى عدد من العناصر التي يمكن تخزينها مؤقتًا. إنّ القناة التي تم إنشاؤها في callbackFlow
بسعة
64 عنصرًا بشكل تلقائي عندما تحاول إضافة عنصر جديد إلى قناة كاملة، تعلّق send
المنتج إلى أن تتوفر مساحة للعنصر الجديد، في حين لا يضيف trySend
العنصر إلى القناة ويعرض false
على الفور.
تضيف السمة trySend
العنصر المحدّد إلى القناة على الفور، وذلك فقط
إذا لم يكن المحتوى ينتهك قيود الحدّ الأقصى، ثم يعرض
النتيجة الناجحة.
موارد التدفق الإضافية
- اختبار مسارات Kotlin على Android
StateFlow
وSharedFlow
- مراجع إضافية حول الكوروتينات في لغة Kotlin وتدفق البيانات