feat(AI): 推理闸门双优先级 — 前台插队、后台按 token 让位;暴露统计与后端标签
Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
@@ -15,6 +15,13 @@ enum AIRuntimeError: Error, LocalizedError {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// 推理优先级。interactive = 用户正在屏幕前等(识别/问答/自检);
|
||||||
|
/// background = 预生成(报告摘要等),排队让行、解码中可被协作式抢占。
|
||||||
|
nonisolated enum InferencePriority: Sendable, Equatable {
|
||||||
|
case interactive
|
||||||
|
case background
|
||||||
|
}
|
||||||
|
|
||||||
actor AIRuntime {
|
actor AIRuntime {
|
||||||
static let shared = AIRuntime()
|
static let shared = AIRuntime()
|
||||||
|
|
||||||
@@ -29,6 +36,21 @@ actor AIRuntime {
|
|||||||
private(set) var vlStatus: Status = .notReady
|
private(set) var vlStatus: Status = .notReady
|
||||||
private(set) var lastDecodeRate: Double = 0
|
private(set) var lastDecodeRate: Double = 0
|
||||||
|
|
||||||
|
/// 末次文本生成的性能统计(性能自检页消费;两后端归一)。
|
||||||
|
private(set) var lastGenerateStats: GenerateStats?
|
||||||
|
|
||||||
|
/// 当前实际生效的后端标签(性能自检 / PPT 截图用)。
|
||||||
|
var activeBackendLabel: String {
|
||||||
|
if InferenceEngine.current == .mnn, mnnStatus == .ready {
|
||||||
|
return InferenceEngine.cpuSupportsSME2 ? "MNN · SME2" : "MNN · NEON"
|
||||||
|
}
|
||||||
|
#if targetEnvironment(simulator)
|
||||||
|
return "MLX · CPU(模拟器)"
|
||||||
|
#else
|
||||||
|
return "MLX · GPU"
|
||||||
|
#endif
|
||||||
|
}
|
||||||
|
|
||||||
private var llmSession: LLMSession?
|
private var llmSession: LLMSession?
|
||||||
private var vlSession: VLSession?
|
private var vlSession: VLSession?
|
||||||
|
|
||||||
@@ -52,30 +74,56 @@ actor AIRuntime {
|
|||||||
// 这里用 actor 内信号量(count = 1):所有「会占显存的重活」(解码 + 模型加载)
|
// 这里用 actor 内信号量(count = 1):所有「会占显存的重活」(解码 + 模型加载)
|
||||||
// 进入前先 await acquireGate(),结束后 releaseGate()。actor 串行执行保证
|
// 进入前先 await acquireGate(),结束后 releaseGate()。actor 串行执行保证
|
||||||
// gateBusy / gateWaiters 的读写天然无并发。
|
// gateBusy / gateWaiters 的读写天然无并发。
|
||||||
|
private struct GateWaiter {
|
||||||
|
let priority: InferencePriority
|
||||||
|
let cont: CheckedContinuation<Void, Never>
|
||||||
|
}
|
||||||
private var gateBusy = false
|
private var gateBusy = false
|
||||||
private var gateWaiters: [CheckedContinuation<Void, Never>] = []
|
private var gateHolderPriority: InferencePriority = .interactive
|
||||||
|
private var preemptRequested = false
|
||||||
|
private var gateWaiters: [GateWaiter] = []
|
||||||
|
|
||||||
private func acquireGate() async {
|
/// interactive 排到所有 background 等待者之前;同优先级保持 FIFO。纯函数,单测覆盖。
|
||||||
|
nonisolated static func gateInsertionIndex(of priority: InferencePriority,
|
||||||
|
in waiting: [InferencePriority]) -> Int {
|
||||||
|
guard priority == .interactive else { return waiting.count }
|
||||||
|
return waiting.firstIndex(of: .background) ?? waiting.count
|
||||||
|
}
|
||||||
|
|
||||||
|
private func acquireGate(_ priority: InferencePriority = .interactive) async {
|
||||||
if !gateBusy {
|
if !gateBusy {
|
||||||
gateBusy = true
|
gateBusy = true
|
||||||
|
gateHolderPriority = priority
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
// 前台请求撞上后台持有者:请其让位 —— 后台解码循环在下一个 token 抛 CancellationError。
|
||||||
|
if priority == .interactive, gateHolderPriority == .background {
|
||||||
|
preemptRequested = true
|
||||||
|
}
|
||||||
await withCheckedContinuation { (cont: CheckedContinuation<Void, Never>) in
|
await withCheckedContinuation { (cont: CheckedContinuation<Void, Never>) in
|
||||||
gateWaiters.append(cont)
|
let idx = Self.gateInsertionIndex(of: priority, in: gateWaiters.map(\.priority))
|
||||||
|
gateWaiters.insert(GateWaiter(priority: priority, cont: cont), at: idx)
|
||||||
}
|
}
|
||||||
// 被 releaseGate 唤醒时即已持有闸门(gateBusy 保持 true)。
|
// 被 releaseGate 唤醒时即已持有闸门(gateBusy 保持 true)。
|
||||||
}
|
}
|
||||||
|
|
||||||
private func releaseGate() {
|
private func releaseGate() {
|
||||||
|
preemptRequested = false
|
||||||
if gateWaiters.isEmpty {
|
if gateWaiters.isEmpty {
|
||||||
gateBusy = false
|
gateBusy = false
|
||||||
} else {
|
} else {
|
||||||
// 把闸门直接交给队首等待者,gateBusy 维持 true,不留空窗。
|
// 把闸门直接交给队首等待者,gateBusy 维持 true,不留空窗。
|
||||||
let next = gateWaiters.removeFirst()
|
let next = gateWaiters.removeFirst()
|
||||||
next.resume()
|
gateHolderPriority = next.priority
|
||||||
|
next.cont.resume()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// 后台持有者每收到一个 token 查一次:前台在排队就让位。
|
||||||
|
private func shouldPreempt(_ priority: InferencePriority) -> Bool {
|
||||||
|
priority == .background && preemptRequested
|
||||||
|
}
|
||||||
|
|
||||||
private init() {}
|
private init() {}
|
||||||
|
|
||||||
/// App 启动时调用一次:给 MLX 的 GPU 缓冲池设上限,避免 reuse cache 在大模型常驻之上
|
/// App 启动时调用一次:给 MLX 的 GPU 缓冲池设上限,避免 reuse cache 在大模型常驻之上
|
||||||
@@ -180,9 +228,12 @@ actor AIRuntime {
|
|||||||
|
|
||||||
/// 流式生成。调用前应先 await prepare()。
|
/// 流式生成。调用前应先 await prepare()。
|
||||||
/// 注意:返回流是同步创建的,但跨 actor 调用 LLMSession 需要 await。
|
/// 注意:返回流是同步创建的,但跨 actor 调用 LLMSession 需要 await。
|
||||||
func generate(prompt: String, maxTokens: Int = 256) -> AsyncThrowingStream<TokenChunk, Error> {
|
/// priority = .background 时排队让行、解码中可被前台请求按 token 抢占(CancellationError 透传)。
|
||||||
|
func generate(prompt: String,
|
||||||
|
maxTokens: Int = 256,
|
||||||
|
priority: InferencePriority = .interactive) -> AsyncThrowingStream<TokenChunk, Error> {
|
||||||
if InferenceEngine.current == .mnn, mnnStatus == .ready {
|
if InferenceEngine.current == .mnn, mnnStatus == .ready {
|
||||||
return mnnGenerate(prompt: prompt, maxTokens: maxTokens)
|
return mnnGenerate(prompt: prompt, maxTokens: maxTokens, priority: priority)
|
||||||
}
|
}
|
||||||
// 在 actor 隔离上下文中捕获快照,Task 内不再访问 self.status / self.llmSession
|
// 在 actor 隔离上下文中捕获快照,Task 内不再访问 self.status / self.llmSession
|
||||||
let snapshotStatus = status
|
let snapshotStatus = status
|
||||||
@@ -195,7 +246,7 @@ actor AIRuntime {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
// 进闸门:保证本次 LLM 解码与任何 VL 解码 / 模型加载串行,绝不并发占显存。
|
// 进闸门:保证本次 LLM 解码与任何 VL 解码 / 模型加载串行,绝不并发占显存。
|
||||||
await self.acquireGate()
|
await self.acquireGate(priority)
|
||||||
do {
|
do {
|
||||||
// session.generate 跨 actor 边界,需要 await
|
// session.generate 跨 actor 边界,需要 await
|
||||||
let stream = await session.generate(prompt: prompt, maxTokens: maxTokens)
|
let stream = await session.generate(prompt: prompt, maxTokens: maxTokens)
|
||||||
@@ -203,12 +254,18 @@ actor AIRuntime {
|
|||||||
// 消费者(UI)提前关闭/取消时,下面的 checkCancellation 让本 Task 尽快退出,
|
// 消费者(UI)提前关闭/取消时,下面的 checkCancellation 让本 Task 尽快退出,
|
||||||
// 连带丢弃 session 流并触发其 onTermination,停止底层 MLX 解码,不空耗 GPU。
|
// 连带丢弃 session 流并触发其 onTermination,停止底层 MLX 解码,不空耗 GPU。
|
||||||
try Task.checkCancellation()
|
try Task.checkCancellation()
|
||||||
|
// 后台任务让位:前台请求在排队时,下一个 token 处主动退出。
|
||||||
|
if self.shouldPreempt(priority) { throw CancellationError() }
|
||||||
// Task 闭包在 generate() 内启动,继承 AIRuntime 的 actor 隔离;
|
// Task 闭包在 generate() 内启动,继承 AIRuntime 的 actor 隔离;
|
||||||
// 调用同 actor 的 recordRate 不需要 await
|
// 调用同 actor 的 recordRate 不需要 await
|
||||||
self.recordRate(chunk.decodeRate)
|
self.recordRate(chunk.decodeRate)
|
||||||
continuation.yield(chunk)
|
continuation.yield(chunk)
|
||||||
}
|
}
|
||||||
|
self.lastGenerateStats = await session.lastStats
|
||||||
continuation.finish()
|
continuation.finish()
|
||||||
|
} catch is CancellationError {
|
||||||
|
// 取消/抢占以 CancellationError 透传,调用方据此区分「让位」与「真失败」。
|
||||||
|
continuation.finish(throwing: CancellationError())
|
||||||
} catch {
|
} catch {
|
||||||
continuation.finish(throwing: AIRuntimeError.inferenceFailed("\(error)"))
|
continuation.finish(throwing: AIRuntimeError.inferenceFailed("\(error)"))
|
||||||
}
|
}
|
||||||
@@ -222,7 +279,9 @@ actor AIRuntime {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// MNN(CPU/SME2)文本流式生成。结构与 MLX 分支一致:进闸门、串行解码、记录速率。
|
/// MNN(CPU/SME2)文本流式生成。结构与 MLX 分支一致:进闸门、串行解码、记录速率。
|
||||||
private func mnnGenerate(prompt: String, maxTokens: Int) -> AsyncThrowingStream<TokenChunk, Error> {
|
private func mnnGenerate(prompt: String,
|
||||||
|
maxTokens: Int,
|
||||||
|
priority: InferencePriority) -> AsyncThrowingStream<TokenChunk, Error> {
|
||||||
let ready = (mnnStatus == .ready)
|
let ready = (mnnStatus == .ready)
|
||||||
return AsyncThrowingStream { continuation in
|
return AsyncThrowingStream { continuation in
|
||||||
let task = Task {
|
let task = Task {
|
||||||
@@ -230,15 +289,21 @@ actor AIRuntime {
|
|||||||
continuation.finish(throwing: AIRuntimeError.notReady)
|
continuation.finish(throwing: AIRuntimeError.notReady)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
await self.acquireGate()
|
await self.acquireGate(priority)
|
||||||
do {
|
do {
|
||||||
let stream = await self.mnn.generate(prompt: prompt, maxTokens: maxTokens)
|
let stream = await self.mnn.generate(prompt: prompt, maxTokens: maxTokens)
|
||||||
for try await chunk in stream {
|
for try await chunk in stream {
|
||||||
try Task.checkCancellation()
|
try Task.checkCancellation()
|
||||||
|
// 后台任务让位:前台请求在排队时,下一个 token 处主动退出
|
||||||
|
//(流终止触发 MNNBackend.onTermination → bridge.cancel())。
|
||||||
|
if self.shouldPreempt(priority) { throw CancellationError() }
|
||||||
self.recordRate(chunk.decodeRate)
|
self.recordRate(chunk.decodeRate)
|
||||||
continuation.yield(chunk)
|
continuation.yield(chunk)
|
||||||
}
|
}
|
||||||
|
self.lastGenerateStats = await self.mnn.lastStats
|
||||||
continuation.finish()
|
continuation.finish()
|
||||||
|
} catch is CancellationError {
|
||||||
|
continuation.finish(throwing: CancellationError())
|
||||||
} catch {
|
} catch {
|
||||||
continuation.finish(throwing: AIRuntimeError.inferenceFailed("\(error)"))
|
continuation.finish(throwing: AIRuntimeError.inferenceFailed("\(error)"))
|
||||||
}
|
}
|
||||||
|
|||||||
28
康康Tests/InferencePriorityTests.swift
Normal file
28
康康Tests/InferencePriorityTests.swift
Normal file
@@ -0,0 +1,28 @@
|
|||||||
|
import Testing
|
||||||
|
@testable import 康康
|
||||||
|
|
||||||
|
struct InferencePriorityTests {
|
||||||
|
|
||||||
|
@Test func interactiveJumpsAheadOfBackground() {
|
||||||
|
let idx = AIRuntime.gateInsertionIndex(of: .interactive,
|
||||||
|
in: [.interactive, .background, .background])
|
||||||
|
#expect(idx == 1)
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test func interactiveKeepsFIFOAmongInteractive() {
|
||||||
|
let idx = AIRuntime.gateInsertionIndex(of: .interactive,
|
||||||
|
in: [.interactive, .interactive])
|
||||||
|
#expect(idx == 2)
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test func backgroundAlwaysAppends() {
|
||||||
|
let idx = AIRuntime.gateInsertionIndex(of: .background,
|
||||||
|
in: [.interactive, .background])
|
||||||
|
#expect(idx == 2)
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test func emptyQueueInsertsAtZero() {
|
||||||
|
#expect(AIRuntime.gateInsertionIndex(of: .interactive, in: []) == 0)
|
||||||
|
#expect(AIRuntime.gateInsertionIndex(of: .background, in: []) == 0)
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user