自定义AsyncStream实现
AsyncStream是Swift提供的一个强大工具,用于创建自定义的AsyncSequence实现。相比手动实现AsyncSequence协议,它通过闭包和continuation简化了异步数据流的生成过程。配合for await,AsyncStream可以轻松处理动态数据,如实时事件或外部输入。本节将介绍AsyncStream的基本用法、实现原理,并通过一个聊天应用的案例展示其在实际开发中的应用,帮助你掌握自定义异步序列的技巧。
AsyncStream基础
AsyncStream是一个结构体,符合AsyncSequence协议,允许开发者通过闭包动态生成元素。它有两种主要构造方式:
- 基本构造:提供元素类型和生成逻辑。
- 缓冲构造:控制元素生成与消费的缓冲策略。
基本用法
最简单的AsyncStream构造:
let stream = AsyncStream<Int> { continuation in
Task {
for i in 1...5 {
try? await Task.sleep(nanoseconds: 500_000_000) // 0.5秒间隔
continuation.yield(i)
}
continuation.finish()
}
}
Task {
for await number in stream {
print("收到:\(number)")
}
print("流结束")
}
输出(每0.5秒一行):
收到:1
收到:2
收到:3
收到:4
收到:5
流结束
- continuation.yield(_😃:推送一个元素到流中。
- continuation.finish():结束流,类似迭代器返回
nil。
带错误支持
AsyncStream支持抛出错误:
let errorStream = AsyncStream<String> { continuation in
Task {
continuation.yield("消息1")
try? await Task.sleep(nanoseconds: 1_000_000_000)
continuation.yield("消息2")
throw StreamError.failed
}
}
enum StreamError: Error {
case failed
}
Task {
do {
for try await message in errorStream {
print(message)
}
} catch {
print("流错误:\(error)")
}
}
输出:
消息1
消息2
流错误:failed
- 抛出错误后,流自动终止。
实现原理
AsyncStream内部基于continuation机制:
- Continuation对象:管理流的生命周期,负责推送元素、抛出错误或结束。
- 任务协作:生成逻辑运行于独立
Task,与消费逻辑解耦。 - 缓冲控制:默认无缓冲(unbuffered),新元素等待消费后生成;可通过
bufferingPolicy调整。
例如,带缓冲的构造:
let bufferedStream = AsyncStream(Int.self, bufferingPolicy: .bufferingNewest(2)) { continuation in
Task {
for i in 1...10 {
continuation.yield(i)
try? await Task.sleep(nanoseconds: 200_000_000)
}
continuation.finish()
}
}
.bufferingNewest(2):保留最新2个元素,丢弃旧的。- 适合生产快于消费的场景。
实战案例:聊天消息流
假设我们要实现一个聊天应用,模拟从服务器接收实时消息。以下是完整实现:
1. 定义消息模型
struct ChatMessage: Codable {
let id: String
let text: String
let timestamp: Date
}
2. 创建消息流
模拟WebSocket推送:
func chatMessageStream() -> AsyncStream<ChatMessage> {
AsyncStream { continuation in
Task {
// 模拟服务器推送
let messages = [
ChatMessage(id: "1", text: "你好!", timestamp: Date()),
ChatMessage(id: "2", text: "在吗?", timestamp: Date(timeIntervalSinceNow: 1)),
ChatMessage(id: "3", text: "收到!", timestamp: Date(timeIntervalSinceNow: 2))
]
for message in messages {
try? await Task.sleep(nanoseconds: 1_000_000_000) // 模拟1秒延迟
continuation.yield(message)
}
// 模拟网络中断
if Bool.random() {
continuation.finish()
print("服务器断开")
} else {
try? await Task.sleep(nanoseconds: 2_000_000_000)
continuation.yield(ChatMessage(id: "4", text: "再见", timestamp: Date()))
continuation.finish()
}
}
// 可选:监听取消
continuation.onTermination = { @Sendable reason in
print("流终止:\(reason)") // .cancelled 或 .finished
}
}
}
yield推送消息。onTermination处理流结束或取消。
3. 集成到UI
在视图控制器中消费流:
class ChatViewController: UIViewController {
@IBOutlet weak var tableView: UITableView!
private var messages: [ChatMessage] = []
override func viewDidLoad() {
super.viewDidLoad()
startListening()
}
private func startListening() {
Task { @MainActor in
for await message in chatMessageStream() {
messages.append(message)
tableView.insertRows(at: [IndexPath(row: messages.count - 1, section: 0)], with: .automatic)
}
print("消息流结束")
}
}
}
// UITableViewDataSource
extension ChatViewController: UITableViewDataSource {
func tableView(_ tableView: UITableView, numberOfRowsInSection section: Int) -> Int {
messages.count
}
func tableView(_ tableView: UITableView, cellForRowAt indexPath: IndexPath) -> UITableViewCell {
let cell = tableView.dequeueReusableCell(withIdentifier: "ChatCell", for: indexPath)
let message = messages[indexPath.row]
cell.textLabel?.text = message.text
cell.detailTextLabel?.text = "\(message.timestamp)"
return cell
}
}
@MainActor确保UI更新在主线程。- 实时插入新消息到表格。
运行结果
- 每秒收到一条消息,表格动态更新。
- 随机模拟断开,打印“服务器断开”并结束。
高级用法
取消支持
检查Task.isCancelled处理外部取消:if Task.isCancelled { continuation.finish() return }外部输入
从外部事件(如通知)推送数据:AsyncStream { continuation in NotificationCenter.default.addObserver(forName: .newMessage, object: nil, queue: nil) { note in if let message = note.userInfo?["message"] as? ChatMessage { continuation.yield(message) } } }缓冲策略
使用.bufferingOldest(10)保留10个最早元素,适合日志场景。
小结
AsyncStream通过闭包和continuation提供了一种简便的方式来创建自定义异步序列。本节通过聊天消息流的案例,展示了其构建和应用过程,强调了实时数据处理的优雅性。掌握AsyncStream,你将能灵活应对动态数据需求。本章回顾了异步序列的三大支柱,下一章将进入任务管理与并发控制的领域,进一步提升你的并发编程能力。
内容说明
- 结构:从基础到原理,再到实战案例和高级用法,最后总结。
- 代码:包含简单示例和完整聊天案例,突出实用性。
- 语气:实践性且深入,适合技术书籍的收尾章节。
- 衔接:承接前两节(
AsyncSequence和for await),预告后续(任务管理)。
