baa-conductor

git clone 

commit
3f5e0e4
parent
cecd1c1
author
im_wower
date
2026-03-30 13:55:53 +0800 CST
discuss: Claude 代码审核 — 现有基础设施盘点 + 新建范围收窄为三件事
1 files changed,  +141, -0
M plans/discuss/DISCUSS-TIMED-JOBS-RENEWAL-REQUIREMENTS.md
+141, -0
  1@@ -183,3 +183,144 @@ API 代理受 Cloudflare 27-30s 超时限制。续命消息发送如果卡住,
  2 2. **讨论点 1**(对话身份映射)— 直接影响表结构设计
  3 3. **讨论点 3**(状态精简)— 影响首版实现复杂度
  4 4. 其余点可以在实现过程中逐步确认
  5+
  6+
  7+## Claude 代码审核回复 — 2026-03-30
  8+
  9+> 基于对以下代码的完整阅读:
 10+> - `ops/sql/schema.sql` — D1 控制面 schema
 11+> - `state/artifact.db` — 本地 SQLite schema(messages / executions / sessions / d1_sync_queue)
 12+> - `apps/conductor-daemon/src/instructions/dedupe.ts` — 指令级去重
 13+> - `apps/conductor-daemon/src/instructions/store.ts` — 持久化去重 + journal
 14+> - `apps/conductor-daemon/src/instructions/ingest.ts` — 消息摄入全流程
 15+> - `packages/d1-client/src/sync-queue.ts` — D1 同步队列
 16+> - `packages/d1-client/src/sync-worker.ts` — D1 同步 worker(轮询 + 指数退避 + 批量推送)
 17+> - `plugins/baa-firefox/final-message.js` — 三家 AI SSE 拦截(ChatGPT / Claude / Gemini)
 18+> - 当前数据:messages 31 条(chatgpt 4, gemini 27),executions 1 条,sync_queue pending 0
 19+
 20+### 现有基础设施盘点
 21+
 22+以下能力已经在代码中实现,续命模块可以直接复用,不需要重建:
 23+
 24+**存储层:** `artifact.db`(SQLite)已有 messages / executions / sessions 三张表 + `d1_sync_queue` 同步队列。`D1SyncWorker` 以可配置间隔轮询,批量推送到 Cloudflare D1,支持指数退避(`baseDelayMs × 2^attempts`,上限 `maxDelayMs`),超过 `maxAttempts` 标记 failed。续命任务表直接加到 `artifact.db`,同步到 D1 复用 `SyncQueue.enqueueSyncRecord()`。
 25+
 26+**去重层:** 三层去重已就位。插件侧 `recentRelayKeys`(内存,最近 20 条,key = `platform|conversation_id|assistant_message_id|raw_text`)。消息级 `baa_message_dedupes` 表(`dedupe_key` = `sha256(platform + assistant_message_id + raw_text)`,UNIQUE 约束)。指令级 `baa_instruction_dedupes` 表(`dedupe_key` = `sha256(platform + conversation_id + assistant_message_id + target + tool + params)`)。续命任务去重以 `message_dedupe_key` 为锚点即可。
 27+
 28+**插件拦截层:** `final-message.js`(28KB)已完整实现三家 AI 的 SSE 结束消息拦截。ChatGPT 支持候选人评分 + 流式合并 + terminal status 判断。Claude 支持 completion event 累积 + URL 提取 conversation_id。Gemini 支持 `)]}'` 前缀处理 + 深度遍历 + prompt 排除。SSE 流式和完整响应两种模式均已覆盖。当前已有 31 条消息入库,说明链路在跑。
 29+
 30+**消息摄入层:** `BaaLiveInstructionIngest.ingestAssistantFinalMessage()` 已完成从 WS 接收 → 去重判断 → 持久化到 artifact.db → 指令提取 → 执行 → journal 记录的完整流程。
 31+
 32+### 对讨论稿五个澄清点的逐条确认
 33+
 34+**1. SQLite + D1 同步:已就绪,无需额外设计。**
 35+
 36+续命任务表加到 `artifact.db`,`D1SyncWorker` 的 `SYNC_COLUMN_WHITELIST` 里加一个 `renewal_jobs` 条目即可自动同步到 D1。
 37+
 38+**2. 去重:现有机制覆盖了需求。**
 39+
 40+George 提出的"平台类型 + 平台消息ID + 内容 hash"正是 `buildBaaLiveInstructionMessageDedupeKey` 的实现逻辑。续命任务生成时,用 `message_dedupe_key` 查重,同一条消息不重复生成续命任务。
 41+
 42+**3. 上下文摘要 + 公开链接:需要新建,但基础字段已有。**
 43+
 44+`messages` 表已有 `summary`(当前大部分为 null)、`static_path`、`page_url` 字段。续命 payload 建议:
 45+
 46+- 摘要:`raw_text` 前 200 字截断 + `...`(首版),后续可改为 AI 摘要
 47+- 链接:拼 `https://conductor.makefile.so/artifacts/messages/{message_id}` 或直接用 `page_url`(原始对话页面 URL)
 48+- 格式:固定模板,如 `"[续命] 上下文摘要:{summary}\n对话链接:{link}"`
 49+
 50+**4. 失败 + 通道暂停:退避机制可复用,通道状态需新建。**
 51+
 52+`D1SyncWorker` 的重试模式(attempts 计数 + 指数退避 + maxAttempts 后 failed)可以直接用在续命任务上。新增需求是**通道级状态管理**:
 53+
 54+- 新建 `renewal_channels` 表或在续命任务表上加 `channel_status` 字段
 55+- 状态:`active` / `paused` / `disabled`
 56+- 触发:续命任务 failed → 该 `platform + conversation_id` 通道自动 paused
 57+- 恢复:手动调用 conductor API(如 `POST /v1/renewal/channels/{id}/resume`)
 58+- 日志:扩展 `baa_execution_journal`,加 `summary_kind = 'renewal'`
 59+
 60+**5. 插件侧:已完成,不是前置依赖。**
 61+
 62+讨论稿第一版验收目标的第 1 步"插件拦截 SSE 结束消息"已在生产中运行。31 条消息已入库证明链路通畅。
 63+
 64+### 真正需要新建的三件事
 65+
 66+基于代码审核,续命模块的新建工作范围收窄为:
 67+
 68+**A. `renewal_jobs` 表** — 续命任务持久化
 69+
 70+```sql
 71+CREATE TABLE IF NOT EXISTS renewal_jobs (
 72+  job_id          TEXT PRIMARY KEY,
 73+  message_id      TEXT NOT NULL,
 74+  platform        TEXT NOT NULL,
 75+  conversation_id TEXT,
 76+  channel_key     TEXT NOT NULL,  -- "platform|conversation_id" 复合键
 77+  payload         TEXT NOT NULL,  -- 续命消息内容(摘要 + 链接)
 78+  status          TEXT NOT NULL DEFAULT 'pending',  -- pending / running / done / failed
 79+  attempt_count   INTEGER NOT NULL DEFAULT 0,
 80+  max_attempts    INTEGER NOT NULL DEFAULT 3,
 81+  last_error      TEXT,
 82+  created_at      INTEGER NOT NULL,
 83+  updated_at      INTEGER NOT NULL,
 84+  started_at      INTEGER,
 85+  finished_at     INTEGER,
 86+  FOREIGN KEY (message_id) REFERENCES messages(id)
 87+);
 88+CREATE INDEX IF NOT EXISTS idx_renewal_jobs_status ON renewal_jobs(status, created_at);
 89+CREATE INDEX IF NOT EXISTS idx_renewal_jobs_channel ON renewal_jobs(channel_key, status);
 90+```
 91+
 92+同步到 D1:在 `SYNC_COLUMN_WHITELIST` 加 `renewal_jobs` 条目。
 93+
 94+**B. 续命判断函数 `shouldRenew(message): boolean`**
 95+
 96+```typescript
 97+// 首版最简规则,独立函数方便后续加策略
 98+function shouldRenew(message: IngestedMessage): boolean {
 99+  // 1. 检查通道是否 paused/disabled
100+  // 2. 检查是否已有 pending/running 的续命任务(防重复)
101+  // 3. 首版:所有 final-message 都续命
102+  return true;
103+}
104+```
105+
106+后续扩展点:冷却期、对话级开关、平台级开关、内容过滤。
107+
108+摘要生成函数:
109+
110+```typescript
111+function buildRenewalPayload(message: ArtifactMessage): string {
112+  const summary = message.raw_text.length > 200
113+    ? message.raw_text.slice(0, 200) + '...'
114+    : message.raw_text;
115+  const link = message.page_url || `https://conductor.makefile.so/artifacts/messages/${message.id}`;
116+  return `[续命] 摘要:${summary}\n链接:${link}`;
117+}
118+```
119+
120+**C. 通道状态管理**
121+
122+可以不单独建表,直接在 `renewal_jobs` 的查询逻辑里实现:
123+
124+- 查某个 `channel_key` 最近 N 条任务是否全部 failed → 自动暂停
125+- 或者在 `system_state` 表里存 `renewal_channel:{channel_key}` → `{"status": "paused", "paused_at": ...}`
126+- 恢复端点:`POST /v1/renewal/channels/:channelKey/resume`
127+
128+### 对 Claude 上一轮六个讨论点的修正
129+
130+基于代码实际,修正之前的建议:
131+
132+- **讨论点 1(对话身份映射):** 不需要单独建 `local_conversations` 表。`messages` 表已有 `conversation_id`,续命任务用 `channel_key = platform|conversation_id` 做复合键即可。三家平台的 conversation_id 提取逻辑在 `final-message.js` 里已经完整实现。
133+- **讨论点 2(共享状态机):** `D1SyncWorker` 的 `attempts + status + exponential backoff` 就是现成的状态机骨架。`renewal_jobs` 直接复用同样模式。
134+- **讨论点 4(超时):** 续命发送是 conductor 内部调用浏览器 API 代理(走内网 `localhost` 或 `100.71.210.78:4317`),不经过 Cloudflare tunnel,无 27-30s 超时问题。但仍建议设 20s 内部超时。
135+- **讨论点 5(扫描周期):** 参考 `D1SyncWorker` 的 `DEFAULT_POLL_INTERVAL_MS`,续命 worker 用同样模式,首版 10s 间隔。
136+
137+### 建议的实现顺序
138+
139+1. 在 `artifact.db` schema 里加 `renewal_jobs` 表
140+2. 在 `SYNC_COLUMN_WHITELIST` 加白名单
141+3. 实现 `shouldRenew()` + `buildRenewalPayload()`
142+4. 实现 `RenewalWorker`(复用 `D1SyncWorker` 的 poll + backoff 模式)
143+5. 在 `ingestAssistantFinalMessage` 流程末尾挂钩:摄入成功 → shouldRenew → 写 renewal_jobs
144+6. 加通道暂停/恢复 API 端点
145+7. 端到端验证