im_wower
·
2026-03-29
T-S042.md
1# Task T-S042:D1 异步适配器与同步队列
2
3## 状态
4
5- 当前状态:`已完成`
6- 规模预估:`M`
7- 依赖任务:`T-S039`(需要表结构,但代码可并行开发)
8- 建议执行者:`Codex`(新 package,参考旧版 D1Client 重写为 TypeScript async)
9
10## 直接给对话的提示词
11
12读 `/Users/george/code/baa-conductor/tasks/T-S042.md` 任务文档,完成开发任务。
13
14如需补背景,再读:
15
16- `/Users/george/code/baa-conductor/plans/ARTIFACT_STATIC_SERVICE.md`
17- `/Users/george/code/baa-old-files/baa/baa-server/lib/d1-client.js`(旧版 D1 适配器参考)
18
19## 当前基线
20
21- 仓库:`/Users/george/code/baa-conductor`
22- 分支基线:`main`
23- 提交:`6ea34e2`
24
25## 分支与 worktree(强制)
26
27每个任务必须使用独立的分支和 worktree,禁止直接在 main 上修改,禁止多个任务共用同一个 worktree。
28
29- 分支名:`feat/d1-async-adapter`
30- worktree 路径:`/Users/george/code/baa-conductor-d1-async-adapter`
31
32开工步骤:
33
341. `cd /Users/george/code/baa-conductor`
352. `git worktree add ../baa-conductor-d1-async-adapter -b feat/d1-async-adapter main`
363. `cd ../baa-conductor-d1-async-adapter`
374. 在这个 worktree 目录里开发,不要回到主仓库目录
38
39完成后提交与推送(由执行者完成,不要合并):
40
411. 在 worktree 里提交所有变更(包括更新后的任务文档)
422. `git push -u origin feat/d1-async-adapter`
43
44合并冲突处理:
45
461. 如果 `git merge` 报冲突,先 `git diff` 查看冲突文件
472. 手动解决冲突后 `git add` 冲突文件
483. `git merge --continue` 完成合并
494. 不要用 `git merge --abort` 然后 force 覆盖
50
51## 目标
52
53实现 TypeScript async 版本的 Cloudflare D1 适配器,以及本地 SQLite → D1 的异步同步队列。新建 D1 数据库,不复用旧版。
54
55## 背景
56
57旧版 baa-server 用 curl 同步调 D1 API,conductor 是 TypeScript 需要 async 版本。D1 作为分布式备份和跨设备访问层,本地 SQLite 先写不阻塞,D1 后台异步推送。
58
59## 涉及仓库
60
61- `/Users/george/code/baa-conductor`
62- `/Users/george/code/baa-old-files/baa/baa-server/lib/d1-client.js`(只读参考)
63
64## 范围
65
66- TypeScript async D1 客户端
67- 同步队列表(本地 SQLite)
68- 后台同步 worker
69- 环境变量配置
70
71## 推荐实现边界
72
73建议新增:
74
75- `packages/d1-client/` — 新 package
76 - `src/client.ts` — async D1 HTTP API 客户端
77 - `src/sync-queue.ts` — 同步队列管理
78 - `src/sync-worker.ts` — 后台同步 worker
79 - `src/types.ts` — 类型定义
80
81## 允许修改的目录
82
83- `/Users/george/code/baa-conductor/packages/` (新建 d1-client)
84- `/Users/george/code/baa-conductor/apps/conductor-daemon/src/index.ts` (启动 sync worker)
85
86## 尽量不要修改
87
88- `/Users/george/code/baa-conductor/packages/db/`
89- `/Users/george/code/baa-conductor/packages/artifact-db/` (T-S039 的范围)
90- `/Users/george/code/baa-conductor/plugins/`
91- `/Users/george/code/baa-conductor/apps/conductor-daemon/src/instructions/`
92
93## 必须完成
94
95### 1. D1 async 客户端
96
97- 参考旧版 `/Users/george/code/baa-old-files/baa/baa-server/lib/d1-client.js`
98- 使用 `fetch`(不用 curl)调 Cloudflare D1 REST API
99- 端点:`https://api.cloudflare.com/client/v4/accounts/{accountId}/d1/database/{databaseId}/query`
100- 方法:`exec(sql)`, `prepare(sql).run(...params)`, `prepare(sql).get(...params)`, `prepare(sql).all(...params)`
101- 所有方法返回 Promise
102- 环境变量:`D1_DATABASE_ID`、`D1_ACCOUNT_ID`、`CLOUDFLARE_API_TOKEN`
103- D1 未配置时,客户端返回 null(不报错),由调用方判断
104
105### 2. 同步队列
106
107- 本地 SQLite 新增 `d1_sync_queue` 表:
108 ```sql
109 CREATE TABLE IF NOT EXISTS d1_sync_queue (
110 id INTEGER PRIMARY KEY AUTOINCREMENT,
111 table_name TEXT NOT NULL,
112 record_id TEXT NOT NULL,
113 operation TEXT NOT NULL, -- 'insert' | 'update' | 'delete'
114 payload TEXT NOT NULL, -- JSON
115 created_at INTEGER NOT NULL,
116 attempts INTEGER NOT NULL DEFAULT 0,
117 last_attempt_at INTEGER,
118 status TEXT NOT NULL DEFAULT 'pending' -- 'pending' | 'synced' | 'failed'
119 );
120 ```
121- 写入本地 artifact DB 后,自动往 sync_queue 插入一条待同步记录
122- 提供 `enqueueSyncRecord()` 和 `dequeuePendingSyncRecords()` 方法
123
124### 3. 后台同步 worker
125
126- conductor 启动时启动 sync worker
127- 定时扫描 `d1_sync_queue`,取 pending 记录批量推送到 D1
128- 成功后标记 `status = 'synced'`
129- 失败后增加 `attempts`,指数退避重试(1s → 2s → 4s → ... 最大 5 分钟)
130- 连续失败 10 次后标记 `status = 'failed'`,不再重试
131- D1 未配置时,worker 不启动(静默跳过)
132- worker 异常不影响 conductor 主流程
133
134### 4. 建 D1 远程数据库
135
136- 提供建库脚本或说明文档
137- 表结构与本地 artifact DB 完全一致(messages、executions、sessions)
138- 不复用旧版 D1 数据库
139
140## 需要特别注意
141
142- D1 同步是可选的,D1 不可用时本地功能完全不受影响
143- sync worker 不能阻塞 conductor 主线程
144- 同步队列表放在本地 artifact.db 中(与 messages/executions 同库)
145- 旧版 D1Client 用的是 `execSync`(curl),新版必须用 async fetch
146- 所有开发必须在 worktree 中进行,不要在主仓库目录修改代码
147
148## 验收标准
149
150- D1 环境变量未设置时,conductor 正常启动,sync worker 不启动
151- D1 环境变量设置后,sync worker 启动,定时扫描队列
152- 本地写入 message 后,sync_queue 中出现 pending 记录
153- sync worker 成功推送后,记录标记为 synced
154- D1 不可达时,记录保持 pending,指数退避重试
155- 连续失败超限后,记录标记为 failed
156- sync worker 异常不影响 conductor 主进程
157
158## 推荐验证命令
159
160- `cd /Users/george/code/baa-conductor-d1-async-adapter && pnpm build`
161- `cd /Users/george/code/baa-conductor-d1-async-adapter && pnpm test`
162- 设置 D1 环境变量后启动 conductor,观察 sync worker 日志
163
164## 执行记录
165
166> 以下内容由执行任务的 AI 填写,创建任务时留空。
167
168### 开始执行
169
170- 执行者:Claude Code (Opus 4.6)
171- 开始时间:2026-03-29
172- 状态变更:`待开始` → `进行中`
173
174### 完成摘要
175
176- 完成时间:2026-03-29
177- 状态变更:`进行中` → `已完成`
178- 修改了哪些文件:
179 - `packages/d1-client/` — 新建 package(6 个源文件 + 1 个测试文件 + 1 个 SQL 建库脚本)
180 - `src/types.ts` — D1 客户端、同步队列、sync worker 类型定义 + sync_queue 建表 SQL
181 - `src/client.ts` — async D1 HTTP API 客户端(fetch 实现)+ `createD1Client` 工厂
182 - `src/sync-queue.ts` — SyncQueue 类,操作本地 d1_sync_queue 表
183 - `src/sync-worker.ts` — D1SyncWorker 后台同步 worker,指数退避重试
184 - `src/index.ts` — 导出 + `createD1SyncWorker` 便捷工厂
185 - `src/node-shims.d.ts` — node:sqlite 类型声明
186 - `src/d1-setup.sql` — D1 远程数据库建表脚本
187 - `src/index.test.js` — 11 个单元测试
188 - `apps/conductor-daemon/src/index.ts` — 导入 d1-client,ConductorRuntime 构造时创建 sync worker,start/stop 时启停
189 - `apps/conductor-daemon/package.json` — 新增 @baa-conductor/d1-client 依赖和 build 步骤
190- 核心实现思路:
191 - D1Client 用 fetch + AbortController 调 Cloudflare D1 REST API,接口兼容旧版(exec / prepare.run / prepare.get / prepare.all),全异步
192 - createD1Client 工厂从环境变量创建客户端,缺少任何变量时返回 null(静默跳过)
193 - SyncQueue 直接操作本地 SQLite d1_sync_queue 表,提供 enqueue / dequeue / markSynced / markAttemptFailed / purgeSynced
194 - D1SyncWorker 定时扫描 pending 记录,每条记录独立推送到 D1(INSERT OR REPLACE),失败按指数退避(1s→2s→4s...最大5分钟),10 次后标记 failed
195 - createD1SyncWorker 便捷工厂封装了 DatabaseSync 创建 + SyncQueue 初始化 + D1Client 创建,conductor-daemon 只需一行调用
196 - ConductorRuntime 构造时通过 createD1SyncWorker 创建 worker(D1 未配置时返回 null),start() 时启动,stop() 时停止
197- 跑了哪些测试:
198 - `pnpm -F @baa-conductor/d1-client test`:11 个测试全部通过
199 - `pnpm build`:全量 workspace build 成功
200 - conductor-daemon 测试中的失败是 main 分支上已存在的问题(localApiBase 相关),与本次修改无关
201
202### 执行过程中遇到的问题
203
204- conductor-daemon 没有 node:sqlite 类型声明(node-shims.d.ts),不能直接 import DatabaseSync。解决方案:在 d1-client 中提供 createD1SyncWorker 工厂函数,封装 DatabaseSync 依赖,daemon 只导入工厂函数
205- conductor-daemon 的 ConductorRuntime 构造函数不是 async,不能用 dynamic import。解决方案:在 d1-client 的 index.ts 中用顶层 static import
206
207### 剩余风险
208
209- sync worker 目前不会自动清理 synced 记录(purgeSynced 方法已实现但未自动调用),长期运行可能积累大量已同步记录 → 已由 T-S045 解决
210- 尚未与 artifact-db 的 insertMessage/insertExecution 集成(自动往 sync_queue 插入记录) → 已由 T-S045 解决
211- **D1 远程数据库尚未创建**:代码和建表脚本(`d1-setup.sql`)已就绪,但 Cloudflare 上还未执行建库操作。当前环境变量指向旧版 baa 的 D1 数据库(表结构不匹配),sync worker 连上去同步不会成功。需要:(1) 在 Cloudflare 创建新 D1 数据库 (2) 用 `d1-setup.sql` 建表 (3) 更新 launchd 环境变量 `D1_DATABASE_ID` 指向新库 (4) 重启 conductor。本地功能不受影响。
212