From 8494e51823d8ac29d7c40f2fb6a4872493009629 Mon Sep 17 00:00:00 2001 From: link2026 Date: Wed, 10 Jun 2026 06:42:59 +0800 Subject: [PATCH] =?UTF-8?q?feat(AI):=20=E6=8E=A8=E7=90=86=E9=97=B8?= =?UTF-8?q?=E9=97=A8=E5=8F=8C=E4=BC=98=E5=85=88=E7=BA=A7=20=E2=80=94=20?= =?UTF-8?q?=E5=89=8D=E5=8F=B0=E6=8F=92=E9=98=9F=E3=80=81=E5=90=8E=E5=8F=B0?= =?UTF-8?q?=E6=8C=89=20token=20=E8=AE=A9=E4=BD=8D;=E6=9A=B4=E9=9C=B2?= =?UTF-8?q?=E7=BB=9F=E8=AE=A1=E4=B8=8E=E5=90=8E=E7=AB=AF=E6=A0=87=E7=AD=BE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-Authored-By: Claude Fable 5 --- 康康/AI/AIRuntime.swift | 83 +++++++++++++++++++++++--- 康康Tests/InferencePriorityTests.swift | 28 +++++++++ 2 files changed, 102 insertions(+), 9 deletions(-) create mode 100644 康康Tests/InferencePriorityTests.swift diff --git a/康康/AI/AIRuntime.swift b/康康/AI/AIRuntime.swift index a8b127d..58ca338 100644 --- a/康康/AI/AIRuntime.swift +++ b/康康/AI/AIRuntime.swift @@ -15,6 +15,13 @@ enum AIRuntimeError: Error, LocalizedError { } } +/// 推理优先级。interactive = 用户正在屏幕前等(识别/问答/自检); +/// background = 预生成(报告摘要等),排队让行、解码中可被协作式抢占。 +nonisolated enum InferencePriority: Sendable, Equatable { + case interactive + case background +} + actor AIRuntime { static let shared = AIRuntime() @@ -29,6 +36,21 @@ actor AIRuntime { private(set) var vlStatus: Status = .notReady private(set) var lastDecodeRate: Double = 0 + /// 末次文本生成的性能统计(性能自检页消费;两后端归一)。 + private(set) var lastGenerateStats: GenerateStats? + + /// 当前实际生效的后端标签(性能自检 / PPT 截图用)。 + var activeBackendLabel: String { + if InferenceEngine.current == .mnn, mnnStatus == .ready { + return InferenceEngine.cpuSupportsSME2 ? "MNN · SME2" : "MNN · NEON" + } + #if targetEnvironment(simulator) + return "MLX · CPU(模拟器)" + #else + return "MLX · GPU" + #endif + } + private var llmSession: LLMSession? private var vlSession: VLSession? @@ -52,30 +74,56 @@ actor AIRuntime { // 这里用 actor 内信号量(count = 1):所有「会占显存的重活」(解码 + 模型加载) // 进入前先 await acquireGate(),结束后 releaseGate()。actor 串行执行保证 // gateBusy / gateWaiters 的读写天然无并发。 + private struct GateWaiter { + let priority: InferencePriority + let cont: CheckedContinuation + } private var gateBusy = false - private var gateWaiters: [CheckedContinuation] = [] + private var gateHolderPriority: InferencePriority = .interactive + private var preemptRequested = false + private var gateWaiters: [GateWaiter] = [] - private func acquireGate() async { + /// interactive 排到所有 background 等待者之前;同优先级保持 FIFO。纯函数,单测覆盖。 + nonisolated static func gateInsertionIndex(of priority: InferencePriority, + in waiting: [InferencePriority]) -> Int { + guard priority == .interactive else { return waiting.count } + return waiting.firstIndex(of: .background) ?? waiting.count + } + + private func acquireGate(_ priority: InferencePriority = .interactive) async { if !gateBusy { gateBusy = true + gateHolderPriority = priority return } + // 前台请求撞上后台持有者:请其让位 —— 后台解码循环在下一个 token 抛 CancellationError。 + if priority == .interactive, gateHolderPriority == .background { + preemptRequested = true + } await withCheckedContinuation { (cont: CheckedContinuation) in - gateWaiters.append(cont) + let idx = Self.gateInsertionIndex(of: priority, in: gateWaiters.map(\.priority)) + gateWaiters.insert(GateWaiter(priority: priority, cont: cont), at: idx) } // 被 releaseGate 唤醒时即已持有闸门(gateBusy 保持 true)。 } private func releaseGate() { + preemptRequested = false if gateWaiters.isEmpty { gateBusy = false } else { // 把闸门直接交给队首等待者,gateBusy 维持 true,不留空窗。 let next = gateWaiters.removeFirst() - next.resume() + gateHolderPriority = next.priority + next.cont.resume() } } + /// 后台持有者每收到一个 token 查一次:前台在排队就让位。 + private func shouldPreempt(_ priority: InferencePriority) -> Bool { + priority == .background && preemptRequested + } + private init() {} /// App 启动时调用一次:给 MLX 的 GPU 缓冲池设上限,避免 reuse cache 在大模型常驻之上 @@ -180,9 +228,12 @@ actor AIRuntime { /// 流式生成。调用前应先 await prepare()。 /// 注意:返回流是同步创建的,但跨 actor 调用 LLMSession 需要 await。 - func generate(prompt: String, maxTokens: Int = 256) -> AsyncThrowingStream { + /// priority = .background 时排队让行、解码中可被前台请求按 token 抢占(CancellationError 透传)。 + func generate(prompt: String, + maxTokens: Int = 256, + priority: InferencePriority = .interactive) -> AsyncThrowingStream { if InferenceEngine.current == .mnn, mnnStatus == .ready { - return mnnGenerate(prompt: prompt, maxTokens: maxTokens) + return mnnGenerate(prompt: prompt, maxTokens: maxTokens, priority: priority) } // 在 actor 隔离上下文中捕获快照,Task 内不再访问 self.status / self.llmSession let snapshotStatus = status @@ -195,7 +246,7 @@ actor AIRuntime { return } // 进闸门:保证本次 LLM 解码与任何 VL 解码 / 模型加载串行,绝不并发占显存。 - await self.acquireGate() + await self.acquireGate(priority) do { // session.generate 跨 actor 边界,需要 await let stream = await session.generate(prompt: prompt, maxTokens: maxTokens) @@ -203,12 +254,18 @@ actor AIRuntime { // 消费者(UI)提前关闭/取消时,下面的 checkCancellation 让本 Task 尽快退出, // 连带丢弃 session 流并触发其 onTermination,停止底层 MLX 解码,不空耗 GPU。 try Task.checkCancellation() + // 后台任务让位:前台请求在排队时,下一个 token 处主动退出。 + if self.shouldPreempt(priority) { throw CancellationError() } // Task 闭包在 generate() 内启动,继承 AIRuntime 的 actor 隔离; // 调用同 actor 的 recordRate 不需要 await self.recordRate(chunk.decodeRate) continuation.yield(chunk) } + self.lastGenerateStats = await session.lastStats continuation.finish() + } catch is CancellationError { + // 取消/抢占以 CancellationError 透传,调用方据此区分「让位」与「真失败」。 + continuation.finish(throwing: CancellationError()) } catch { continuation.finish(throwing: AIRuntimeError.inferenceFailed("\(error)")) } @@ -222,7 +279,9 @@ actor AIRuntime { } /// MNN(CPU/SME2)文本流式生成。结构与 MLX 分支一致:进闸门、串行解码、记录速率。 - private func mnnGenerate(prompt: String, maxTokens: Int) -> AsyncThrowingStream { + private func mnnGenerate(prompt: String, + maxTokens: Int, + priority: InferencePriority) -> AsyncThrowingStream { let ready = (mnnStatus == .ready) return AsyncThrowingStream { continuation in let task = Task { @@ -230,15 +289,21 @@ actor AIRuntime { continuation.finish(throwing: AIRuntimeError.notReady) return } - await self.acquireGate() + await self.acquireGate(priority) do { let stream = await self.mnn.generate(prompt: prompt, maxTokens: maxTokens) for try await chunk in stream { try Task.checkCancellation() + // 后台任务让位:前台请求在排队时,下一个 token 处主动退出 + //(流终止触发 MNNBackend.onTermination → bridge.cancel())。 + if self.shouldPreempt(priority) { throw CancellationError() } self.recordRate(chunk.decodeRate) continuation.yield(chunk) } + self.lastGenerateStats = await self.mnn.lastStats continuation.finish() + } catch is CancellationError { + continuation.finish(throwing: CancellationError()) } catch { continuation.finish(throwing: AIRuntimeError.inferenceFailed("\(error)")) } diff --git a/康康Tests/InferencePriorityTests.swift b/康康Tests/InferencePriorityTests.swift new file mode 100644 index 0000000..a0fa890 --- /dev/null +++ b/康康Tests/InferencePriorityTests.swift @@ -0,0 +1,28 @@ +import Testing +@testable import 康康 + +struct InferencePriorityTests { + + @Test func interactiveJumpsAheadOfBackground() { + let idx = AIRuntime.gateInsertionIndex(of: .interactive, + in: [.interactive, .background, .background]) + #expect(idx == 1) + } + + @Test func interactiveKeepsFIFOAmongInteractive() { + let idx = AIRuntime.gateInsertionIndex(of: .interactive, + in: [.interactive, .interactive]) + #expect(idx == 2) + } + + @Test func backgroundAlwaysAppends() { + let idx = AIRuntime.gateInsertionIndex(of: .background, + in: [.interactive, .background]) + #expect(idx == 2) + } + + @Test func emptyQueueInsertsAtZero() { + #expect(AIRuntime.gateInsertionIndex(of: .interactive, in: []) == 0) + #expect(AIRuntime.gateInsertionIndex(of: .background, in: []) == 0) + } +}