در کوروتینها، یک جریان نوعی است که میتواند چندین مقدار را بهطور متوالی منتشر کند، برخلاف توابع تعلیقی که فقط یک مقدار را برمیگردانند. برای مثال، میتوانید از یک جریان برای دریافت بهروزرسانیهای زنده از پایگاه داده استفاده کنید.
جریان ها در بالای کوروتین ها ساخته می شوند و می توانند مقادیر متعددی را ارائه دهند. یک جریان از نظر مفهومی جریانی از داده است که می تواند به صورت ناهمزمان محاسبه شود. مقادیر منتشر شده باید از یک نوع باشند. به عنوان مثال، یک Flow<Int>
جریانی است که مقادیر صحیح را منتشر می کند.
یک جریان بسیار شبیه به یک Iterator
است که دنباله ای از مقادیر را تولید می کند، اما از توابع تعلیق برای تولید و مصرف مقادیر به صورت ناهمزمان استفاده می کند. این بدان معناست که، برای مثال، جریان میتواند با خیال راحت یک درخواست شبکه برای تولید مقدار بعدی بدون مسدود کردن رشته اصلی ارسال کند.
سه نهاد در جریان داده ها دخیل هستند:
- یک تولید کننده داده هایی را تولید می کند که به جریان اضافه می شود. به لطف کوروتین ها، جریان ها همچنین می توانند داده ها را به صورت ناهمزمان تولید کنند.
- (اختیاری) واسطه ها می توانند هر مقدار منتشر شده در جریان یا خود جریان را تغییر دهند.
- یک مصرف کننده ارزش های جریان را مصرف می کند.
در اندروید، یک مخزن معمولاً تولید کننده داده های رابط کاربری است که رابط کاربری (UI) را به عنوان مصرف کننده دارد که در نهایت داده ها را نمایش می دهد. در موارد دیگر، لایه UI تولید کننده رویدادهای ورودی کاربر است و سایر لایه های سلسله مراتب آنها را مصرف می کنند. لایههای بین تولیدکننده و مصرفکننده معمولاً بهعنوان واسطه عمل میکنند که جریان دادهها را برای تنظیم آن با الزامات لایه زیر تغییر میدهند.
ایجاد یک جریان
برای ایجاد جریان، از APIهای سازنده جریان استفاده کنید. تابع flow
builder یک جریان جدید ایجاد می کند که در آن می توانید به صورت دستی مقادیر جدیدی را با استفاده از تابع emit
به جریان داده ارسال کنید.
در مثال زیر، یک منبع داده، آخرین اخبار را به طور خودکار در یک بازه زمانی ثابت واکشی می کند. از آنجایی که یک تابع تعلیق نمی تواند چندین مقدار متوالی را برگرداند، منبع داده جریانی را برای برآورده کردن این نیاز ایجاد و برمی گرداند. در این حالت منبع داده به عنوان تولید کننده عمل می کند.
class NewsRemoteDataSource(
private val newsApi: NewsApi,
private val refreshIntervalMs: Long = 5000
) {
val latestNews: Flow<List<ArticleHeadline>> = flow {
while(true) {
val latestNews = newsApi.fetchLatestNews()
emit(latestNews) // Emits the result of the request to the flow
delay(refreshIntervalMs) // Suspends the coroutine for some time
}
}
}
// Interface that provides a way to make network requests with suspend functions
interface NewsApi {
suspend fun fetchLatestNews(): List<ArticleHeadline>
}
flow
ساز در یک کوروتین اجرا می شود. بنابراین، از همان API های ناهمزمان بهره می برد، اما برخی محدودیت ها اعمال می شود:
- جریان ها متوالی هستند. از آنجایی که تولیدکننده در یک برنامه قرار دارد، هنگام فراخوانی یک تابع تعلیق، تولیدکننده تا زمانی که تابع تعلیق برگردد، به حالت تعلیق در میآید. در مثال، تولیدکننده تا زمانی که درخواست شبکه
fetchLatestNews
تکمیل شود، به حالت تعلیق در میآید. تنها پس از آن است که نتیجه به جریان منتشر می شود. - با سازنده
flow
، تولید کننده نمی تواند مقادیری را از یکCoroutineContext
متفاوتemit
. بنابراین، با ایجاد کوروتین های جدید یا با استفاده از بلوک های کدwithContext
،emit
در یکCoroutineContext
متفاوت فراخوانی نکنید. در این موارد می توانید از سایر سازندگان جریان مانندcallbackFlow
استفاده کنید.
اصلاح جریان
واسطه ها می توانند از عملگرهای میانی برای تغییر جریان داده ها بدون مصرف مقادیر استفاده کنند. این عملگرها توابعی هستند که وقتی روی یک جریان داده اعمال می شوند، زنجیره ای از عملیات را تنظیم می کنند که تا زمانی که مقادیر در آینده مصرف نشوند، اجرا نمی شوند. در مستندات مرجع Flow درباره عملگرهای میانی بیشتر بیاموزید.
در مثال زیر، لایه مخزن map
اپراتور میانی برای تبدیل داده های نمایش داده شده در View
استفاده می کند:
class NewsRepository(
private val newsRemoteDataSource: NewsRemoteDataSource,
private val userData: UserData
) {
/**
* Returns the favorite latest news applying transformations on the flow.
* These operations are lazy and don't trigger the flow. They just transform
* the current value emitted by the flow at that point in time.
*/
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
// Intermediate operation to filter the list of favorite topics
.map { news -> news.filter { userData.isFavoriteTopic(it) } }
// Intermediate operation to save the latest news in the cache
.onEach { news -> saveInCache(news) }
}
عملگرهای میانی را میتوان یکی پس از دیگری اعمال کرد و زنجیرهای از عملیات را تشکیل میدهد که وقتی یک آیتم به جریان گسیل میشود، با تنبلی اجرا میشوند. توجه داشته باشید که صرفا اعمال یک عملگر میانی برای یک جریان، جمع آوری جریان را شروع نمی کند.
جمع آوری از یک جریان
از یک اپراتور ترمینال برای شروع جریان برای شروع گوش دادن به مقادیر استفاده کنید. برای دریافت تمام مقادیر موجود در جریان در حین انتشار، از collect
استفاده کنید. در اسناد رسمی جریان می توانید درباره اپراتورهای ترمینال اطلاعات بیشتری کسب کنید.
از آنجایی که collect
یک تابع تعلیق است، باید در یک کوروتین اجرا شود. یک لامبدا را به عنوان پارامتری در نظر می گیرد که روی هر مقدار جدید فراخوانی می شود. از آنجایی که یک تابع تعلیق است، کوروتینی که فراخوانیها collect
ممکن است تا زمانی که جریان بسته شود، به حالت تعلیق درآید.
در ادامه مثال قبلی، در اینجا یک پیاده سازی ساده از یک ViewModel
است که داده ها را از لایه مخزن مصرف می کند:
class LatestNewsViewModel(
private val newsRepository: NewsRepository
) : ViewModel() {
init {
viewModelScope.launch {
// Trigger the flow and consume its elements using collect
newsRepository.favoriteLatestNews.collect { favoriteNews ->
// Update View with the latest favorite news
}
}
}
}
جمعآوری جریان، تولیدکننده را فعال میکند که آخرین اخبار را تازهسازی میکند و نتیجه درخواست شبکه را در یک بازه زمانی ثابت منتشر میکند. از آنجایی که سازنده همیشه با حلقه while(true)
فعال می ماند، جریان داده ها با پاک شدن ViewModel و لغو viewModelScope
بسته می شود.
جمع آوری جریان می تواند به دلایل زیر متوقف شود:
- همانطور که در مثال قبلی نشان داده شده است، کاری که جمع آوری می شود لغو می شود. این نیز تولید کننده اصلی را متوقف می کند.
- تولید کننده انتشار اقلام را تمام می کند. در این حالت، جریان داده بسته می شود و کوروتینی که
collect
نامیده می شود، اجرا را از سر می گیرد.
جریان ها سرد و تنبل هستند مگر اینکه با سایر اپراتورهای میانی مشخص شده باشند. این به این معنی است که هر بار که یک اپراتور پایانه در جریان فراخوانی می شود، کد تولید کننده اجرا می شود. در مثال قبلی، داشتن چندین جمعآورنده جریان باعث میشود که منبع داده، آخرین اخبار را چندین بار در فواصل ثابت مختلف دریافت کند. برای بهینه سازی و به اشتراک گذاری یک جریان زمانی که چندین مصرف کننده به طور همزمان جمع آوری می کنند، از عملگر shareIn
استفاده کنید.
گرفتن استثناهای غیرمنتظره
اجرای تولید کننده می تواند از یک کتابخانه شخص ثالث باشد. این بدان معنی است که می تواند استثناهای غیرمنتظره ایجاد کند. برای رسیدگی به این استثناها، از عملگر catch
intermediate استفاده کنید.
class LatestNewsViewModel(
private val newsRepository: NewsRepository
) : ViewModel() {
init {
viewModelScope.launch {
newsRepository.favoriteLatestNews
// Intermediate catch operator. If an exception is thrown,
// catch and update the UI
.catch { exception -> notifyError(exception) }
.collect { favoriteNews ->
// Update View with the latest favorite news
}
}
}
}
در مثال قبلی، هنگامی که یک استثنا رخ می دهد، collect
لامبدا فراخوانی نمی شود، زیرا آیتم جدیدی دریافت نشده است.
catch
همچنین می تواند موارد را به جریان emit
. لایه مخزن نمونه می تواند مقادیر ذخیره شده را به جای آن emit
:
class NewsRepository(...) {
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
.map { news -> news.filter { userData.isFavoriteTopic(it) } }
.onEach { news -> saveInCache(news) }
// If an error happens, emit the last cached values
.catch { exception -> emit(lastCachedNews()) }
}
در این مثال، هنگامی که یک استثنا رخ می دهد، collect
لامبدا فراخوانی می شود، زیرا یک آیتم جدید به دلیل استثنا به جریان منتشر شده است.
اجرا در یک CoroutineContext متفاوت
بهطور پیشفرض، تولیدکننده یک flow
ساز در CoroutineContext
از coroutine که از آن جمعآوری میشود، اجرا میکند، و همانطور که قبلاً ذکر شد، نمیتواند مقادیری را از یک CoroutineContext
متفاوت emit
. این رفتار ممکن است در برخی موارد نامطلوب باشد. به عنوان مثال، در مثالهای مورد استفاده در سراسر این مبحث، لایه مخزن نباید عملیاتی را روی Dispatchers.Main
که توسط viewModelScope
استفاده میشود، انجام دهد.
برای تغییر CoroutineContext
یک جریان، از عملگر میانی flowOn
استفاده کنید. flowOn
CoroutineContext
جریان بالادست را تغییر می دهد، به این معنی که تولید کننده و هر عملگر واسطه ای که قبل از (یا بالاتر) flowOn
اعمال شده است. جریان پایین دست (اپراتورهای میانی پس از flowOn
همراه با مصرف کننده) تحت تأثیر قرار نمی گیرد و در CoroutineContext
مورد استفاده برای collect
از جریان اجرا می شود. اگر چندین اپراتور flowOn
وجود داشته باشد، هر کدام بالادست را از مکان فعلی خود تغییر می دهد.
class NewsRepository(
private val newsRemoteDataSource: NewsRemoteDataSource,
private val userData: UserData,
private val defaultDispatcher: CoroutineDispatcher
) {
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
.map { news -> // Executes on the default dispatcher
news.filter { userData.isFavoriteTopic(it) }
}
.onEach { news -> // Executes on the default dispatcher
saveInCache(news)
}
// flowOn affects the upstream flow ↑
.flowOn(defaultDispatcher)
// the downstream flow ↓ is not affected
.catch { exception -> // Executes in the consumer's context
emit(lastCachedNews())
}
}
با این کد، اپراتورهای onEach
و map
از defaultDispatcher
استفاده می کنند، در حالی که عملگر catch
و مصرف کننده در Dispatchers.Main
که توسط viewModelScope
استفاده می شود، اجرا می شوند.
از آنجایی که لایه منبع داده در حال انجام کار I/O است، باید از توزیع کننده ای استفاده کنید که برای عملیات I/O بهینه شده است:
class NewsRemoteDataSource(
...,
private val ioDispatcher: CoroutineDispatcher
) {
val latestNews: Flow<List<ArticleHeadline>> = flow {
// Executes on the IO dispatcher
...
}
.flowOn(ioDispatcher)
}
جریان در کتابخانه های Jetpack
Flow در بسیاری از کتابخانههای Jetpack ادغام شده است و در میان کتابخانههای شخص ثالث اندروید محبوب است. Flow برای به روز رسانی داده های زنده و جریان های بی پایان داده مناسب است.
می توانید از Flow with Room استفاده کنید تا از تغییرات یک پایگاه داده مطلع شوید. هنگام استفاده از اشیاء دسترسی به داده (DAO) ، یک نوع Flow
را برای دریافت بهروزرسانیهای زنده برگردانید.
@Dao
abstract class ExampleDao {
@Query("SELECT * FROM Example")
abstract fun getExamples(): Flow<List<Example>>
}
هر بار که تغییری در جدول Example
ایجاد می شود، یک لیست جدید با موارد جدید در پایگاه داده منتشر می شود.
API های مبتنی بر تماس را به جریان تبدیل کنید
callbackFlow
یک جریان ساز است که به شما امکان می دهد API های مبتنی بر تماس را به جریان تبدیل کنید. به عنوان مثال، APIهای Firebase Firestore Android از تماسهای برگشتی استفاده میکنند.
برای تبدیل این APIها به جریان و گوش دادن به به روز رسانی پایگاه داده Firestore، می توانید از کد زیر استفاده کنید:
class FirestoreUserEventsDataSource(
private val firestore: FirebaseFirestore
) {
// Method to get user events from the Firestore database
fun getUserEvents(): Flow<UserEvents> = callbackFlow {
// Reference to use in Firestore
var eventsCollection: CollectionReference? = null
try {
eventsCollection = FirebaseFirestore.getInstance()
.collection("collection")
.document("app")
} catch (e: Throwable) {
// If Firebase cannot be initialized, close the stream of data
// flow consumers will stop collecting and the coroutine will resume
close(e)
}
// Registers callback to firestore, which will be called on new events
val subscription = eventsCollection?.addSnapshotListener { snapshot, _ ->
if (snapshot == null) { return@addSnapshotListener }
// Sends events to the flow! Consumers will get the new events
try {
trySend(snapshot.getEvents())
} catch (e: Throwable) {
// Event couldn't be sent to the flow
}
}
// The callback inside awaitClose will be executed when the flow is
// either closed or cancelled.
// In this case, remove the callback from Firestore
awaitClose { subscription?.remove() }
}
}
برخلاف سازنده flow
، callbackFlow
اجازه میدهد مقادیر از یک CoroutineContext
متفاوت با تابع send
یا خارج از یک کوروتین با تابع trySend
منتشر شود.
در داخل، callbackFlow
از یک کانال استفاده می کند که از نظر مفهومی بسیار شبیه به یک صف مسدود کردن است. یک کانال با ظرفیت ، حداکثر تعداد عناصری که می تواند بافر شود، پیکربندی شده است. کانال ایجاد شده در callbackFlow
دارای ظرفیت پیش فرض 64 عنصر است. وقتی میخواهید یک عنصر جدید را به یک کانال کامل اضافه کنید، send
تا زمانی که فضایی برای عنصر جدید وجود داشته باشد، سازنده را به حالت تعلیق در میآورد، در حالی که trySend
عنصر را به کانال اضافه نمیکند و بلافاصله false
را برمیگرداند.
trySend
بلافاصله عنصر مشخص شده را به کانال اضافه می کند، فقط در صورتی که محدودیت های ظرفیت آن را نقض نکند، و سپس نتیجه موفقیت آمیز را برمی گرداند.