协程通道(Channel)
1. 什么是协程通道?
协程通道(Channel)是 Kotlin 协程库提供的一种用于协程间通信的机制,类似于阻塞队列(BlockingQueue),但完全基于非阻塞的挂起操作实现。它允许在不同协程之间安全地发送和接收数据,是协程并发编程的核心组件之一。
核心特点:
- 非阻塞:通过
send()和receive()挂起函数实现异步通信。 - 多生产者-多消费者:支持多个协程同时发送或接收数据。
- 类型安全:通道是泛型的(如
Channel<Int>),确保数据类型一致性。
2. 通道的基本使用
创建通道
val channel = Channel<String>() // 创建一个 String 类型的通道
发送与接收数据
// 生产者协程
launch {
channel.send("Hello")
channel.send("Kotlin")
channel.close() // 关闭通道
}
// 消费者协程
launch {
for (msg in channel) { // 迭代接收数据,直到通道关闭
println(msg)
}
}
通道的关闭
- 调用
close()后,通道不再接受新数据,但会处理已发送的数据。 - 消费者可以通过
channel.isClosedForReceive检查通道状态。
3. 通道的类型
Kotlin 提供了多种通道实现,适用于不同场景:
| 类型 | 描述 |
|---|---|
| Rendezvous | 默认类型,发送和接收必须同时就绪(无缓冲区)。 |
| Buffered | 带固定容量缓冲区的通道(如 Channel(10))。 |
| Conflated | 只保留最新值,新数据会覆盖未处理的旧数据(Channel.CONFLATED)。 |
| Unlimited | 无限缓冲区的通道(Channel.UNLIMITED),可能导致内存问题。 |
示例:缓冲通道
val bufferedChannel = Channel<Int>(capacity = 5) // 容量为5的缓冲通道
4. 通道的高级操作
4.1 produce 构建器
简化生产者协程的创建:
val numberChannel = produce {
for (x in 1..5) send(x * x)
}
// 消费者
numberChannel.consumeEach { println(it) }
4.2 select 表达式
监听多个通道的发送或接收操作:
select<Unit> {
channel1.onReceive { println("From channel1: $it") }
channel2.onReceive { println("From channel2: $it") }
}
4.3 异常处理
通过 try-catch 捕获通道操作中的异常:
try {
channel.receive()
} catch (e: ClosedReceiveChannelException) {
println("Channel closed!")
}
5. 实战场景
场景1:任务分发
val taskChannel = Channel<Task>()
// 生产者
launch {
tasks.forEach { taskChannel.send(it) }
taskChannel.close()
}
// 消费者(多个工作协程)
repeat(4) {
launch {
for (task in taskChannel) processTask(task)
}
}
场景2:事件总线
val eventChannel = Channel<Event>(Channel.UNLIMITED)
// 订阅事件
launch {
eventChannel.consumeEach { event ->
handleEvent(event)
}
}
// 发布事件
fun postEvent(event: Event) {
launch { eventChannel.send(event) }
}
6. 注意事项
- 资源释放:确保在不再需要时关闭通道,避免内存泄漏。
- 背压问题:缓冲通道需合理设置容量,防止生产者速度远超消费者。
- 协程取消:通道操作会响应协程的取消状态。
