- commit
- ee40b9a
- parent
- 769f669
- author
- im_wower
- date
- 2026-03-29 01:22:09 +0800 CST
feat: wire artifact-db writes to D1 sync queue

ArtifactStore now accepts an optional SyncEnqueuer (via setSyncQueue) and
enqueues a sync record after each successful insertMessage, insertExecution,
and upsertSession. D1SyncWorker exposes getQueue() so ConductorRuntime can
inject the queue at startup. The worker also purges old synced records after
each poll cycle (keeping 1000 by default).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
8 files changed,
+162,
-5
+5,
-0
1@@ -2178,6 +2178,11 @@ export class ConductorRuntime {
2 databasePath: join(resolvedStateDir, ARTIFACT_DB_FILENAME),
3 fetchImpl: options.fetchImpl
4 });
5+
6+ // Inject sync queue into artifact store so local writes enqueue D1 sync records.
7+ if (this.d1SyncWorker != null) {
8+ this.artifactStore.setSyncQueue(this.d1SyncWorker.getQueue());
9+ }
10 }
11
12 async start(): Promise<ConductorRuntimeSnapshot> {
+1,
-0
1@@ -27,5 +27,6 @@ export {
2 type SessionIndexEntry,
3 type SessionRecord,
4 type SessionTimelineEntry,
5+ type SyncEnqueuer,
6 type UpsertSessionInput
7 } from "./types.js";
+80,
-0
1@@ -32,6 +32,7 @@ import {
2 type SessionIndexEntry,
3 type SessionRecord,
4 type SessionTimelineEntry,
5+ type SyncEnqueuer,
6 type UpsertSessionInput
7 } from "./types.js";
8
9@@ -183,6 +184,7 @@ export class ArtifactStore {
10 private readonly publicBaseUrl: string | null;
11 private readonly sessionIndexLimit: number;
12 private readonly summaryLength: number;
13+ private syncQueue: SyncEnqueuer | null = null;
14
15 constructor(config: ArtifactStoreConfig) {
16 const databasePath = normalizeRequiredPath(config.databasePath, "databasePath");
17@@ -224,6 +226,10 @@ export class ArtifactStore {
18 return this.publicBaseUrl;
19 }
20
21+ setSyncQueue(queue: SyncEnqueuer | null): void {
22+ this.syncQueue = queue;
23+ }
24+
25 async getExecution(instructionId: string): Promise<ExecutionRecord | null> {
26 const row = this.getRow<ExecutionRow>(
27 "SELECT * FROM executions WHERE instruction_id = ? LIMIT 1;",
28@@ -281,6 +287,8 @@ export class ArtifactStore {
29 );
30 });
31
32+ this.enqueueSync("executions", record.instructionId, executionSyncPayload(record));
33+
34 return record;
35 }
36
37@@ -307,6 +315,8 @@ export class ArtifactStore {
38 );
39 });
40
41+ this.enqueueSync("messages", record.id, messageSyncPayload(record));
42+
43 return record;
44 }
45
46@@ -391,6 +401,8 @@ export class ArtifactStore {
47 );
48 });
49
50+ this.enqueueSync("sessions", record.id, sessionSyncPayload(record));
51+
52 return record;
53 }
54
55@@ -595,6 +607,23 @@ export class ArtifactStore {
56 });
57 }
58
59+ private enqueueSync(tableName: string, recordId: string, payload: Record<string, unknown>): void {
60+ if (this.syncQueue == null) {
61+ return;
62+ }
63+
64+ try {
65+ this.syncQueue.enqueueSyncRecord({
66+ tableName,
67+ recordId,
68+ operation: "insert",
69+ payload
70+ });
71+ } catch {
72+ // Best-effort: enqueue failure must not affect the already-committed local write.
73+ }
74+ }
75+
76 private renderConfig(): { publicBaseUrl: string | null } {
77 return {
78 publicBaseUrl: this.publicBaseUrl
79@@ -971,6 +1000,57 @@ function resolveSessionSummary(
80 return existing.summary;
81 }
82
83+function messageSyncPayload(record: MessageRecord): Record<string, unknown> {
84+ return {
85+ id: record.id,
86+ platform: record.platform,
87+ conversation_id: record.conversationId,
88+ role: record.role,
89+ raw_text: record.rawText,
90+ summary: record.summary,
91+ observed_at: record.observedAt,
92+ static_path: record.staticPath,
93+ page_url: record.pageUrl,
94+ page_title: record.pageTitle,
95+ organization_id: record.organizationId,
96+ created_at: record.createdAt
97+ };
98+}
99+
100+function executionSyncPayload(record: ExecutionRecord): Record<string, unknown> {
101+ return {
102+ instruction_id: record.instructionId,
103+ message_id: record.messageId,
104+ target: record.target,
105+ tool: record.tool,
106+ params: record.params,
107+ params_kind: record.paramsKind,
108+ result_ok: record.resultOk ? 1 : 0,
109+ result_data: record.resultData,
110+ result_summary: record.resultSummary,
111+ result_error: record.resultError,
112+ http_status: record.httpStatus,
113+ executed_at: record.executedAt,
114+ static_path: record.staticPath,
115+ created_at: record.createdAt
116+ };
117+}
118+
119+function sessionSyncPayload(record: SessionRecord): Record<string, unknown> {
120+ return {
121+ id: record.id,
122+ platform: record.platform,
123+ conversation_id: record.conversationId,
124+ started_at: record.startedAt,
125+ last_activity_at: record.lastActivityAt,
126+ message_count: record.messageCount,
127+ execution_count: record.executionCount,
128+ summary: record.summary,
129+ static_path: record.staticPath,
130+ created_at: record.createdAt
131+ };
132+}
133+
134 function buildStableHash(value: string): string {
135 let hash = 2_166_136_261;
136
+14,
-0
1@@ -17,6 +17,20 @@ export interface ArtifactStoreConfig {
2 summaryLength?: number;
3 }
4
5+/**
6+ * Minimal interface for enqueuing sync records.
7+ * Matches SyncQueue.enqueueSyncRecord from d1-client so artifact-db
8+ * does not need a direct dependency on d1-client.
9+ */
10+export interface SyncEnqueuer {
11+ enqueueSyncRecord(input: {
12+ tableName: string;
13+ recordId: string;
14+ operation: string;
15+ payload: unknown;
16+ }): void;
17+}
18+
19 export interface MessageRecord {
20 id: string;
21 platform: string;
+18,
-0
1@@ -58,6 +58,17 @@ DELETE FROM d1_sync_queue
2 WHERE status = 'synced' AND last_attempt_at < ?;
3 `;
4
5+const DELETE_SYNCED_KEEP_RECENT_SQL = `
6+DELETE FROM d1_sync_queue
7+WHERE status = 'synced'
8+ AND id NOT IN (
9+ SELECT id FROM d1_sync_queue
10+ WHERE status = 'synced'
11+ ORDER BY last_attempt_at DESC
12+ LIMIT ?
13+ );
14+`;
15+
16 // ---------------------------------------------------------------------------
17 // SyncQueue
18 // ---------------------------------------------------------------------------
19@@ -120,6 +131,13 @@ export class SyncQueue {
20 const cutoff = Date.now() - olderThanMs;
21 this.db.prepare(DELETE_SYNCED_SQL).run(cutoff);
22 }
23+
24+ /**
25+ * Purge synced records, keeping only the most recent `keepCount`.
26+ */
27+ purgeSyncedKeepRecent(keepCount: number): void {
28+ this.db.prepare(DELETE_SYNCED_KEEP_RECENT_SQL).run(keepCount);
29+ }
30 }
31
32 // ---------------------------------------------------------------------------
+19,
-0
1@@ -9,6 +9,8 @@ import {
2 DEFAULT_POLL_INTERVAL_MS
3 } from "./types.js";
4
5+const DEFAULT_PURGE_KEEP_COUNT = 1000;
6+
7 export interface SyncWorkerDeps {
8 d1: D1Client;
9 queue: SyncQueue;
10@@ -33,6 +35,7 @@ export class D1SyncWorker {
11 private readonly maxAttempts: number;
12 private readonly baseDelayMs: number;
13 private readonly maxDelayMs: number;
14+ private readonly purgeKeepCount: number;
15 private readonly log: (message: string) => void;
16
17 private timer: ReturnType<typeof globalThis.setTimeout> | null = null;
18@@ -47,9 +50,14 @@ export class D1SyncWorker {
19 this.maxAttempts = deps.config?.maxAttempts ?? DEFAULT_MAX_ATTEMPTS;
20 this.baseDelayMs = deps.config?.baseDelayMs ?? DEFAULT_BASE_DELAY_MS;
21 this.maxDelayMs = deps.config?.maxDelayMs ?? DEFAULT_MAX_DELAY_MS;
22+ this.purgeKeepCount = deps.config?.purgeKeepCount ?? DEFAULT_PURGE_KEEP_COUNT;
23 this.log = deps.log ?? defaultLog;
24 }
25
26+ getQueue(): SyncQueue {
27+ return this.queue;
28+ }
29+
30 /**
31 * Start the background polling loop. Safe to call multiple times — subsequent
32 * calls are no-ops while the worker is already running.
33@@ -106,6 +114,7 @@ export class D1SyncWorker {
34
35 try {
36 await this.processBatch();
37+ this.purgeCompleted();
38 } catch (error) {
39 this.log(
40 `[d1-sync] unexpected error: ${error instanceof Error ? error.message : String(error)}`
41@@ -148,6 +157,16 @@ export class D1SyncWorker {
42 }
43 }
44
45+ private purgeCompleted(): void {
46+ try {
47+ this.queue.purgeSyncedKeepRecent(this.purgeKeepCount);
48+ } catch (error) {
49+ this.log(
50+ `[d1-sync] purge failed: ${error instanceof Error ? error.message : String(error)}`
51+ );
52+ }
53+ }
54+
55 private async syncRecord(record: SyncQueueRecord): Promise<void> {
56 try {
57 const payload: Record<string, unknown> = JSON.parse(record.payload);
+2,
-0
1@@ -95,6 +95,8 @@ export interface SyncWorkerConfig {
2 baseDelayMs?: number;
3 /** Maximum backoff delay in milliseconds (default 300 000 = 5 minutes). */
4 maxDelayMs?: number;
5+ /** Number of synced records to keep when purging (default 1000). */
6+ purgeKeepCount?: number;
7 }
8
9 export const DEFAULT_POLL_INTERVAL_MS = 10_000;
+23,
-5
1@@ -2,7 +2,7 @@
2
3 ## 状态
4
5-- 当前状态:`待开始`
6+- 当前状态:`已完成`
7 - 规模预估:`S`
8 - 依赖任务:`T-S040`、`T-S042`
9 - 建议执行者:`Claude`(需要理解 artifact-db store 和 d1-client sync-queue 的交互,在写入路径中精确插入 enqueue 调用)
10@@ -123,21 +123,39 @@ T-S042 实现了 D1 客户端和同步队列,但还没与 artifact-db 的写
11
12 ### 开始执行
13
14-- 执行者:
15-- 开始时间:
16+- 执行者:Claude
17+- 开始时间:2026-03-29
18 - 状态变更:`待开始` → `进行中`
19
20 ### 完成摘要
21
22-- 完成时间:
23+- 完成时间:2026-03-29
24 - 状态变更:`进行中` → `已完成`
25 - 修改了哪些文件:
26+ - `packages/artifact-db/src/types.ts` — 新增 `SyncEnqueuer` 接口
27+ - `packages/artifact-db/src/index.ts` — 导出 `SyncEnqueuer` 类型
28+ - `packages/artifact-db/src/store.ts` — ArtifactStore 增加 `syncQueue` 字段、`setSyncQueue()` setter,三个写入方法(insertMessage、insertExecution、upsertSession)在事务提交后 enqueue 同步记录
29+ - `packages/d1-client/src/types.ts` — SyncWorkerConfig 增加 `purgeKeepCount` 选项
30+ - `packages/d1-client/src/sync-queue.ts` — 新增 `purgeSyncedKeepRecent()` 按数量保留的清理方法
31+ - `packages/d1-client/src/sync-worker.ts` — 新增 `getQueue()` 访问器,tick 后自动调用 `purgeCompleted()`
32+ - `apps/conductor-daemon/src/index.ts` — ConductorRuntime 构造函数中将 D1SyncWorker 的 SyncQueue 注入 ArtifactStore
33 - 核心实现思路:
34+ - 在 artifact-db 中定义最小化 `SyncEnqueuer` 接口,避免对 d1-client 的直接依赖
35+ - 通过 setter 注入(而非构造函数),因为 ArtifactStore 在 D1SyncWorker 之前创建
36+ - enqueue 在 `executeWrite`(SQLite 事务)提交成功之后执行,失败时静默 catch,不影响本地写入
37+ - payload 使用 snake_case 列名,与 D1 表结构一致,供 sync-worker 的 buildUpsertSql 直接使用
38+ - sync worker 每次 tick 后调用 `purgeSyncedKeepRecent(1000)` 清理已同步记录
39 - 跑了哪些测试:
40+ - `pnpm build` — 全量构建通过
41+ - `pnpm --filter @baa-conductor/artifact-db test` — 1 test passed
42+ - `pnpm --filter @baa-conductor/d1-client test` — 11 tests passed
43+ - `pnpm test` — conductor-daemon 有若干预存失败(main 分支同样失败,与本次改动无关)
44
45 ### 执行过程中遇到的问题
46
47-> 记录执行过程中遇到的阻塞、环境问题、临时绕过方案等。合并时由合并者判断是否需要修复或建新任务。
48+- conductor-daemon 测试中有多个 `handleConductorHttpRequest` 相关的 `TypeError: Cannot read properties of undefined (reading 'localApiBase')` 失败,经确认 main 分支同样存在,与本次改动无关。
49
50 ### 剩余风险
51
52+- `insertMessage` / `insertExecution` 内部调用 `buildDerivedSessionRecord` 会隐式 upsert session,但本次只在顶层三个方法末尾 enqueue 对应表(messages/executions/sessions),内部 session upsert 没有单独 enqueue。如果需要 D1 侧也同步 session 的每次更新,可以在 `writeSessionArtifacts` 之后也加 enqueue,但这会增加同步量。当前策略是:session 数据通过后续的 `upsertSession` 调用或下一次 message/execution 写入时自然同步。
53+