baa-conductor

git clone 

baa-conductor / tasks
im_wower  ·  2026-03-29

T-S042.md

  1# Task T-S042:D1 异步适配器与同步队列
  2
  3## 状态
  4
  5- 当前状态:`已完成`
  6- 规模预估:`M`
  7- 依赖任务:`T-S039`(需要表结构,但代码可并行开发)
  8- 建议执行者:`Codex`(新 package,参考旧版 D1Client 重写为 TypeScript async)
  9
 10## 直接给对话的提示词
 11
 12`/Users/george/code/baa-conductor/tasks/T-S042.md` 任务文档,完成开发任务。
 13
 14如需补背景,再读:
 15
 16- `/Users/george/code/baa-conductor/plans/ARTIFACT_STATIC_SERVICE.md`
 17- `/Users/george/code/baa-old-files/baa/baa-server/lib/d1-client.js`(旧版 D1 适配器参考)
 18
 19## 当前基线
 20
 21- 仓库:`/Users/george/code/baa-conductor`
 22- 分支基线:`main`
 23- 提交:`6ea34e2`
 24
 25## 分支与 worktree(强制)
 26
 27每个任务必须使用独立的分支和 worktree,禁止直接在 main 上修改,禁止多个任务共用同一个 worktree。
 28
 29- 分支名:`feat/d1-async-adapter`
 30- worktree 路径:`/Users/george/code/baa-conductor-d1-async-adapter`
 31
 32开工步骤:
 33
 341. `cd /Users/george/code/baa-conductor`
 352. `git worktree add ../baa-conductor-d1-async-adapter -b feat/d1-async-adapter main`
 363. `cd ../baa-conductor-d1-async-adapter`
 374. 在这个 worktree 目录里开发,不要回到主仓库目录
 38
 39完成后提交与推送(由执行者完成,不要合并):
 40
 411. 在 worktree 里提交所有变更(包括更新后的任务文档)
 422. `git push -u origin feat/d1-async-adapter`
 43
 44合并冲突处理:
 45
 461. 如果 `git merge` 报冲突,先 `git diff` 查看冲突文件
 472. 手动解决冲突后 `git add` 冲突文件
 483. `git merge --continue` 完成合并
 494. 不要用 `git merge --abort` 然后 force 覆盖
 50
 51## 目标
 52
 53实现 TypeScript async 版本的 Cloudflare D1 适配器,以及本地 SQLite → D1 的异步同步队列。新建 D1 数据库,不复用旧版。
 54
 55## 背景
 56
 57旧版 baa-server 用 curl 同步调 D1 API,conductor 是 TypeScript 需要 async 版本。D1 作为分布式备份和跨设备访问层,本地 SQLite 先写不阻塞,D1 后台异步推送。
 58
 59## 涉及仓库
 60
 61- `/Users/george/code/baa-conductor`
 62- `/Users/george/code/baa-old-files/baa/baa-server/lib/d1-client.js`(只读参考)
 63
 64## 范围
 65
 66- TypeScript async D1 客户端
 67- 同步队列表(本地 SQLite)
 68- 后台同步 worker
 69- 环境变量配置
 70
 71## 推荐实现边界
 72
 73建议新增:
 74
 75- `packages/d1-client/` — 新 package
 76  - `src/client.ts` — async D1 HTTP API 客户端
 77  - `src/sync-queue.ts` — 同步队列管理
 78  - `src/sync-worker.ts` — 后台同步 worker
 79  - `src/types.ts` — 类型定义
 80
 81## 允许修改的目录
 82
 83- `/Users/george/code/baa-conductor/packages/` (新建 d1-client)
 84- `/Users/george/code/baa-conductor/apps/conductor-daemon/src/index.ts` (启动 sync worker)
 85
 86## 尽量不要修改
 87
 88- `/Users/george/code/baa-conductor/packages/db/`
 89- `/Users/george/code/baa-conductor/packages/artifact-db/` (T-S039 的范围)
 90- `/Users/george/code/baa-conductor/plugins/`
 91- `/Users/george/code/baa-conductor/apps/conductor-daemon/src/instructions/`
 92
 93## 必须完成
 94
 95### 1. D1 async 客户端
 96
 97- 参考旧版 `/Users/george/code/baa-old-files/baa/baa-server/lib/d1-client.js`
 98- 使用 `fetch`(不用 curl)调 Cloudflare D1 REST API
 99- 端点:`https://api.cloudflare.com/client/v4/accounts/{accountId}/d1/database/{databaseId}/query`
100- 方法:`exec(sql)`, `prepare(sql).run(...params)`, `prepare(sql).get(...params)`, `prepare(sql).all(...params)`
101- 所有方法返回 Promise
102- 环境变量:`D1_DATABASE_ID`、`D1_ACCOUNT_ID`、`CLOUDFLARE_API_TOKEN`
103- D1 未配置时,客户端返回 null(不报错),由调用方判断
104
105### 2. 同步队列
106
107- 本地 SQLite 新增 `d1_sync_queue` 表:
108  ```sql
109  CREATE TABLE IF NOT EXISTS d1_sync_queue (
110    id INTEGER PRIMARY KEY AUTOINCREMENT,
111    table_name TEXT NOT NULL,
112    record_id TEXT NOT NULL,
113    operation TEXT NOT NULL,      -- 'insert' | 'update' | 'delete'
114    payload TEXT NOT NULL,        -- JSON
115    created_at INTEGER NOT NULL,
116    attempts INTEGER NOT NULL DEFAULT 0,
117    last_attempt_at INTEGER,
118    status TEXT NOT NULL DEFAULT 'pending'  -- 'pending' | 'synced' | 'failed'
119  );
120  ```
121- 写入本地 artifact DB 后,自动往 sync_queue 插入一条待同步记录
122- 提供 `enqueueSyncRecord()``dequeuePendingSyncRecords()` 方法
123
124### 3. 后台同步 worker
125
126- conductor 启动时启动 sync worker
127- 定时扫描 `d1_sync_queue`,取 pending 记录批量推送到 D1
128- 成功后标记 `status = 'synced'`
129- 失败后增加 `attempts`,指数退避重试(1s → 2s → 4s → ... 最大 5 分钟)
130- 连续失败 10 次后标记 `status = 'failed'`,不再重试
131- D1 未配置时,worker 不启动(静默跳过)
132- worker 异常不影响 conductor 主流程
133
134### 4. 建 D1 远程数据库
135
136- 提供建库脚本或说明文档
137- 表结构与本地 artifact DB 完全一致(messages、executions、sessions)
138- 不复用旧版 D1 数据库
139
140## 需要特别注意
141
142- D1 同步是可选的,D1 不可用时本地功能完全不受影响
143- sync worker 不能阻塞 conductor 主线程
144- 同步队列表放在本地 artifact.db 中(与 messages/executions 同库)
145- 旧版 D1Client 用的是 `execSync`(curl),新版必须用 async fetch
146- 所有开发必须在 worktree 中进行,不要在主仓库目录修改代码
147
148## 验收标准
149
150- D1 环境变量未设置时,conductor 正常启动,sync worker 不启动
151- D1 环境变量设置后,sync worker 启动,定时扫描队列
152- 本地写入 message 后,sync_queue 中出现 pending 记录
153- sync worker 成功推送后,记录标记为 synced
154- D1 不可达时,记录保持 pending,指数退避重试
155- 连续失败超限后,记录标记为 failed
156- sync worker 异常不影响 conductor 主进程
157
158## 推荐验证命令
159
160- `cd /Users/george/code/baa-conductor-d1-async-adapter && pnpm build`
161- `cd /Users/george/code/baa-conductor-d1-async-adapter && pnpm test`
162- 设置 D1 环境变量后启动 conductor,观察 sync worker 日志
163
164## 执行记录
165
166> 以下内容由执行任务的 AI 填写,创建任务时留空。
167
168### 开始执行
169
170- 执行者:Claude Code (Opus 4.6)
171- 开始时间:2026-03-29
172- 状态变更:`待开始` → `进行中`
173
174### 完成摘要
175
176- 完成时间:2026-03-29
177- 状态变更:`进行中` → `已完成`
178- 修改了哪些文件:
179  - `packages/d1-client/` — 新建 package(6 个源文件 + 1 个测试文件 + 1 个 SQL 建库脚本)
180    - `src/types.ts` — D1 客户端、同步队列、sync worker 类型定义 + sync_queue 建表 SQL
181    - `src/client.ts` — async D1 HTTP API 客户端(fetch 实现)+ `createD1Client` 工厂
182    - `src/sync-queue.ts` — SyncQueue 类,操作本地 d1_sync_queue 表
183    - `src/sync-worker.ts` — D1SyncWorker 后台同步 worker,指数退避重试
184    - `src/index.ts` — 导出 + `createD1SyncWorker` 便捷工厂
185    - `src/node-shims.d.ts` — node:sqlite 类型声明
186    - `src/d1-setup.sql` — D1 远程数据库建表脚本
187    - `src/index.test.js` — 11 个单元测试
188  - `apps/conductor-daemon/src/index.ts` — 导入 d1-client,ConductorRuntime 构造时创建 sync worker,start/stop 时启停
189  - `apps/conductor-daemon/package.json` — 新增 @baa-conductor/d1-client 依赖和 build 步骤
190- 核心实现思路:
191  - D1Client 用 fetch + AbortController 调 Cloudflare D1 REST API,接口兼容旧版(exec / prepare.run / prepare.get / prepare.all),全异步
192  - createD1Client 工厂从环境变量创建客户端,缺少任何变量时返回 null(静默跳过)
193  - SyncQueue 直接操作本地 SQLite d1_sync_queue 表,提供 enqueue / dequeue / markSynced / markAttemptFailed / purgeSynced
194  - D1SyncWorker 定时扫描 pending 记录,每条记录独立推送到 D1(INSERT OR REPLACE),失败按指数退避(1s→2s→4s...最大5分钟),10 次后标记 failed
195  - createD1SyncWorker 便捷工厂封装了 DatabaseSync 创建 + SyncQueue 初始化 + D1Client 创建,conductor-daemon 只需一行调用
196  - ConductorRuntime 构造时通过 createD1SyncWorker 创建 worker(D1 未配置时返回 null),start() 时启动,stop() 时停止
197- 跑了哪些测试:
198  - `pnpm -F @baa-conductor/d1-client test`:11 个测试全部通过
199  - `pnpm build`:全量 workspace build 成功
200  - conductor-daemon 测试中的失败是 main 分支上已存在的问题(localApiBase 相关),与本次修改无关
201
202### 执行过程中遇到的问题
203
204- conductor-daemon 没有 node:sqlite 类型声明(node-shims.d.ts),不能直接 import DatabaseSync。解决方案:在 d1-client 中提供 createD1SyncWorker 工厂函数,封装 DatabaseSync 依赖,daemon 只导入工厂函数
205- conductor-daemon 的 ConductorRuntime 构造函数不是 async,不能用 dynamic import。解决方案:在 d1-client 的 index.ts 中用顶层 static import
206
207### 剩余风险
208
209- sync worker 目前不会自动清理 synced 记录(purgeSynced 方法已实现但未自动调用),长期运行可能积累大量已同步记录 → 已由 T-S045 解决
210- 尚未与 artifact-db 的 insertMessage/insertExecution 集成(自动往 sync_queue 插入记录) → 已由 T-S045 解决
211- **D1 远程数据库尚未创建**:代码和建表脚本(`d1-setup.sql`)已就绪,但 Cloudflare 上还未执行建库操作。当前环境变量指向旧版 baa 的 D1 数据库(表结构不匹配),sync worker 连上去同步不会成功。需要:(1) 在 Cloudflare 创建新 D1 数据库 (2) 用 `d1-setup.sql` 建表 (3) 更新 launchd 环境变量 `D1_DATABASE_ID` 指向新库 (4) 重启 conductor。本地功能不受影响。
212