Kotlin در اندروید جریان دارد، Kotlin در اندروید جریان دارد

در کوروتین‌ها، یک جریان نوعی است که می‌تواند چندین مقدار را به‌طور متوالی منتشر کند، برخلاف توابع تعلیقی که فقط یک مقدار را برمی‌گردانند. برای مثال، می‌توانید از یک جریان برای دریافت به‌روزرسانی‌های زنده از پایگاه داده استفاده کنید.

جریان ها در بالای کوروتین ها ساخته می شوند و می توانند مقادیر متعددی را ارائه دهند. یک جریان از نظر مفهومی جریانی از داده است که می تواند به صورت ناهمزمان محاسبه شود. مقادیر منتشر شده باید از یک نوع باشند. به عنوان مثال، یک Flow<Int> جریانی است که مقادیر صحیح را منتشر می کند.

یک جریان بسیار شبیه به یک Iterator است که دنباله ای از مقادیر را تولید می کند، اما از توابع تعلیق برای تولید و مصرف مقادیر به صورت ناهمزمان استفاده می کند. این بدان معناست که، برای مثال، جریان می‌تواند با خیال راحت یک درخواست شبکه برای تولید مقدار بعدی بدون مسدود کردن رشته اصلی ارسال کند.

سه نهاد در جریان داده ها دخیل هستند:

  • یک تولید کننده داده هایی را تولید می کند که به جریان اضافه می شود. به لطف کوروتین ها، جریان ها همچنین می توانند داده ها را به صورت ناهمزمان تولید کنند.
  • (اختیاری) واسطه ها می توانند هر مقدار منتشر شده در جریان یا خود جریان را تغییر دهند.
  • یک مصرف کننده ارزش های جریان را مصرف می کند.

نهادهای درگیر در جریان داده ها؛ مصرف کننده، واسطه های اختیاری و تولید کننده
شکل 1. نهادهای درگیر در جریان داده ها: مصرف کننده، واسطه های اختیاری و تولیدکننده.

در اندروید، یک مخزن معمولاً تولید کننده داده های رابط کاربری است که رابط کاربری (UI) را به عنوان مصرف کننده دارد که در نهایت داده ها را نمایش می دهد. در موارد دیگر، لایه UI تولید کننده رویدادهای ورودی کاربر است و سایر لایه های سلسله مراتب آنها را مصرف می کنند. لایه‌های بین تولیدکننده و مصرف‌کننده معمولاً به‌عنوان واسطه عمل می‌کنند که جریان داده‌ها را برای تنظیم آن با الزامات لایه زیر تغییر می‌دهند.

ایجاد یک جریان

برای ایجاد جریان، از APIهای سازنده جریان استفاده کنید. تابع flow builder یک جریان جدید ایجاد می کند که در آن می توانید به صورت دستی مقادیر جدیدی را با استفاده از تابع emit به جریان داده ارسال کنید.

در مثال زیر، یک منبع داده، آخرین اخبار را به طور خودکار در یک بازه زمانی ثابت واکشی می کند. از آنجایی که یک تابع تعلیق نمی تواند چندین مقدار متوالی را برگرداند، منبع داده جریانی را برای برآورده کردن این نیاز ایجاد و برمی گرداند. در این حالت منبع داده به عنوان تولید کننده عمل می کند.

class NewsRemoteDataSource(
    private val newsApi: NewsApi,
    private val refreshIntervalMs: Long = 5000
) {
    val latestNews: Flow<List<ArticleHeadline>> = flow {
        while(true) {
            val latestNews = newsApi.fetchLatestNews()
            emit(latestNews) // Emits the result of the request to the flow
            delay(refreshIntervalMs) // Suspends the coroutine for some time
        }
    }
}

// Interface that provides a way to make network requests with suspend functions
interface NewsApi {
    suspend fun fetchLatestNews(): List<ArticleHeadline>
}

flow ساز در یک کوروتین اجرا می شود. بنابراین، از همان API های ناهمزمان بهره می برد، اما برخی محدودیت ها اعمال می شود:

  • جریان ها متوالی هستند. از آنجایی که تولیدکننده در یک برنامه قرار دارد، هنگام فراخوانی یک تابع تعلیق، تولیدکننده تا زمانی که تابع تعلیق برگردد، به حالت تعلیق در می‌آید. در مثال، تولیدکننده تا زمانی که درخواست شبکه fetchLatestNews تکمیل شود، به حالت تعلیق در می‌آید. تنها پس از آن است که نتیجه به جریان منتشر می شود.
  • با سازنده flow ، تولید کننده نمی تواند مقادیری را از یک CoroutineContext متفاوت emit . بنابراین، با ایجاد کوروتین های جدید یا با استفاده از بلوک های کد withContext ، emit در یک CoroutineContext متفاوت فراخوانی نکنید. در این موارد می توانید از سایر سازندگان جریان مانند callbackFlow استفاده کنید.

اصلاح جریان

واسطه ها می توانند از عملگرهای میانی برای تغییر جریان داده ها بدون مصرف مقادیر استفاده کنند. این عملگرها توابعی هستند که وقتی روی یک جریان داده اعمال می شوند، زنجیره ای از عملیات را تنظیم می کنند که تا زمانی که مقادیر در آینده مصرف نشوند، اجرا نمی شوند. در مستندات مرجع Flow درباره عملگرهای میانی بیشتر بیاموزید.

در مثال زیر، لایه مخزن map اپراتور میانی برای تبدیل داده های نمایش داده شده در View استفاده می کند:

class NewsRepository(
    private val newsRemoteDataSource: NewsRemoteDataSource,
    private val userData: UserData
) {
    /**
     * Returns the favorite latest news applying transformations on the flow.
     * These operations are lazy and don't trigger the flow. They just transform
     * the current value emitted by the flow at that point in time.
     */
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            // Intermediate operation to filter the list of favorite topics
            .map { news -> news.filter { userData.isFavoriteTopic(it) } }
            // Intermediate operation to save the latest news in the cache
            .onEach { news -> saveInCache(news) }
}

عملگرهای میانی را می‌توان یکی پس از دیگری اعمال کرد و زنجیره‌ای از عملیات را تشکیل می‌دهد که وقتی یک آیتم به جریان گسیل می‌شود، با تنبلی اجرا می‌شوند. توجه داشته باشید که صرفا اعمال یک عملگر میانی برای یک جریان، جمع آوری جریان را شروع نمی کند.

جمع آوری از یک جریان

از یک اپراتور ترمینال برای شروع جریان برای شروع گوش دادن به مقادیر استفاده کنید. برای دریافت تمام مقادیر موجود در جریان در حین انتشار، از collect استفاده کنید. در اسناد رسمی جریان می توانید درباره اپراتورهای ترمینال اطلاعات بیشتری کسب کنید.

از آنجایی که collect یک تابع تعلیق است، باید در یک کوروتین اجرا شود. یک لامبدا را به عنوان پارامتری در نظر می گیرد که روی هر مقدار جدید فراخوانی می شود. از آنجایی که یک تابع تعلیق است، کوروتینی که فراخوانی‌ها collect ممکن است تا زمانی که جریان بسته شود، به حالت تعلیق درآید.

در ادامه مثال قبلی، در اینجا یک پیاده سازی ساده از یک ViewModel است که داده ها را از لایه مخزن مصرف می کند:

class LatestNewsViewModel(
    private val newsRepository: NewsRepository
) : ViewModel() {

    init {
        viewModelScope.launch {
            // Trigger the flow and consume its elements using collect
            newsRepository.favoriteLatestNews.collect { favoriteNews ->
                // Update View with the latest favorite news
            }
        }
    }
}

جمع‌آوری جریان، تولیدکننده را فعال می‌کند که آخرین اخبار را تازه‌سازی می‌کند و نتیجه درخواست شبکه را در یک بازه زمانی ثابت منتشر می‌کند. از آنجایی که سازنده همیشه با حلقه while(true) فعال می ماند، جریان داده ها با پاک شدن ViewModel و لغو viewModelScope بسته می شود.

جمع آوری جریان می تواند به دلایل زیر متوقف شود:

  • همانطور که در مثال قبلی نشان داده شده است، کاری که جمع آوری می شود لغو می شود. این نیز تولید کننده اصلی را متوقف می کند.
  • تولید کننده انتشار اقلام را تمام می کند. در این حالت، جریان داده بسته می شود و کوروتینی که collect نامیده می شود، اجرا را از سر می گیرد.

جریان ها سرد و تنبل هستند مگر اینکه با سایر اپراتورهای میانی مشخص شده باشند. این به این معنی است که هر بار که یک اپراتور پایانه در جریان فراخوانی می شود، کد تولید کننده اجرا می شود. در مثال قبلی، داشتن چندین جمع‌آورنده جریان باعث می‌شود که منبع داده، آخرین اخبار را چندین بار در فواصل ثابت مختلف دریافت کند. برای بهینه سازی و به اشتراک گذاری یک جریان زمانی که چندین مصرف کننده به طور همزمان جمع آوری می کنند، از عملگر shareIn استفاده کنید.

گرفتن استثناهای غیرمنتظره

اجرای تولید کننده می تواند از یک کتابخانه شخص ثالث باشد. این بدان معنی است که می تواند استثناهای غیرمنتظره ایجاد کند. برای رسیدگی به این استثناها، از عملگر catch intermediate استفاده کنید.

class LatestNewsViewModel(
    private val newsRepository: NewsRepository
) : ViewModel() {

    init {
        viewModelScope.launch {
            newsRepository.favoriteLatestNews
                // Intermediate catch operator. If an exception is thrown,
                // catch and update the UI
                .catch { exception -> notifyError(exception) }
                .collect { favoriteNews ->
                    // Update View with the latest favorite news
                }
        }
    }
}

در مثال قبلی، هنگامی که یک استثنا رخ می دهد، collect لامبدا فراخوانی نمی شود، زیرا آیتم جدیدی دریافت نشده است.

catch همچنین می تواند موارد را به جریان emit . لایه مخزن نمونه می تواند مقادیر ذخیره شده را به جای آن emit :

class NewsRepository(...) {
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            .map { news -> news.filter { userData.isFavoriteTopic(it) } }
            .onEach { news -> saveInCache(news) }
            // If an error happens, emit the last cached values
            .catch { exception -> emit(lastCachedNews()) }
}

در این مثال، هنگامی که یک استثنا رخ می دهد، collect لامبدا فراخوانی می شود، زیرا یک آیتم جدید به دلیل استثنا به جریان منتشر شده است.

اجرا در یک CoroutineContext متفاوت

به‌طور پیش‌فرض، تولیدکننده یک flow ساز در CoroutineContext از coroutine که از آن جمع‌آوری می‌شود، اجرا می‌کند، و همانطور که قبلاً ذکر شد، نمی‌تواند مقادیری را از یک CoroutineContext متفاوت emit . این رفتار ممکن است در برخی موارد نامطلوب باشد. به عنوان مثال، در مثال‌های مورد استفاده در سراسر این مبحث، لایه مخزن نباید عملیاتی را روی Dispatchers.Main که توسط viewModelScope استفاده می‌شود، انجام دهد.

برای تغییر CoroutineContext یک جریان، از عملگر میانی flowOn استفاده کنید. flowOn CoroutineContext جریان بالادست را تغییر می دهد، به این معنی که تولید کننده و هر عملگر واسطه ای که قبل از (یا بالاتر) flowOn اعمال شده است. جریان پایین دست (اپراتورهای میانی پس از flowOn همراه با مصرف کننده) تحت تأثیر قرار نمی گیرد و در CoroutineContext مورد استفاده برای collect از جریان اجرا می شود. اگر چندین اپراتور flowOn وجود داشته باشد، هر کدام بالادست را از مکان فعلی خود تغییر می دهد.

class NewsRepository(
    private val newsRemoteDataSource: NewsRemoteDataSource,
    private val userData: UserData,
    private val defaultDispatcher: CoroutineDispatcher
) {
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            .map { news -> // Executes on the default dispatcher
                news.filter { userData.isFavoriteTopic(it) }
            }
            .onEach { news -> // Executes on the default dispatcher
                saveInCache(news)
            }
            // flowOn affects the upstream flow ↑
            .flowOn(defaultDispatcher)
            // the downstream flow ↓ is not affected
            .catch { exception -> // Executes in the consumer's context
                emit(lastCachedNews())
            }
}

با این کد، اپراتورهای onEach و map از defaultDispatcher استفاده می کنند، در حالی که عملگر catch و مصرف کننده در Dispatchers.Main که توسط viewModelScope استفاده می شود، اجرا می شوند.

از آنجایی که لایه منبع داده در حال انجام کار I/O است، باید از توزیع کننده ای استفاده کنید که برای عملیات I/O بهینه شده است:

class NewsRemoteDataSource(
    ...,
    private val ioDispatcher: CoroutineDispatcher
) {
    val latestNews: Flow<List<ArticleHeadline>> = flow {
        // Executes on the IO dispatcher
        ...
    }
        .flowOn(ioDispatcher)
}

جریان در کتابخانه های Jetpack

Flow در بسیاری از کتابخانه‌های Jetpack ادغام شده است و در میان کتابخانه‌های شخص ثالث اندروید محبوب است. Flow برای به روز رسانی داده های زنده و جریان های بی پایان داده مناسب است.

می توانید از Flow with Room استفاده کنید تا از تغییرات یک پایگاه داده مطلع شوید. هنگام استفاده از اشیاء دسترسی به داده (DAO) ، یک نوع Flow را برای دریافت به‌روزرسانی‌های زنده برگردانید.

@Dao
abstract class ExampleDao {
    @Query("SELECT * FROM Example")
    abstract fun getExamples(): Flow<List<Example>>
}

هر بار که تغییری در جدول Example ایجاد می شود، یک لیست جدید با موارد جدید در پایگاه داده منتشر می شود.

API های مبتنی بر تماس را به جریان تبدیل کنید

callbackFlow یک جریان ساز است که به شما امکان می دهد API های مبتنی بر تماس را به جریان تبدیل کنید. به عنوان مثال، APIهای Firebase Firestore Android از تماس‌های برگشتی استفاده می‌کنند.

برای تبدیل این APIها به جریان و گوش دادن به به روز رسانی پایگاه داده Firestore، می توانید از کد زیر استفاده کنید:

class FirestoreUserEventsDataSource(
    private val firestore: FirebaseFirestore
) {
    // Method to get user events from the Firestore database
    fun getUserEvents(): Flow<UserEvents> = callbackFlow {

        // Reference to use in Firestore
        var eventsCollection: CollectionReference? = null
        try {
            eventsCollection = FirebaseFirestore.getInstance()
                .collection("collection")
                .document("app")
        } catch (e: Throwable) {
            // If Firebase cannot be initialized, close the stream of data
            // flow consumers will stop collecting and the coroutine will resume
            close(e)
        }

        // Registers callback to firestore, which will be called on new events
        val subscription = eventsCollection?.addSnapshotListener { snapshot, _ ->
            if (snapshot == null) { return@addSnapshotListener }
            // Sends events to the flow! Consumers will get the new events
            try {
                trySend(snapshot.getEvents())
            } catch (e: Throwable) {
                // Event couldn't be sent to the flow
            }
        }

        // The callback inside awaitClose will be executed when the flow is
        // either closed or cancelled.
        // In this case, remove the callback from Firestore
        awaitClose { subscription?.remove() }
    }
}

برخلاف سازنده flow ، callbackFlow اجازه می‌دهد مقادیر از یک CoroutineContext متفاوت با تابع send یا خارج از یک کوروتین با تابع trySend منتشر شود.

در داخل، callbackFlow از یک کانال استفاده می کند که از نظر مفهومی بسیار شبیه به یک صف مسدود کردن است. یک کانال با ظرفیت ، حداکثر تعداد عناصری که می تواند بافر شود، پیکربندی شده است. کانال ایجاد شده در callbackFlow دارای ظرفیت پیش فرض 64 عنصر است. وقتی می‌خواهید یک عنصر جدید را به یک کانال کامل اضافه کنید، send تا زمانی که فضایی برای عنصر جدید وجود داشته باشد، سازنده را به حالت تعلیق در می‌آورد، در حالی که trySend عنصر را به کانال اضافه نمی‌کند و بلافاصله false را برمی‌گرداند.

trySend بلافاصله عنصر مشخص شده را به کانال اضافه می کند، فقط در صورتی که محدودیت های ظرفیت آن را نقض نکند، و سپس نتیجه موفقیت آمیز را برمی گرداند.

منابع جریان اضافی