Tailwind CSSTailwind CSS
Home
  • Tailwind CSS 书籍目录
  • Vue 3 开发实战指南
  • React 和 Next.js 学习
  • TypeScript
  • React开发框架书籍大纲
  • Shadcn学习大纲
  • Swift 编程语言:从入门到进阶
  • SwiftUI 学习指南
  • 函数式编程大纲
  • Swift 异步编程语言
  • Swift 协议化编程
  • SwiftUI MVVM 开发模式
  • SwiftUI 图表开发书籍
  • SwiftData
  • ArkTS编程语言:从入门到精通
  • 仓颉编程语言:从入门到精通
  • 鸿蒙手机客户端开发实战
  • WPF书籍
  • C#开发书籍
learn
  • Java编程语言
  • Kotlin 编程入门与实战
  • /python/outline.html
  • AI Agent
  • MCP (Model Context Protocol) 应用指南
  • 深度学习
  • 深度学习
  • 强化学习: 理论与实践
  • 扩散模型书籍
  • Agentic AI for Everyone
langchain
Home
  • Tailwind CSS 书籍目录
  • Vue 3 开发实战指南
  • React 和 Next.js 学习
  • TypeScript
  • React开发框架书籍大纲
  • Shadcn学习大纲
  • Swift 编程语言:从入门到进阶
  • SwiftUI 学习指南
  • 函数式编程大纲
  • Swift 异步编程语言
  • Swift 协议化编程
  • SwiftUI MVVM 开发模式
  • SwiftUI 图表开发书籍
  • SwiftData
  • ArkTS编程语言:从入门到精通
  • 仓颉编程语言:从入门到精通
  • 鸿蒙手机客户端开发实战
  • WPF书籍
  • C#开发书籍
learn
  • Java编程语言
  • Kotlin 编程入门与实战
  • /python/outline.html
  • AI Agent
  • MCP (Model Context Protocol) 应用指南
  • 深度学习
  • 深度学习
  • 强化学习: 理论与实践
  • 扩散模型书籍
  • Agentic AI for Everyone
langchain
  • 自定义AsyncStream实现

自定义AsyncStream实现

AsyncStream是Swift提供的一个强大工具,用于创建自定义的AsyncSequence实现。相比手动实现AsyncSequence协议,它通过闭包和continuation简化了异步数据流的生成过程。配合for await,AsyncStream可以轻松处理动态数据,如实时事件或外部输入。本节将介绍AsyncStream的基本用法、实现原理,并通过一个聊天应用的案例展示其在实际开发中的应用,帮助你掌握自定义异步序列的技巧。

AsyncStream基础

AsyncStream是一个结构体,符合AsyncSequence协议,允许开发者通过闭包动态生成元素。它有两种主要构造方式:

  • 基本构造:提供元素类型和生成逻辑。
  • 缓冲构造:控制元素生成与消费的缓冲策略。

基本用法

最简单的AsyncStream构造:

let stream = AsyncStream<Int> { continuation in
    Task {
        for i in 1...5 {
            try? await Task.sleep(nanoseconds: 500_000_000) // 0.5秒间隔
            continuation.yield(i)
        }
        continuation.finish()
    }
}

Task {
    for await number in stream {
        print("收到:\(number)")
    }
    print("流结束")
}

输出(每0.5秒一行):

收到:1
收到:2
收到:3
收到:4
收到:5
流结束
  • continuation.yield(_😃:推送一个元素到流中。
  • continuation.finish():结束流,类似迭代器返回nil。

带错误支持

AsyncStream支持抛出错误:

let errorStream = AsyncStream<String> { continuation in
    Task {
        continuation.yield("消息1")
        try? await Task.sleep(nanoseconds: 1_000_000_000)
        continuation.yield("消息2")
        throw StreamError.failed
    }
}

enum StreamError: Error {
    case failed
}

Task {
    do {
        for try await message in errorStream {
            print(message)
        }
    } catch {
        print("流错误:\(error)")
    }
}

输出:

消息1
消息2
流错误:failed
  • 抛出错误后,流自动终止。

实现原理

AsyncStream内部基于continuation机制:

  • Continuation对象:管理流的生命周期,负责推送元素、抛出错误或结束。
  • 任务协作:生成逻辑运行于独立Task,与消费逻辑解耦。
  • 缓冲控制:默认无缓冲(unbuffered),新元素等待消费后生成;可通过bufferingPolicy调整。

例如,带缓冲的构造:

let bufferedStream = AsyncStream(Int.self, bufferingPolicy: .bufferingNewest(2)) { continuation in
    Task {
        for i in 1...10 {
            continuation.yield(i)
            try? await Task.sleep(nanoseconds: 200_000_000)
        }
        continuation.finish()
    }
}
  • .bufferingNewest(2):保留最新2个元素,丢弃旧的。
  • 适合生产快于消费的场景。

实战案例:聊天消息流

假设我们要实现一个聊天应用,模拟从服务器接收实时消息。以下是完整实现:

1. 定义消息模型

struct ChatMessage: Codable {
    let id: String
    let text: String
    let timestamp: Date
}

2. 创建消息流

模拟WebSocket推送:

func chatMessageStream() -> AsyncStream<ChatMessage> {
    AsyncStream { continuation in
        Task {
            // 模拟服务器推送
            let messages = [
                ChatMessage(id: "1", text: "你好!", timestamp: Date()),
                ChatMessage(id: "2", text: "在吗?", timestamp: Date(timeIntervalSinceNow: 1)),
                ChatMessage(id: "3", text: "收到!", timestamp: Date(timeIntervalSinceNow: 2))
            ]
            
            for message in messages {
                try? await Task.sleep(nanoseconds: 1_000_000_000) // 模拟1秒延迟
                continuation.yield(message)
            }
            
            // 模拟网络中断
            if Bool.random() {
                continuation.finish()
                print("服务器断开")
            } else {
                try? await Task.sleep(nanoseconds: 2_000_000_000)
                continuation.yield(ChatMessage(id: "4", text: "再见", timestamp: Date()))
                continuation.finish()
            }
        }
        
        // 可选:监听取消
        continuation.onTermination = { @Sendable reason in
            print("流终止:\(reason)") // .cancelled 或 .finished
        }
    }
}
  • yield推送消息。
  • onTermination处理流结束或取消。

3. 集成到UI

在视图控制器中消费流:

class ChatViewController: UIViewController {
    @IBOutlet weak var tableView: UITableView!
    private var messages: [ChatMessage] = []

    override func viewDidLoad() {
        super.viewDidLoad()
        startListening()
    }

    private func startListening() {
        Task { @MainActor in
            for await message in chatMessageStream() {
                messages.append(message)
                tableView.insertRows(at: [IndexPath(row: messages.count - 1, section: 0)], with: .automatic)
            }
            print("消息流结束")
        }
    }
}

// UITableViewDataSource
extension ChatViewController: UITableViewDataSource {
    func tableView(_ tableView: UITableView, numberOfRowsInSection section: Int) -> Int {
        messages.count
    }

    func tableView(_ tableView: UITableView, cellForRowAt indexPath: IndexPath) -> UITableViewCell {
        let cell = tableView.dequeueReusableCell(withIdentifier: "ChatCell", for: indexPath)
        let message = messages[indexPath.row]
        cell.textLabel?.text = message.text
        cell.detailTextLabel?.text = "\(message.timestamp)"
        return cell
    }
}
  • @MainActor确保UI更新在主线程。
  • 实时插入新消息到表格。

运行结果

  • 每秒收到一条消息,表格动态更新。
  • 随机模拟断开,打印“服务器断开”并结束。

高级用法

  1. 取消支持
    检查Task.isCancelled处理外部取消:

    if Task.isCancelled {
        continuation.finish()
        return
    }
    
  2. 外部输入
    从外部事件(如通知)推送数据:

    AsyncStream { continuation in
        NotificationCenter.default.addObserver(forName: .newMessage, object: nil, queue: nil) { note in
            if let message = note.userInfo?["message"] as? ChatMessage {
                continuation.yield(message)
            }
        }
    }
    
  3. 缓冲策略
    使用.bufferingOldest(10)保留10个最早元素,适合日志场景。

小结

AsyncStream通过闭包和continuation提供了一种简便的方式来创建自定义异步序列。本节通过聊天消息流的案例,展示了其构建和应用过程,强调了实时数据处理的优雅性。掌握AsyncStream,你将能灵活应对动态数据需求。本章回顾了异步序列的三大支柱,下一章将进入任务管理与并发控制的领域,进一步提升你的并发编程能力。


内容说明

  • 结构:从基础到原理,再到实战案例和高级用法,最后总结。
  • 代码:包含简单示例和完整聊天案例,突出实用性。
  • 语气:实践性且深入,适合技术书籍的收尾章节。
  • 衔接:承接前两节(AsyncSequence和for await),预告后续(任务管理)。
Last Updated:: 3/4/25, 10:21 AM