coroutines-এ, একটি ফ্লো হল এমন একটি ধরন যা ক্রমানুসারে একাধিক মান নির্গত করতে পারে, শুধুমাত্র একটি একক মান ফিরিয়ে দেয় এমন ফাংশনগুলিকে স্থগিত করার বিপরীতে। উদাহরণস্বরূপ, আপনি একটি ডাটাবেস থেকে লাইভ আপডেট পেতে একটি প্রবাহ ব্যবহার করতে পারেন।
ফ্লোগুলি কোরোটিনের উপরে তৈরি করা হয় এবং একাধিক মান প্রদান করতে পারে। একটি প্রবাহ ধারণাগতভাবে ডেটার একটি প্রবাহ যা অ্যাসিঙ্ক্রোনাসভাবে গণনা করা যেতে পারে। নির্গত মান একই ধরনের হতে হবে। উদাহরণস্বরূপ, একটি Flow<Int>
হল একটি প্রবাহ যা পূর্ণসংখ্যার মান নির্গত করে।
একটি প্রবাহ একটি Iterator
সাথে খুব সাদৃশ্যপূর্ণ যেটি মানগুলির একটি ক্রম তৈরি করে, তবে এটি অসিঙ্ক্রোনাসভাবে মানগুলি তৈরি এবং ব্যবহার করতে সাসপেন্ড ফাংশন ব্যবহার করে। এর মানে হল, উদাহরণস্বরূপ, প্রধান থ্রেড ব্লক না করেই প্রবাহ নিরাপদে পরবর্তী মান তৈরি করার জন্য একটি নেটওয়ার্ক অনুরোধ করতে পারে।
ডেটার প্রবাহের সাথে জড়িত তিনটি সত্তা রয়েছে:
- একজন প্রযোজক ডেটা তৈরি করে যা স্ট্রীমে যোগ করা হয়। কোরোটিনের জন্য ধন্যবাদ, ফ্লোও অ্যাসিঙ্ক্রোনাসভাবে ডেটা তৈরি করতে পারে।
- (ঐচ্ছিক) মধ্যস্থতাকারীরা স্ট্রীম বা প্রবাহে নির্গত প্রতিটি মান পরিবর্তন করতে পারে।
- একজন ভোক্তা স্রোত থেকে মান গ্রহণ করে।
অ্যান্ড্রয়েডে, একটি সংগ্রহস্থল সাধারণত UI ডেটার একটি প্রযোজক যার ব্যবহারকারী হিসাবে ব্যবহারকারী ইন্টারফেস (UI) থাকে যা শেষ পর্যন্ত ডেটা প্রদর্শন করে। অন্য সময়ে, UI স্তরটি ব্যবহারকারীর ইনপুট ইভেন্টগুলির একটি প্রযোজক এবং অনুক্রমের অন্যান্য স্তরগুলি সেগুলি গ্রহণ করে৷ প্রযোজক এবং ভোক্তার মধ্যে স্তরগুলি সাধারণত মধ্যস্থতাকারী হিসাবে কাজ করে যা নিম্নলিখিত স্তরের প্রয়োজনীয়তার সাথে সামঞ্জস্য করার জন্য ডেটার প্রবাহকে সংশোধন করে।
একটি প্রবাহ সৃষ্টি
ফ্লো তৈরি করতে, ফ্লো বিল্ডার API ব্যবহার করুন। flow
বিল্ডার ফাংশন একটি নতুন ফ্লো তৈরি করে যেখানে আপনি emit
ফাংশন ব্যবহার করে ডেটার প্রবাহে ম্যানুয়ালি নতুন মান নির্গত করতে পারেন।
নিম্নলিখিত উদাহরণে, একটি ডেটা উত্স একটি নির্দিষ্ট ব্যবধানে স্বয়ংক্রিয়ভাবে সর্বশেষ খবর নিয়ে আসে। যেহেতু একটি সাসপেন্ড ফাংশন একাধিক পরপর মান ফেরত দিতে পারে না, তাই ডেটা উৎস এই প্রয়োজনীয়তা পূরণের জন্য একটি প্রবাহ তৈরি করে এবং ফেরত দেয়। এই ক্ষেত্রে, ডেটা উত্স প্রযোজক হিসাবে কাজ করে।
class NewsRemoteDataSource(
private val newsApi: NewsApi,
private val refreshIntervalMs: Long = 5000
) {
val latestNews: Flow<List<ArticleHeadline>> = flow {
while(true) {
val latestNews = newsApi.fetchLatestNews()
emit(latestNews) // Emits the result of the request to the flow
delay(refreshIntervalMs) // Suspends the coroutine for some time
}
}
}
// Interface that provides a way to make network requests with suspend functions
interface NewsApi {
suspend fun fetchLatestNews(): List<ArticleHeadline>
}
flow
বিল্ডার একটি coroutine মধ্যে মৃত্যুদন্ড কার্যকর করা হয়. এইভাবে, এটি একই অ্যাসিঙ্ক্রোনাস API থেকে উপকৃত হয়, তবে কিছু বিধিনিষেধ প্রযোজ্য:
- প্রবাহ অনুক্রমিক হয়। যেহেতু প্রযোজক একটি করোটিনে থাকে, যখন একটি সাসপেন্ড ফাংশন কল করে, প্রযোজক সাসপেন্ড ফাংশনটি ফিরে না আসা পর্যন্ত স্থগিত করে। উদাহরণে,
fetchLatestNews
নেটওয়ার্ক অনুরোধ সম্পূর্ণ না হওয়া পর্যন্ত প্রযোজক স্থগিত করে। তবেই ফলটি স্রোতে নির্গত হয়। -
flow
নির্মাতার সাথে, প্রযোজক একটি ভিন্নCoroutineContext
থেকে মানemit
করতে পারে না। অতএব, নতুন কোরোটিন তৈরি করে বা কোডের সাথেwithContext
ব্লক ব্যবহার করে ভিন্নCoroutineContext
এemit
কল করবেন না। আপনি এই ক্ষেত্রেcallbackFlow
এর মতো অন্যান্য ফ্লো নির্মাতা ব্যবহার করতে পারেন।
স্ট্রীম পরিবর্তন করা হচ্ছে
মধ্যস্থতাকারীরা মধ্যবর্তী অপারেটর ব্যবহার করতে পারে মানগুলি ব্যবহার না করেই ডেটার স্ট্রিম পরিবর্তন করতে। এই অপারেটরগুলি এমন ফাংশন যা, ডেটার একটি প্রবাহে প্রয়োগ করা হলে, ক্রিয়াকলাপের একটি চেইন সেট আপ করে যা ভবিষ্যতে মানগুলি গ্রাস না হওয়া পর্যন্ত কার্যকর করা হয় না। ফ্লো রেফারেন্স ডকুমেন্টেশনে মধ্যবর্তী অপারেটর সম্পর্কে আরও জানুন।
নীচের উদাহরণে, রিপোজিটরি স্তরটি View
প্রদর্শিত ডেটা রূপান্তর করতে মধ্যবর্তী অপারেটর map
ব্যবহার করে:
class NewsRepository(
private val newsRemoteDataSource: NewsRemoteDataSource,
private val userData: UserData
) {
/**
* Returns the favorite latest news applying transformations on the flow.
* These operations are lazy and don't trigger the flow. They just transform
* the current value emitted by the flow at that point in time.
*/
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
// Intermediate operation to filter the list of favorite topics
.map { news -> news.filter { userData.isFavoriteTopic(it) } }
// Intermediate operation to save the latest news in the cache
.onEach { news -> saveInCache(news) }
}
মধ্যবর্তী অপারেটরগুলি একের পর এক প্রয়োগ করা যেতে পারে, একটি ক্রিয়াকলাপের শৃঙ্খল গঠন করে যা অলসভাবে সম্পাদিত হয় যখন একটি আইটেম প্রবাহে নির্গত হয়। মনে রাখবেন যে শুধুমাত্র একটি স্ট্রীমে একটি মধ্যবর্তী অপারেটর প্রয়োগ করা প্রবাহ সংগ্রহ শুরু করে না।
একটি প্রবাহ থেকে সংগ্রহ
মান শোনা শুরু করতে ফ্লো ট্রিগার করতে একটি টার্মিনাল অপারেটর ব্যবহার করুন। স্ট্রীমের সমস্ত মানগুলি নির্গত হওয়ার সাথে সাথে পেতে, collect
ব্যবহার করুন। আপনি অফিসিয়াল ফ্লো ডকুমেন্টেশনে টার্মিনাল অপারেটর সম্পর্কে আরও জানতে পারেন।
যেহেতু collect
একটি সাসপেন্ড ফাংশন, এটি একটি কোরোটিনের মধ্যে কার্যকর করা দরকার। এটি একটি প্যারামিটার হিসাবে একটি ল্যাম্বডা নেয় যা প্রতিটি নতুন মানের উপর কল করা হয়। যেহেতু এটি একটি সাসপেন্ড ফাংশন, কল যে কোরোটিন collect
তা প্রবাহ বন্ধ না হওয়া পর্যন্ত স্থগিত হতে পারে।
পূর্ববর্তী উদাহরণটি অব্যাহত রেখে, এখানে একটি ViewModel
একটি সহজ বাস্তবায়ন রয়েছে যা সংগ্রহস্থল স্তর থেকে ডেটা গ্রহণ করে:
class LatestNewsViewModel(
private val newsRepository: NewsRepository
) : ViewModel() {
init {
viewModelScope.launch {
// Trigger the flow and consume its elements using collect
newsRepository.favoriteLatestNews.collect { favoriteNews ->
// Update View with the latest favorite news
}
}
}
}
প্রবাহ সংগ্রহ করা প্রযোজককে ট্রিগার করে যা সর্বশেষ সংবাদ রিফ্রেশ করে এবং একটি নির্দিষ্ট ব্যবধানে নেটওয়ার্ক অনুরোধের ফলাফল নির্গত করে। যেহেতু প্রযোজক সবসময় while(true)
লুপের সাথে সক্রিয় থাকে, তাই ViewModel সাফ হয়ে গেলে এবং viewModelScope
বাতিল হয়ে গেলে ডেটার স্ট্রিম বন্ধ হয়ে যাবে।
নিম্নলিখিত কারণে প্রবাহ সংগ্রহ বন্ধ হতে পারে:
- যে কোরোটিন সংগ্রহ করে তা বাতিল করা হয়েছে, যেমনটি আগের উদাহরণে দেখানো হয়েছে। এটি অন্তর্নিহিত প্রযোজককেও থামিয়ে দেয়।
- প্রযোজক আইটেম নির্গত শেষ. এই ক্ষেত্রে, ডেটার স্ট্রীম বন্ধ হয়ে যায় এবং কোরোটিন যেটিকে
collect
বলা হয় সেটি এক্সিকিউশন পুনরায় শুরু করে।
অন্যান্য মধ্যবর্তী অপারেটরগুলির সাথে নির্দিষ্ট না হলে প্রবাহগুলি ঠান্ডা এবং অলস হয়৷ এর মানে হল যে প্রতিবার ফ্লোতে টার্মিনাল অপারেটরকে কল করা হলে প্রযোজক কোডটি কার্যকর করা হয়। পূর্ববর্তী উদাহরণে, একাধিক ফ্লো সংগ্রাহক থাকার ফলে ডেটা উত্সটি বিভিন্ন নির্দিষ্ট ব্যবধানে একাধিকবার সর্বশেষ সংবাদ পেতে পারে। যখন একাধিক ভোক্তা একই সময়ে সংগ্রহ করে তখন একটি প্রবাহ অপ্টিমাইজ করতে এবং শেয়ার করতে, shareIn
অপারেটর ব্যবহার করুন।
অপ্রত্যাশিত ব্যতিক্রম ধরা
প্রযোজকের বাস্তবায়ন তৃতীয় পক্ষের লাইব্রেরি থেকে আসতে পারে। এর মানে হল যে এটি অপ্রত্যাশিত ব্যতিক্রম নিক্ষেপ করতে পারে। এই ব্যতিক্রমগুলি পরিচালনা করতে, catch
ইন্টারমিডিয়েট অপারেটর ব্যবহার করুন।
class LatestNewsViewModel(
private val newsRepository: NewsRepository
) : ViewModel() {
init {
viewModelScope.launch {
newsRepository.favoriteLatestNews
// Intermediate catch operator. If an exception is thrown,
// catch and update the UI
.catch { exception -> notifyError(exception) }
.collect { favoriteNews ->
// Update View with the latest favorite news
}
}
}
}
পূর্ববর্তী উদাহরণে, যখন একটি ব্যতিক্রম ঘটে, তখন collect
ল্যাম্বডা বলা হয় না, কারণ একটি নতুন আইটেম গৃহীত হয়নি।
catch
প্রবাহে আইটেম emit
করতে পারে। উদাহরণ সংগ্রহস্থল স্তর পরিবর্তে ক্যাশে মান emit
করতে পারে:
class NewsRepository(...) {
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
.map { news -> news.filter { userData.isFavoriteTopic(it) } }
.onEach { news -> saveInCache(news) }
// If an error happens, emit the last cached values
.catch { exception -> emit(lastCachedNews()) }
}
এই উদাহরণে, যখন একটি ব্যতিক্রম ঘটে, তখন collect
ল্যাম্বডা বলা হয়, কারণ ব্যতিক্রমের কারণে একটি নতুন আইটেম স্রোতে নির্গত হয়েছে।
একটি ভিন্ন Coroutine Context-এ কার্যকর করা হচ্ছে
ডিফল্টরূপে, একটি flow
নির্মাতার প্রযোজক এটি থেকে সংগ্রহ করা coroutine-এর CoroutineContext
এ সম্পাদন করে এবং পূর্বে উল্লেখ করা হয়েছে, এটি একটি ভিন্ন CoroutineContext
থেকে মান emit
করতে পারে না। এই আচরণ কিছু ক্ষেত্রে অবাঞ্ছিত হতে পারে। উদাহরণস্বরূপ, এই বিষয় জুড়ে ব্যবহৃত উদাহরণগুলিতে, সংগ্রহস্থল স্তরটি Dispatchers.Main
এ ক্রিয়াকলাপ সম্পাদন করা উচিত নয় যা viewModelScope
দ্বারা ব্যবহৃত হয়।
একটি প্রবাহের CoroutineContext
পরিবর্তন করতে, মধ্যবর্তী অপারেটর flowOn
ব্যবহার করুন। flowOn
আপস্ট্রিম প্রবাহের CoroutineContext
পরিবর্তন করে, যার অর্থ প্রযোজক এবং flowOn
এর আগে (বা উপরে) প্রয়োগ করা যেকোনো মধ্যবর্তী অপারেটর। ডাউনস্ট্রিম প্রবাহ (ভোক্তার সাথে flowOn
পরে মধ্যবর্তী অপারেটর) প্রভাবিত হয় না এবং প্রবাহ থেকে collect
করতে ব্যবহৃত CoroutineContext
এ কার্যকর হয়। যদি একাধিক flowOn
অপারেটর থাকে, প্রতিটি তার বর্তমান অবস্থান থেকে আপস্ট্রিম পরিবর্তন করে।
class NewsRepository(
private val newsRemoteDataSource: NewsRemoteDataSource,
private val userData: UserData,
private val defaultDispatcher: CoroutineDispatcher
) {
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
.map { news -> // Executes on the default dispatcher
news.filter { userData.isFavoriteTopic(it) }
}
.onEach { news -> // Executes on the default dispatcher
saveInCache(news)
}
// flowOn affects the upstream flow ↑
.flowOn(defaultDispatcher)
// the downstream flow ↓ is not affected
.catch { exception -> // Executes in the consumer's context
emit(lastCachedNews())
}
}
এই কোডের সাহায্যে, onEach
এবং map
অপারেটররা defaultDispatcher
ব্যবহার করে, যেখানে catch
অপারেটর এবং ভোক্তা Dispatchers.Main
কার্যকর করা হয়। viewModelScope
দ্বারা ব্যবহৃত হয়।
যেহেতু ডাটা সোর্স লেয়ারটি I/O কাজ করছে, আপনার উচিত এমন একটি ডিসপ্যাচার ব্যবহার করা যা I/O অপারেশনের জন্য অপ্টিমাইজ করা হয়েছে:
class NewsRemoteDataSource(
...,
private val ioDispatcher: CoroutineDispatcher
) {
val latestNews: Flow<List<ArticleHeadline>> = flow {
// Executes on the IO dispatcher
...
}
.flowOn(ioDispatcher)
}
জেটপ্যাক লাইব্রেরিতে প্রবাহিত হয়
ফ্লো অনেক জেটপ্যাক লাইব্রেরিতে সংহত করা হয়েছে এবং এটি অ্যান্ড্রয়েড তৃতীয় পক্ষের লাইব্রেরির মধ্যে জনপ্রিয়। ফ্লো লাইভ ডেটা আপডেট এবং ডেটার অফুরন্ত স্ট্রিমগুলির জন্য একটি দুর্দান্ত ফিট।
ডাটাবেসের পরিবর্তন সম্পর্কে অবহিত হওয়ার জন্য আপনি ফ্লো উইথ রুম ব্যবহার করতে পারেন। ডেটা অ্যাক্সেস অবজেক্ট (DAO) ব্যবহার করার সময়, লাইভ আপডেট পেতে একটি Flow
টাইপ ফেরত দিন।
@Dao
abstract class ExampleDao {
@Query("SELECT * FROM Example")
abstract fun getExamples(): Flow<List<Example>>
}
প্রতিবার Example
সারণীতে পরিবর্তন হলে, ডাটাবেসের নতুন আইটেমগুলির সাথে একটি নতুন তালিকা নির্গত হয়।
কলব্যাক-ভিত্তিক APIগুলিকে প্রবাহে রূপান্তর করুন
callbackFlow
হল একটি ফ্লো নির্মাতা যা আপনাকে কলব্যাক-ভিত্তিক APIগুলিকে ফ্লোতে রূপান্তর করতে দেয়। উদাহরণ হিসেবে, Firebase Firestore Android APIs কলব্যাক ব্যবহার করে।
এই APIগুলিকে প্রবাহে রূপান্তর করতে এবং Firestore ডাটাবেস আপডেট শুনতে, আপনি নিম্নলিখিত কোডটি ব্যবহার করতে পারেন:
class FirestoreUserEventsDataSource(
private val firestore: FirebaseFirestore
) {
// Method to get user events from the Firestore database
fun getUserEvents(): Flow<UserEvents> = callbackFlow {
// Reference to use in Firestore
var eventsCollection: CollectionReference? = null
try {
eventsCollection = FirebaseFirestore.getInstance()
.collection("collection")
.document("app")
} catch (e: Throwable) {
// If Firebase cannot be initialized, close the stream of data
// flow consumers will stop collecting and the coroutine will resume
close(e)
}
// Registers callback to firestore, which will be called on new events
val subscription = eventsCollection?.addSnapshotListener { snapshot, _ ->
if (snapshot == null) { return@addSnapshotListener }
// Sends events to the flow! Consumers will get the new events
try {
trySend(snapshot.getEvents())
} catch (e: Throwable) {
// Event couldn't be sent to the flow
}
}
// The callback inside awaitClose will be executed when the flow is
// either closed or cancelled.
// In this case, remove the callback from Firestore
awaitClose { subscription?.remove() }
}
}
flow
বিল্ডারের বিপরীতে, callbackFlow
send
ফাংশন সহ একটি ভিন্ন CoroutineContext
থেকে বা trySend
ফাংশন সহ একটি coroutine এর বাইরে মান নির্গত করার অনুমতি দেয়।
অভ্যন্তরীণভাবে, callbackFlow
একটি চ্যানেল ব্যবহার করে, যা ধারণাগতভাবে একটি ব্লকিং সারির মতো। একটি চ্যানেল একটি ক্ষমতা সহ কনফিগার করা হয়, বাফার করা যেতে পারে এমন উপাদানগুলির সর্বাধিক সংখ্যা৷ callbackFlow
তৈরি করা চ্যানেলটিতে 64টি উপাদানের ডিফল্ট ক্ষমতা রয়েছে। আপনি যখন একটি পূর্ণ চ্যানেলে একটি নতুন উপাদান যুক্ত করার চেষ্টা করেন, তখন নতুন উপাদানের জন্য স্থান না পাওয়া পর্যন্ত send
প্রযোজককে স্থগিত করে, যেখানে trySend
চ্যানেলে উপাদান যোগ করে না এবং অবিলম্বে false
ফেরত দেয়।
trySend
অবিলম্বে চ্যানেলে নির্দিষ্ট উপাদান যোগ করে, শুধুমাত্র যদি এটি তার ক্ষমতা সীমাবদ্ধতা লঙ্ঘন না করে, এবং তারপর সফল ফলাফল প্রদান করে।
অতিরিক্ত প্রবাহ সম্পদ
- কোটলিন টেস্টিং অ্যান্ড্রয়েডে প্রবাহিত হয়
-
StateFlow
এবংSharedFlow
- কোটলিন কোরোটিন এবং প্রবাহের জন্য অতিরিক্ত সংস্থান
coroutines-এ, একটি ফ্লো হল এমন একটি ধরন যা ক্রমানুসারে একাধিক মান নির্গত করতে পারে, শুধুমাত্র একটি একক মান ফিরিয়ে দেয় এমন ফাংশনগুলিকে স্থগিত করার বিপরীতে। উদাহরণস্বরূপ, আপনি একটি ডাটাবেস থেকে লাইভ আপডেট পেতে একটি প্রবাহ ব্যবহার করতে পারেন।
ফ্লোগুলি কোরোটিনের উপরে তৈরি করা হয় এবং একাধিক মান প্রদান করতে পারে। একটি প্রবাহ ধারণাগতভাবে ডেটার একটি প্রবাহ যা অ্যাসিঙ্ক্রোনাসভাবে গণনা করা যেতে পারে। নির্গত মান একই ধরনের হতে হবে। উদাহরণস্বরূপ, একটি Flow<Int>
হল একটি প্রবাহ যা পূর্ণসংখ্যার মান নির্গত করে।
একটি প্রবাহ একটি Iterator
সাথে খুব সাদৃশ্যপূর্ণ যেটি মানগুলির একটি ক্রম তৈরি করে, তবে এটি অসিঙ্ক্রোনাসভাবে মানগুলি তৈরি এবং ব্যবহার করতে সাসপেন্ড ফাংশন ব্যবহার করে। এর মানে হল, উদাহরণস্বরূপ, প্রধান থ্রেড ব্লক না করে প্রবাহ নিরাপদে পরবর্তী মান তৈরি করার জন্য একটি নেটওয়ার্ক অনুরোধ করতে পারে।
ডেটার প্রবাহের সাথে জড়িত তিনটি সত্তা রয়েছে:
- একজন প্রযোজক ডেটা তৈরি করে যা স্ট্রীমে যোগ করা হয়। কোরোটিনের জন্য ধন্যবাদ, ফ্লোও অ্যাসিঙ্ক্রোনাসভাবে ডেটা তৈরি করতে পারে।
- (ঐচ্ছিক) মধ্যস্থতাকারীরা স্ট্রীম বা প্রবাহে নির্গত প্রতিটি মান পরিবর্তন করতে পারে।
- একজন ভোক্তা স্রোত থেকে মান গ্রহণ করে।
অ্যান্ড্রয়েডে, একটি সংগ্রহস্থল সাধারণত UI ডেটার একটি প্রযোজক যার ব্যবহারকারী হিসাবে ব্যবহারকারী ইন্টারফেস (UI) থাকে যা শেষ পর্যন্ত ডেটা প্রদর্শন করে। অন্য সময়ে, UI স্তরটি ব্যবহারকারীর ইনপুট ইভেন্টগুলির একটি প্রযোজক এবং অনুক্রমের অন্যান্য স্তরগুলি সেগুলি গ্রহণ করে৷ প্রযোজক এবং ভোক্তার মধ্যে স্তরগুলি সাধারণত মধ্যস্থতাকারী হিসাবে কাজ করে যা নিম্নলিখিত স্তরের প্রয়োজনীয়তার সাথে সামঞ্জস্য করার জন্য ডেটার প্রবাহকে সংশোধন করে।
একটি প্রবাহ সৃষ্টি
ফ্লো তৈরি করতে, ফ্লো বিল্ডার API ব্যবহার করুন। flow
বিল্ডার ফাংশন একটি নতুন ফ্লো তৈরি করে যেখানে আপনি emit
ফাংশন ব্যবহার করে ডেটার প্রবাহে ম্যানুয়ালি নতুন মান নির্গত করতে পারেন।
নিম্নলিখিত উদাহরণে, একটি ডেটা উত্স একটি নির্দিষ্ট ব্যবধানে স্বয়ংক্রিয়ভাবে সর্বশেষ খবর নিয়ে আসে। যেহেতু একটি সাসপেন্ড ফাংশন একাধিক পরপর মান ফেরত দিতে পারে না, তাই ডেটা উৎস এই প্রয়োজনীয়তা পূরণের জন্য একটি প্রবাহ তৈরি করে এবং ফেরত দেয়। এই ক্ষেত্রে, ডেটা উত্স প্রযোজক হিসাবে কাজ করে।
class NewsRemoteDataSource(
private val newsApi: NewsApi,
private val refreshIntervalMs: Long = 5000
) {
val latestNews: Flow<List<ArticleHeadline>> = flow {
while(true) {
val latestNews = newsApi.fetchLatestNews()
emit(latestNews) // Emits the result of the request to the flow
delay(refreshIntervalMs) // Suspends the coroutine for some time
}
}
}
// Interface that provides a way to make network requests with suspend functions
interface NewsApi {
suspend fun fetchLatestNews(): List<ArticleHeadline>
}
flow
বিল্ডার একটি coroutine মধ্যে মৃত্যুদন্ড কার্যকর করা হয়. এইভাবে, এটি একই অ্যাসিঙ্ক্রোনাস API থেকে উপকৃত হয়, তবে কিছু বিধিনিষেধ প্রযোজ্য:
- প্রবাহ অনুক্রমিক হয়। যেহেতু প্রযোজক একটি করোটিনে থাকে, যখন একটি সাসপেন্ড ফাংশন কল করে, প্রযোজক সাসপেন্ড ফাংশনটি ফিরে না আসা পর্যন্ত স্থগিত করে। উদাহরণে,
fetchLatestNews
নেটওয়ার্ক অনুরোধ সম্পূর্ণ না হওয়া পর্যন্ত প্রযোজক স্থগিত করে। তবেই ফলটি স্রোতে নির্গত হয়। -
flow
নির্মাতার সাথে, প্রযোজক একটি ভিন্নCoroutineContext
থেকে মানemit
করতে পারে না। অতএব, নতুন কোরোটিন তৈরি করে বা কোডের সাথেwithContext
ব্লক ব্যবহার করে ভিন্নCoroutineContext
এemit
কল করবেন না। আপনি এই ক্ষেত্রেcallbackFlow
এর মতো অন্যান্য ফ্লো নির্মাতা ব্যবহার করতে পারেন।
স্ট্রীম পরিবর্তন করা হচ্ছে
মধ্যস্থতাকারীরা মধ্যবর্তী অপারেটর ব্যবহার করতে পারে মানগুলি ব্যবহার না করেই ডেটার স্ট্রিম পরিবর্তন করতে। এই অপারেটরগুলি এমন ফাংশন যা, ডেটার একটি প্রবাহে প্রয়োগ করা হলে, ক্রিয়াকলাপের একটি চেইন সেট আপ করে যা ভবিষ্যতে মানগুলি গ্রাস না হওয়া পর্যন্ত কার্যকর করা হয় না। ফ্লো রেফারেন্স ডকুমেন্টেশনে মধ্যবর্তী অপারেটর সম্পর্কে আরও জানুন।
নীচের উদাহরণে, রিপোজিটরি স্তরটি View
প্রদর্শিত ডেটা রূপান্তর করতে মধ্যবর্তী অপারেটর map
ব্যবহার করে:
class NewsRepository(
private val newsRemoteDataSource: NewsRemoteDataSource,
private val userData: UserData
) {
/**
* Returns the favorite latest news applying transformations on the flow.
* These operations are lazy and don't trigger the flow. They just transform
* the current value emitted by the flow at that point in time.
*/
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
// Intermediate operation to filter the list of favorite topics
.map { news -> news.filter { userData.isFavoriteTopic(it) } }
// Intermediate operation to save the latest news in the cache
.onEach { news -> saveInCache(news) }
}
মধ্যবর্তী অপারেটরগুলি একের পর এক প্রয়োগ করা যেতে পারে, একটি ক্রিয়াকলাপের শৃঙ্খল গঠন করে যা অলসভাবে সম্পাদিত হয় যখন একটি আইটেম প্রবাহে নির্গত হয়। মনে রাখবেন যে শুধুমাত্র একটি স্ট্রীমে একটি মধ্যবর্তী অপারেটর প্রয়োগ করা প্রবাহ সংগ্রহ শুরু করে না।
একটি প্রবাহ থেকে সংগ্রহ
মান শুনতে শুরু করতে ফ্লো ট্রিগার করতে একটি টার্মিনাল অপারেটর ব্যবহার করুন। স্ট্রীমের সমস্ত মানগুলি নির্গত হওয়ার সাথে সাথে পেতে, collect
ব্যবহার করুন। আপনি অফিসিয়াল ফ্লো ডকুমেন্টেশনে টার্মিনাল অপারেটর সম্পর্কে আরও জানতে পারেন।
যেহেতু collect
একটি সাসপেন্ড ফাংশন, এটি একটি কোরোটিনের মধ্যে কার্যকর করা দরকার। এটি একটি প্যারামিটার হিসাবে একটি ল্যাম্বডা নেয় যা প্রতিটি নতুন মানের উপর কল করা হয়। যেহেতু এটি একটি সাসপেন্ড ফাংশন, কল যে কোরোটিন collect
তা প্রবাহ বন্ধ না হওয়া পর্যন্ত স্থগিত হতে পারে।
পূর্ববর্তী উদাহরণটি অব্যাহত রেখে, এখানে একটি ViewModel
একটি সহজ বাস্তবায়ন রয়েছে যা সংগ্রহস্থল স্তর থেকে ডেটা গ্রহণ করে:
class LatestNewsViewModel(
private val newsRepository: NewsRepository
) : ViewModel() {
init {
viewModelScope.launch {
// Trigger the flow and consume its elements using collect
newsRepository.favoriteLatestNews.collect { favoriteNews ->
// Update View with the latest favorite news
}
}
}
}
প্রবাহ সংগ্রহ করা প্রযোজককে ট্রিগার করে যা সর্বশেষ সংবাদ রিফ্রেশ করে এবং একটি নির্দিষ্ট ব্যবধানে নেটওয়ার্ক অনুরোধের ফলাফল নির্গত করে। যেহেতু প্রযোজক সবসময় while(true)
লুপের সাথে সক্রিয় থাকে, তাই ViewModel সাফ হয়ে গেলে এবং viewModelScope
বাতিল হয়ে গেলে ডেটার স্ট্রিম বন্ধ হয়ে যাবে।
নিম্নলিখিত কারণে প্রবাহ সংগ্রহ বন্ধ হতে পারে:
- যে কোরোটিন সংগ্রহ করে তা বাতিল করা হয়েছে, যেমনটি আগের উদাহরণে দেখানো হয়েছে। এটি অন্তর্নিহিত প্রযোজককেও থামিয়ে দেয়।
- প্রযোজক আইটেম নির্গত শেষ. এই ক্ষেত্রে, ডেটার স্ট্রীম বন্ধ হয়ে যায় এবং কোরোটিন যেটিকে
collect
বলা হয় সেটি এক্সিকিউশন পুনরায় শুরু করে।
অন্যান্য মধ্যবর্তী অপারেটরগুলির সাথে নির্দিষ্ট না হলে প্রবাহগুলি ঠান্ডা এবং অলস হয়৷ এর মানে হল যে প্রতিবার ফ্লোতে টার্মিনাল অপারেটরকে কল করা হলে প্রযোজক কোডটি কার্যকর করা হয়। পূর্ববর্তী উদাহরণে, একাধিক ফ্লো সংগ্রাহক থাকার ফলে ডেটা উত্সটি বিভিন্ন নির্দিষ্ট ব্যবধানে একাধিকবার সর্বশেষ সংবাদ পেতে পারে। যখন একাধিক ভোক্তা একই সময়ে সংগ্রহ করে তখন একটি প্রবাহ অপ্টিমাইজ করতে এবং শেয়ার করতে, shareIn
অপারেটর ব্যবহার করুন।
অপ্রত্যাশিত ব্যতিক্রম ধরা
প্রযোজকের বাস্তবায়ন তৃতীয় পক্ষের লাইব্রেরি থেকে আসতে পারে। এর মানে হল যে এটি অপ্রত্যাশিত ব্যতিক্রম নিক্ষেপ করতে পারে। এই ব্যতিক্রমগুলি পরিচালনা করতে, catch
ইন্টারমিডিয়েট অপারেটর ব্যবহার করুন।
class LatestNewsViewModel(
private val newsRepository: NewsRepository
) : ViewModel() {
init {
viewModelScope.launch {
newsRepository.favoriteLatestNews
// Intermediate catch operator. If an exception is thrown,
// catch and update the UI
.catch { exception -> notifyError(exception) }
.collect { favoriteNews ->
// Update View with the latest favorite news
}
}
}
}
পূর্ববর্তী উদাহরণে, যখন একটি ব্যতিক্রম ঘটে, তখন collect
ল্যাম্বডা বলা হয় না, কারণ একটি নতুন আইটেম গৃহীত হয়নি।
catch
প্রবাহে আইটেম emit
করতে পারে। উদাহরণ সংগ্রহস্থল স্তর পরিবর্তে ক্যাশে মান emit
করতে পারে:
class NewsRepository(...) {
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
.map { news -> news.filter { userData.isFavoriteTopic(it) } }
.onEach { news -> saveInCache(news) }
// If an error happens, emit the last cached values
.catch { exception -> emit(lastCachedNews()) }
}
এই উদাহরণে, যখন একটি ব্যতিক্রম ঘটে, তখন collect
ল্যাম্বডা বলা হয়, কারণ ব্যতিক্রমের কারণে একটি নতুন আইটেম স্রোতে নির্গত হয়েছে।
একটি ভিন্ন Coroutine Context-এ কার্যকর করা হচ্ছে
ডিফল্টরূপে, একটি flow
নির্মাতার প্রযোজক এটি থেকে সংগ্রহ করা coroutine-এর CoroutineContext
এ সম্পাদন করে এবং পূর্বে উল্লেখ করা হয়েছে, এটি একটি ভিন্ন CoroutineContext
থেকে মান emit
করতে পারে না। এই আচরণ কিছু ক্ষেত্রে অবাঞ্ছিত হতে পারে। উদাহরণস্বরূপ, এই বিষয় জুড়ে ব্যবহৃত উদাহরণগুলিতে, সংগ্রহস্থল স্তরটি Dispatchers.Main
এ ক্রিয়াকলাপ সম্পাদন করা উচিত নয় যা viewModelScope
দ্বারা ব্যবহৃত হয়।
একটি প্রবাহের CoroutineContext
পরিবর্তন করতে, মধ্যবর্তী অপারেটর flowOn
ব্যবহার করুন। flowOn
আপস্ট্রিম প্রবাহের CoroutineContext
পরিবর্তন করে, যার অর্থ প্রযোজক এবং flowOn
এর আগে (বা উপরে) প্রয়োগ করা যেকোনো মধ্যবর্তী অপারেটর। ডাউনস্ট্রিম প্রবাহ (ভোক্তার সাথে flowOn
পরে মধ্যবর্তী অপারেটর) প্রভাবিত হয় না এবং প্রবাহ থেকে collect
করতে ব্যবহৃত CoroutineContext
এ কার্যকর হয়। যদি একাধিক flowOn
অপারেটর থাকে, প্রতিটি তার বর্তমান অবস্থান থেকে আপস্ট্রিম পরিবর্তন করে।
class NewsRepository(
private val newsRemoteDataSource: NewsRemoteDataSource,
private val userData: UserData,
private val defaultDispatcher: CoroutineDispatcher
) {
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
.map { news -> // Executes on the default dispatcher
news.filter { userData.isFavoriteTopic(it) }
}
.onEach { news -> // Executes on the default dispatcher
saveInCache(news)
}
// flowOn affects the upstream flow ↑
.flowOn(defaultDispatcher)
// the downstream flow ↓ is not affected
.catch { exception -> // Executes in the consumer's context
emit(lastCachedNews())
}
}
এই কোডের সাহায্যে, onEach
এবং map
অপারেটররা defaultDispatcher
ব্যবহার করে, যেখানে catch
অপারেটর এবং ভোক্তা Dispatchers.Main
কার্যকর করা হয়। viewModelScope
দ্বারা ব্যবহৃত হয়।
যেহেতু ডাটা সোর্স লেয়ারটি I/O কাজ করছে, আপনার উচিত এমন একটি ডিসপ্যাচার ব্যবহার করা যা I/O অপারেশনের জন্য অপ্টিমাইজ করা হয়েছে:
class NewsRemoteDataSource(
...,
private val ioDispatcher: CoroutineDispatcher
) {
val latestNews: Flow<List<ArticleHeadline>> = flow {
// Executes on the IO dispatcher
...
}
.flowOn(ioDispatcher)
}
জেটপ্যাক লাইব্রেরিতে প্রবাহিত হয়
ফ্লো অনেক জেটপ্যাক লাইব্রেরিতে সংহত করা হয়েছে এবং এটি অ্যান্ড্রয়েড তৃতীয় পক্ষের লাইব্রেরির মধ্যে জনপ্রিয়। ফ্লো লাইভ ডেটা আপডেট এবং ডেটার অফুরন্ত স্ট্রিমগুলির জন্য একটি দুর্দান্ত ফিট।
ডাটাবেসের পরিবর্তন সম্পর্কে অবহিত হওয়ার জন্য আপনি ফ্লো উইথ রুম ব্যবহার করতে পারেন। ডেটা অ্যাক্সেস অবজেক্ট (DAO) ব্যবহার করার সময়, লাইভ আপডেট পেতে একটি Flow
টাইপ ফেরত দিন।
@Dao
abstract class ExampleDao {
@Query("SELECT * FROM Example")
abstract fun getExamples(): Flow<List<Example>>
}
Example
সারণীতে প্রতিবার পরিবর্তন হলে, ডাটাবেসের নতুন আইটেমগুলির সাথে একটি নতুন তালিকা নির্গত হয়।
কলব্যাক-ভিত্তিক APIগুলিকে প্রবাহে রূপান্তর করুন
callbackFlow
হল একটি ফ্লো নির্মাতা যা আপনাকে কলব্যাক-ভিত্তিক APIগুলিকে ফ্লোতে রূপান্তর করতে দেয়। উদাহরণ হিসেবে, Firebase Firestore Android APIs কলব্যাক ব্যবহার করে।
এই APIগুলিকে প্রবাহে রূপান্তর করতে এবং Firestore ডাটাবেস আপডেট শুনতে, আপনি নিম্নলিখিত কোডটি ব্যবহার করতে পারেন:
class FirestoreUserEventsDataSource(
private val firestore: FirebaseFirestore
) {
// Method to get user events from the Firestore database
fun getUserEvents(): Flow<UserEvents> = callbackFlow {
// Reference to use in Firestore
var eventsCollection: CollectionReference? = null
try {
eventsCollection = FirebaseFirestore.getInstance()
.collection("collection")
.document("app")
} catch (e: Throwable) {
// If Firebase cannot be initialized, close the stream of data
// flow consumers will stop collecting and the coroutine will resume
close(e)
}
// Registers callback to firestore, which will be called on new events
val subscription = eventsCollection?.addSnapshotListener { snapshot, _ ->
if (snapshot == null) { return@addSnapshotListener }
// Sends events to the flow! Consumers will get the new events
try {
trySend(snapshot.getEvents())
} catch (e: Throwable) {
// Event couldn't be sent to the flow
}
}
// The callback inside awaitClose will be executed when the flow is
// either closed or cancelled.
// In this case, remove the callback from Firestore
awaitClose { subscription?.remove() }
}
}
flow
বিল্ডারের বিপরীতে, callbackFlow
send
ফাংশন সহ একটি ভিন্ন CoroutineContext
থেকে বা trySend
ফাংশন সহ একটি coroutine এর বাইরে মান নির্গত করার অনুমতি দেয়।
অভ্যন্তরীণভাবে, callbackFlow
একটি চ্যানেল ব্যবহার করে, যা ধারণাগতভাবে একটি ব্লকিং সারির মতো। একটি চ্যানেল একটি ক্ষমতা সহ কনফিগার করা হয়, বাফার করা যেতে পারে এমন উপাদানগুলির সর্বাধিক সংখ্যা৷ callbackFlow
তৈরি করা চ্যানেলটিতে 64টি উপাদানের ডিফল্ট ক্ষমতা রয়েছে। আপনি যখন একটি পূর্ণ চ্যানেলে একটি নতুন উপাদান যুক্ত করার চেষ্টা করেন, তখন নতুন উপাদানের জন্য স্থান না পাওয়া পর্যন্ত send
প্রযোজককে স্থগিত করে, যেখানে trySend
চ্যানেলে উপাদান যোগ করে না এবং অবিলম্বে false
ফেরত দেয়।
trySend
অবিলম্বে চ্যানেলে নির্দিষ্ট উপাদান যোগ করে, শুধুমাত্র যদি এটি তার ক্ষমতা সীমাবদ্ধতা লঙ্ঘন না করে, এবং তারপর সফল ফলাফল প্রদান করে।
অতিরিক্ত প্রবাহ সম্পদ
- কোটলিন টেস্টিং অ্যান্ড্রয়েডে প্রবাহিত হয়
-
StateFlow
এবংSharedFlow
- কোটলিন কোরোটিন এবং প্রবাহের জন্য অতিরিক্ত সংস্থান