协程流(Flow)
1. 什么是 Flow?
Flow 是 Kotlin 协程库中用于处理异步数据流的组件,类似于响应式编程中的 Observable(RxJava)或 LiveData(Android)。它提供了一种声明式的方式处理按顺序发射的多个值,同时支持协程的挂起和取消机制。
核心特点:
- 冷流(Cold Stream):Flow 是冷流,只有在收集(
collect)时才会执行发射逻辑。 - 可取消性:与协程深度集成,支持结构化并发。
- 背压(Backpressure)处理:通过挂起机制天然支持背压。
2. Flow 的基本使用
创建 Flow
fun simpleFlow(): Flow<Int> = flow {
for (i in 1..3) {
delay(100) // 模拟异步操作
emit(i) // 发射值
}
}
收集 Flow
runBlocking {
simpleFlow().collect { value ->
println(value) // 输出 1, 2, 3
}
}
3. Flow 操作符
Flow 提供丰富的操作符,分为两类:
- 中间操作符:如
map、filter,返回一个新的 Flow。 - 末端操作符:如
collect、toList,触发流的执行。
常用操作符示例
flowOf(1, 2, 3)
.filter { it % 2 == 0 }
.map { it * 2 }
.collect { println(it) } // 输出 4
4. 异常处理
Flow 提供多种方式处理异常:
try/catch块包裹collect。catch中间操作符:flow { emit(1); throw RuntimeException() } .catch { e -> println("Caught: $e") } .collect { println(it) }
5. 上下文切换
默认情况下,Flow 的发射和收集在同一个协程上下文中运行。通过 flowOn 切换上游上下文:
flow { emit(1) }
.flowOn(Dispatchers.IO) // 发射在 IO 线程
.collect { println(it) } // 收集在调用线程
6. StateFlow 与 SharedFlow
- StateFlow:类似
LiveData,保留最新状态值,适合 UI 状态管理。val stateFlow = MutableStateFlow(0) stateFlow.collect { println(it) } - SharedFlow:支持多订阅者的热流,可配置重放(replay)策略。
7. 实战场景
场景 1:网络请求分页
fun fetchPages(pageSize: Int): Flow<List<Item>> = flow {
var page = 0
while (true) {
val items = api.fetchPage(page, pageSize)
emit(items)
if (items.size < pageSize) break
page++
}
}
场景 2:数据库变化监听
fun observeUserUpdates(userId: String): Flow<User> =
database.userDao().observeUserById(userId)
总结
Flow 是 Kotlin 协程生态中处理异步数据流的强大工具,通过组合操作符和协程特性,可以简洁地实现复杂的数据流逻辑。结合 StateFlow 和 SharedFlow,还能轻松应对 UI 状态管理和事件分发场景。
