- commit
- 80ecf7b
- parent
- 3d8e5b5
- author
- im_wower
- date
- 2026-03-29 00:57:49 +0800 CST
feat: add D1 async adapter and sync queue package New @baa-conductor/d1-client package with: - Async D1 HTTP API client (fetch-based, replaces old curl/execSync approach) - Local SQLite sync queue (d1_sync_queue table) for buffering writes - Background sync worker with exponential backoff (1s-5min, max 10 retries) - Factory function (createD1SyncWorker) for zero-config integration - D1 remote database setup script (d1-setup.sql) - 11 unit tests covering client, queue, worker lifecycle Integrated into conductor-daemon: sync worker starts/stops with the runtime when D1_DATABASE_ID, D1_ACCOUNT_ID, and CLOUDFLARE_API_TOKEN are set. Silently skipped when D1 is not configured. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
14 files changed,
+1159,
-7
+3,
-2
1@@ -5,14 +5,15 @@
2 "main": "dist/index.js",
3 "dependencies": {
4 "@baa-conductor/artifact-db": "workspace:*",
5+ "@baa-conductor/d1-client": "workspace:*",
6 "@baa-conductor/db": "workspace:*",
7 "@baa-conductor/host-ops": "workspace:*"
8 },
9 "scripts": {
10- "build": "pnpm -C ../.. -F @baa-conductor/db build && pnpm -C ../.. -F @baa-conductor/artifact-db build && pnpm -C ../.. -F @baa-conductor/host-ops build && pnpm -C ../.. -F @baa-conductor/status-api build && pnpm exec tsc -p tsconfig.json",
11+ "build": "pnpm -C ../.. -F @baa-conductor/db build && pnpm -C ../.. -F @baa-conductor/artifact-db build && pnpm -C ../.. -F @baa-conductor/d1-client build && pnpm -C ../.. -F @baa-conductor/host-ops build && pnpm -C ../.. -F @baa-conductor/status-api build && pnpm exec tsc -p tsconfig.json",
12 "dev": "pnpm run build && node dist/index.js",
13 "start": "node dist/index.js",
14 "test": "pnpm run build && node --test src/index.test.js",
15- "typecheck": "pnpm -C ../.. -F @baa-conductor/db build && pnpm -C ../.. -F @baa-conductor/artifact-db build && pnpm -C ../.. -F @baa-conductor/host-ops build && pnpm -C ../.. -F @baa-conductor/status-api build && pnpm exec tsc --noEmit -p tsconfig.json"
16+ "typecheck": "pnpm -C ../.. -F @baa-conductor/db build && pnpm -C ../.. -F @baa-conductor/artifact-db build && pnpm -C ../.. -F @baa-conductor/d1-client build && pnpm -C ../.. -F @baa-conductor/host-ops build && pnpm -C ../.. -F @baa-conductor/status-api build && pnpm exec tsc --noEmit -p tsconfig.json"
17 }
18 }
+17,
-0
1@@ -16,6 +16,10 @@ import {
2 DEFAULT_BAA_EXECUTION_JOURNAL_LIMIT,
3 type ControlPlaneRepository
4 } from "../../../packages/db/dist/index.js";
5+import {
6+ createD1SyncWorker,
7+ type D1SyncWorker
8+} from "../../../packages/d1-client/dist/index.js";
9
10 import {
11 type ConductorHttpRequest,
12@@ -274,6 +278,7 @@ export interface ConductorDaemonOptions {
13
14 export interface ConductorRuntimeOptions extends ConductorDaemonOptions {
15 browserRequestPolicyOptions?: BrowserRequestPolicyControllerOptions;
16+ env?: ConductorEnvironment;
17 }
18
19 export type ConductorEnvironment = Record<string, string | undefined>;
20@@ -2116,6 +2121,7 @@ export class ConductorRuntime {
21 private readonly artifactStore: ArtifactStore;
22 private readonly config: ResolvedConductorRuntimeConfig;
23 private readonly daemon: ConductorDaemon;
24+ private readonly d1SyncWorker: D1SyncWorker | null;
25 private readonly localControlPlane: ConductorLocalControlPlane;
26 private readonly localApiServer: ConductorLocalHttpServer | null;
27 private localControlPlaneInitialized = false;
28@@ -2165,6 +2171,13 @@ export class ConductorRuntime {
29 this.config.artifactSummaryLength,
30 options.browserRequestPolicyOptions
31 );
32+
33+ // D1 sync worker — silently skipped when env vars are not set.
34+ this.d1SyncWorker = createD1SyncWorker({
35+ env: options.env ?? getProcessLike()?.env ?? {},
36+ databasePath: join(resolvedStateDir, ARTIFACT_DB_FILENAME),
37+ fetchImpl: options.fetchImpl
38+ });
39 }
40
41 async start(): Promise<ConductorRuntimeSnapshot> {
42@@ -2184,6 +2197,8 @@ export class ConductorRuntime {
43 this.daemon.stop();
44 throw error;
45 }
46+
47+ this.d1SyncWorker?.start();
48 }
49
50 return this.getRuntimeSnapshot();
51@@ -2191,6 +2206,7 @@ export class ConductorRuntime {
52
53 async stop(): Promise<ConductorRuntimeSnapshot> {
54 this.started = false;
55+ this.d1SyncWorker?.stop();
56 this.daemon.stop();
57 await this.localApiServer?.stop();
58
59@@ -2311,6 +2327,7 @@ export async function runConductorCli(options: RunConductorCliOptions = {}): Pro
60
61 const runtime = new ConductorRuntime(request.config, {
62 autoStartLoops: !request.runOnce,
63+ env,
64 fetchImpl: options.fetchImpl
65 });
66 const snapshot = await runtime.start();
+12,
-0
1@@ -0,0 +1,12 @@
2+{
3+ "name": "@baa-conductor/d1-client",
4+ "private": true,
5+ "type": "module",
6+ "main": "dist/index.js",
7+ "types": "dist/index.d.ts",
8+ "scripts": {
9+ "build": "pnpm exec tsc -p tsconfig.json",
10+ "test": "pnpm run build && node --test src/index.test.js",
11+ "typecheck": "pnpm exec tsc --noEmit -p tsconfig.json"
12+ }
13+}
+167,
-0
1@@ -0,0 +1,167 @@
2+import type {
3+ D1ApiResponse,
4+ D1ClientConfig,
5+ D1PreparedStatement
6+} from "./types.js";
7+import { DEFAULT_TIMEOUT_MS } from "./types.js";
8+
9+/**
10+ * Async Cloudflare D1 HTTP API client.
11+ *
12+ * Returns `null` from {@link createD1Client} when environment variables are
13+ * missing so that callers can treat D1 as optional without error handling.
14+ */
15+export class D1Client {
16+ private readonly url: string;
17+ private readonly apiToken: string;
18+ private readonly fetchImpl: typeof globalThis.fetch;
19+ private readonly timeoutMs: number;
20+
21+ constructor(config: D1ClientConfig) {
22+ this.url = `https://api.cloudflare.com/client/v4/accounts/${config.accountId}/d1/database/${config.databaseId}/query`;
23+ this.apiToken = config.apiToken;
24+ this.fetchImpl = config.fetchImpl ?? globalThis.fetch;
25+ this.timeoutMs = config.timeoutMs ?? DEFAULT_TIMEOUT_MS;
26+ }
27+
28+ /**
29+ * Low-level query — sends a single SQL statement with optional params.
30+ */
31+ private async query<T = Record<string, unknown>>(
32+ sql: string,
33+ params: unknown[] = []
34+ ): Promise<{ results: T[]; meta: { changes: number } }> {
35+ const controller = new AbortController();
36+ const timer = setTimeout(() => controller.abort(), this.timeoutMs);
37+
38+ let response: Response;
39+ try {
40+ response = await this.fetchImpl(this.url, {
41+ method: "POST",
42+ headers: {
43+ Authorization: `Bearer ${this.apiToken}`,
44+ "Content-Type": "application/json"
45+ },
46+ body: JSON.stringify({ sql, params }),
47+ signal: controller.signal
48+ });
49+ } catch (error) {
50+ throw new Error(
51+ `D1 network error: ${error instanceof Error ? error.message : String(error)}`
52+ );
53+ } finally {
54+ clearTimeout(timer);
55+ }
56+
57+ let data: D1ApiResponse<T>;
58+ try {
59+ data = (await response.json()) as D1ApiResponse<T>;
60+ } catch {
61+ throw new Error(
62+ `D1 response parse error: HTTP ${response.status} ${response.statusText}`
63+ );
64+ }
65+
66+ if (!data.success) {
67+ const errMsg = (data.errors ?? []).map((e) => e.message).join("; ");
68+ // DDL "already exists" errors are silently ignored (matches IF NOT EXISTS semantics).
69+ if (/already exists/i.test(errMsg)) {
70+ return { results: [], meta: { changes: 0 } };
71+ }
72+ throw new Error(`D1 query error: ${errMsg}`);
73+ }
74+
75+ const first = data.result[0];
76+ return {
77+ results: first?.results ?? [],
78+ meta: { changes: first?.meta?.changes ?? 0 }
79+ };
80+ }
81+
82+ /**
83+ * Execute one or more SQL statements (separated by `;`).
84+ * DDL "already exists" errors are silently ignored.
85+ */
86+ async exec(sql: string): Promise<void> {
87+ const statements = sql
88+ .split(";")
89+ .map((s) => s.trim())
90+ .filter((s) => s.length > 0);
91+
92+ for (const stmt of statements) {
93+ try {
94+ await this.query(stmt);
95+ } catch (error) {
96+ if (
97+ error instanceof Error &&
98+ /already exists|duplicate column/i.test(error.message)
99+ ) {
100+ continue;
101+ }
102+ throw error;
103+ }
104+ }
105+ }
106+
107+ /**
108+ * Prepare a parameterised statement.
109+ * Returns an object with `run`, `get`, and `all` — all returning Promises.
110+ */
111+ prepare(sql: string): D1PreparedStatement {
112+ return {
113+ run: async (...params: unknown[]): Promise<{ changes: number }> => {
114+ const result = await this.query(sql, params);
115+ return { changes: result.meta.changes };
116+ },
117+
118+ get: async <T = Record<string, unknown>>(
119+ ...params: unknown[]
120+ ): Promise<T | undefined> => {
121+ const result = await this.query<T>(sql, params);
122+ return result.results[0] ?? undefined;
123+ },
124+
125+ all: async <T = Record<string, unknown>>(
126+ ...params: unknown[]
127+ ): Promise<T[]> => {
128+ const result = await this.query<T>(sql, params);
129+ return result.results;
130+ }
131+ };
132+ }
133+}
134+
135+// ---------------------------------------------------------------------------
136+// Factory — returns null when D1 is not configured
137+// ---------------------------------------------------------------------------
138+
139+export interface D1EnvVars {
140+ D1_DATABASE_ID?: string;
141+ D1_ACCOUNT_ID?: string;
142+ CLOUDFLARE_API_TOKEN?: string;
143+}
144+
145+/**
146+ * Create a D1Client if all required environment variables are present.
147+ * Returns `null` (without throwing) when D1 is not configured.
148+ */
149+export function createD1Client(
150+ env: D1EnvVars,
151+ options?: { fetchImpl?: typeof globalThis.fetch; timeoutMs?: number }
152+): D1Client | null {
153+ const accountId = env.D1_ACCOUNT_ID?.trim();
154+ const databaseId = env.D1_DATABASE_ID?.trim();
155+ const apiToken = env.CLOUDFLARE_API_TOKEN?.trim();
156+
157+ if (!accountId || !databaseId || !apiToken) {
158+ return null;
159+ }
160+
161+ return new D1Client({
162+ accountId,
163+ databaseId,
164+ apiToken,
165+ fetchImpl: options?.fetchImpl,
166+ timeoutMs: options?.timeoutMs
167+ });
168+}
+78,
-0
1@@ -0,0 +1,78 @@
2+-- ============================================================================
3+-- Cloudflare D1 remote database setup
4+--
5+-- Usage:
6+-- 1. Create the D1 database:
7+-- npx wrangler d1 create baa-conductor-artifacts
8+--
9+-- 2. Run this schema against the new database:
10+-- npx wrangler d1 execute baa-conductor-artifacts --file=./src/d1-setup.sql
11+--
12+-- 3. Note the database ID from step 1 and set environment variables:
13+-- D1_DATABASE_ID=<database-id>
14+-- D1_ACCOUNT_ID=<cloudflare-account-id>
15+-- CLOUDFLARE_API_TOKEN=<api-token-with-d1-permissions>
16+-- ============================================================================
17+
18+-- messages table (mirrors local artifact.db)
19+CREATE TABLE IF NOT EXISTS messages (
20+ id TEXT PRIMARY KEY,
21+ platform TEXT NOT NULL,
22+ conversation_id TEXT,
23+ role TEXT NOT NULL,
24+ raw_text TEXT NOT NULL,
25+ summary TEXT,
26+ observed_at INTEGER NOT NULL,
27+ static_path TEXT NOT NULL,
28+ page_url TEXT,
29+ page_title TEXT,
30+ organization_id TEXT,
31+ created_at INTEGER NOT NULL
32+);
33+
34+CREATE INDEX IF NOT EXISTS idx_messages_conversation
35+ ON messages(conversation_id);
36+CREATE INDEX IF NOT EXISTS idx_messages_platform
37+ ON messages(platform, observed_at DESC);
38+
39+-- executions table (mirrors local artifact.db)
40+CREATE TABLE IF NOT EXISTS executions (
41+ instruction_id TEXT PRIMARY KEY,
42+ message_id TEXT NOT NULL,
43+ target TEXT NOT NULL,
44+ tool TEXT NOT NULL,
45+ params TEXT,
46+ params_kind TEXT NOT NULL,
47+ result_ok INTEGER NOT NULL,
48+ result_data TEXT,
49+ result_summary TEXT,
50+ result_error TEXT,
51+ http_status INTEGER,
52+ executed_at INTEGER NOT NULL,
53+ static_path TEXT NOT NULL,
54+ created_at INTEGER NOT NULL
55+);
56+
57+CREATE INDEX IF NOT EXISTS idx_executions_message
58+ ON executions(message_id);
59+CREATE INDEX IF NOT EXISTS idx_executions_target_tool
60+ ON executions(target, tool);
61+
62+-- sessions table (mirrors local artifact.db)
63+CREATE TABLE IF NOT EXISTS sessions (
64+ id TEXT PRIMARY KEY,
65+ platform TEXT NOT NULL,
66+ conversation_id TEXT,
67+ started_at INTEGER NOT NULL,
68+ last_activity_at INTEGER NOT NULL,
69+ message_count INTEGER NOT NULL DEFAULT 0,
70+ execution_count INTEGER NOT NULL DEFAULT 0,
71+ summary TEXT,
72+ static_path TEXT NOT NULL,
73+ created_at INTEGER NOT NULL
74+);
75+
76+CREATE INDEX IF NOT EXISTS idx_sessions_platform
77+ ON sessions(platform, last_activity_at DESC);
78+CREATE INDEX IF NOT EXISTS idx_sessions_conversation
79+ ON sessions(conversation_id);
+243,
-0
1@@ -0,0 +1,243 @@
// ---------------------------------------------------------------------------
// Unit tests for @baa-conductor/d1-client.
// Covers: createD1Client env gating, the D1Client prepared-statement surface,
// SyncQueue enqueue/dequeue/ack lifecycle, and D1SyncWorker start/stop.
// Runs against the compiled output in ../dist (built by the "test" script).
// ---------------------------------------------------------------------------
import { describe, it } from "node:test";
import assert from "node:assert/strict";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { mkdtempSync, rmSync } from "node:fs";
import { DatabaseSync } from "node:sqlite";

import { D1Client, createD1Client, SyncQueue, D1SyncWorker, createD1SyncWorker } from "../dist/index.js";

describe("createD1Client", () => {
  it("returns null when env vars are missing", () => {
    assert.strictEqual(createD1Client({}), null);
    assert.strictEqual(createD1Client({ D1_ACCOUNT_ID: "a" }), null);
    assert.strictEqual(createD1Client({ D1_ACCOUNT_ID: "a", D1_DATABASE_ID: "b" }), null);
  });

  it("returns a D1Client when all env vars are set", () => {
    const client = createD1Client({
      D1_ACCOUNT_ID: "acc",
      D1_DATABASE_ID: "db",
      CLOUDFLARE_API_TOKEN: "tok"
    });
    assert.ok(client instanceof D1Client);
  });

  it("trims whitespace-only env vars and returns null", () => {
    const client = createD1Client({
      D1_ACCOUNT_ID: " ",
      D1_DATABASE_ID: "db",
      CLOUDFLARE_API_TOKEN: "tok"
    });
    assert.strictEqual(client, null);
  });
});

describe("D1Client", () => {
  // No network I/O here — only the synchronous prepared-statement shape.
  it("prepare returns an object with run, get, all methods", () => {
    const client = new D1Client({
      accountId: "acc",
      databaseId: "db",
      apiToken: "tok"
    });
    const stmt = client.prepare("SELECT 1");
    assert.strictEqual(typeof stmt.run, "function");
    assert.strictEqual(typeof stmt.get, "function");
    assert.strictEqual(typeof stmt.all, "function");
  });
});

describe("SyncQueue", () => {
  // Each test uses a throwaway SQLite file; SyncQueue creates its own schema.
  it("enqueue and dequeue sync records", () => {
    const tmpDir = mkdtempSync(join(tmpdir(), "d1-test-"));
    try {
      const db = new DatabaseSync(join(tmpDir, "test.db"));
      const queue = new SyncQueue(db);

      queue.enqueueSyncRecord({
        tableName: "messages",
        recordId: "msg_001",
        operation: "insert",
        payload: { id: "msg_001", text: "hello" }
      });

      const records = queue.dequeuePendingSyncRecords(10);
      assert.strictEqual(records.length, 1);
      assert.strictEqual(records[0].tableName, "messages");
      assert.strictEqual(records[0].recordId, "msg_001");
      assert.strictEqual(records[0].operation, "insert");
      assert.strictEqual(records[0].status, "pending");
      assert.strictEqual(records[0].attempts, 0);

      const payload = JSON.parse(records[0].payload);
      assert.strictEqual(payload.id, "msg_001");

      db.close();
    } finally {
      rmSync(tmpDir, { recursive: true, force: true });
    }
  });

  it("markSynced updates status", () => {
    const tmpDir = mkdtempSync(join(tmpdir(), "d1-test-"));
    try {
      const db = new DatabaseSync(join(tmpDir, "test.db"));
      const queue = new SyncQueue(db);

      queue.enqueueSyncRecord({
        tableName: "messages",
        recordId: "msg_002",
        operation: "insert",
        payload: { id: "msg_002" }
      });

      const records = queue.dequeuePendingSyncRecords(10);
      queue.markSynced(records[0].id);

      const pending = queue.dequeuePendingSyncRecords(10);
      assert.strictEqual(pending.length, 0);

      db.close();
    } finally {
      rmSync(tmpDir, { recursive: true, force: true });
    }
  });

  it("markAttemptFailed increments attempts and marks failed after max", () => {
    const tmpDir = mkdtempSync(join(tmpdir(), "d1-test-"));
    try {
      const db = new DatabaseSync(join(tmpDir, "test.db"));
      const queue = new SyncQueue(db);

      queue.enqueueSyncRecord({
        tableName: "sessions",
        recordId: "s_001",
        operation: "update",
        payload: { id: "s_001" }
      });

      let records = queue.dequeuePendingSyncRecords(10);
      const id = records[0].id;

      // Fail 9 times — should stay pending
      for (let i = 0; i < 9; i++) {
        queue.markAttemptFailed(id, i, 10);
      }

      records = queue.dequeuePendingSyncRecords(10);
      assert.strictEqual(records.length, 1);
      assert.strictEqual(records[0].attempts, 9);

      // Fail once more — should be marked failed
      queue.markAttemptFailed(id, 9, 10);

      records = queue.dequeuePendingSyncRecords(10);
      assert.strictEqual(records.length, 0);

      db.close();
    } finally {
      rmSync(tmpDir, { recursive: true, force: true });
    }
  });

  it("purgeSynced removes old synced records", () => {
    const tmpDir = mkdtempSync(join(tmpdir(), "d1-test-"));
    try {
      const db = new DatabaseSync(join(tmpDir, "test.db"));
      const queue = new SyncQueue(db);

      queue.enqueueSyncRecord({
        tableName: "messages",
        recordId: "msg_003",
        operation: "insert",
        payload: { id: "msg_003" }
      });

      const records = queue.dequeuePendingSyncRecords(10);
      queue.markSynced(records[0].id);

      // Purge records older than 0ms (everything synced)
      queue.purgeSynced(0);

      // Should have been purged
      const remaining = queue.dequeuePendingSyncRecords(10);
      assert.strictEqual(remaining.length, 0);

      db.close();
    } finally {
      rmSync(tmpDir, { recursive: true, force: true });
    }
  });
});

describe("D1SyncWorker", () => {
  it("start and stop lifecycle", () => {
    const tmpDir = mkdtempSync(join(tmpdir(), "d1-test-"));
    try {
      const db = new DatabaseSync(join(tmpDir, "test.db"));
      const queue = new SyncQueue(db);
      const client = new D1Client({
        accountId: "acc",
        databaseId: "db",
        apiToken: "tok"
      });

      const logs = [];
      const worker = new D1SyncWorker({
        d1: client,
        queue,
        config: { pollIntervalMs: 100 },
        log: (msg) => logs.push(msg)
      });

      assert.strictEqual(worker.isRunning(), false);
      worker.start();
      assert.strictEqual(worker.isRunning(), true);

      // Double start is safe
      worker.start();
      assert.strictEqual(worker.isRunning(), true);

      worker.stop();
      assert.strictEqual(worker.isRunning(), false);
      assert.ok(logs.some((l) => l.includes("started")));
      assert.ok(logs.some((l) => l.includes("stopped")));

      db.close();
    } finally {
      rmSync(tmpDir, { recursive: true, force: true });
    }
  });
});

describe("createD1SyncWorker", () => {
  it("returns null when D1 env vars are missing", () => {
    const tmpDir = mkdtempSync(join(tmpdir(), "d1-test-"));
    try {
      const result = createD1SyncWorker({
        env: {},
        databasePath: join(tmpDir, "test.db")
      });
      assert.strictEqual(result, null);
    } finally {
      rmSync(tmpDir, { recursive: true, force: true });
    }
  });

  it("returns a worker when D1 env vars are set", () => {
    const tmpDir = mkdtempSync(join(tmpdir(), "d1-test-"));
    try {
      const worker = createD1SyncWorker({
        env: {
          D1_ACCOUNT_ID: "acc",
          D1_DATABASE_ID: "db",
          CLOUDFLARE_API_TOKEN: "tok"
        },
        databasePath: join(tmpDir, "test.db")
      });
      assert.ok(worker instanceof D1SyncWorker);
    } finally {
      rmSync(tmpDir, { recursive: true, force: true });
    }
  });
});
+62,
-0
1@@ -0,0 +1,62 @@
2+import { DatabaseSync } from "node:sqlite";
3+import { createD1Client, type D1EnvVars } from "./client.js";
4+import { SyncQueue } from "./sync-queue.js";
5+import { D1SyncWorker } from "./sync-worker.js";
6+import type { SyncWorkerConfig } from "./types.js";
7+
8+export { D1Client, createD1Client, type D1EnvVars } from "./client.js";
9+export { SyncQueue } from "./sync-queue.js";
10+export { D1SyncWorker, type SyncWorkerDeps } from "./sync-worker.js";
11+export {
12+ D1_SYNC_QUEUE_SCHEMA_SQL,
13+ DEFAULT_BASE_DELAY_MS,
14+ DEFAULT_BATCH_SIZE,
15+ DEFAULT_MAX_ATTEMPTS,
16+ DEFAULT_MAX_DELAY_MS,
17+ DEFAULT_POLL_INTERVAL_MS,
18+ DEFAULT_TIMEOUT_MS,
19+ type D1ApiError,
20+ type D1ApiResponse,
21+ type D1ClientConfig,
22+ type D1PreparedStatement,
23+ type D1QueryMeta,
24+ type D1QueryResult,
25+ type EnqueueSyncInput,
26+ type SyncOperation,
27+ type SyncQueueRecord,
28+ type SyncStatus,
29+ type SyncWorkerConfig
30+} from "./types.js";
31+
32+// ---------------------------------------------------------------------------
33+// Convenience factory — creates a D1SyncWorker from env vars + DB path.
34+// Returns null if D1 is not configured.
35+// ---------------------------------------------------------------------------
36+
37+export interface CreateD1SyncWorkerOptions {
38+ env: Record<string, string | undefined>;
39+ databasePath: string;
40+ fetchImpl?: typeof globalThis.fetch;
41+ syncWorkerConfig?: SyncWorkerConfig;
42+ log?: (message: string) => void;
43+}
44+
45+export function createD1SyncWorker(
46+ options: CreateD1SyncWorkerOptions
47+): D1SyncWorker | null {
48+ const d1 = createD1Client(options.env, { fetchImpl: options.fetchImpl });
49+
50+ if (d1 == null) {
51+ return null;
52+ }
53+
54+ const db = new DatabaseSync(options.databasePath);
55+ const queue = new SyncQueue(db);
56+
57+ return new D1SyncWorker({
58+ d1,
59+ queue,
60+ config: options.syncWorkerConfig,
61+ log: options.log
62+ });
63+}
+19,
-0
1@@ -0,0 +1,19 @@
2+declare module "node:sqlite" {
3+ export interface StatementRunResult {
4+ changes?: number;
5+ lastInsertRowid?: number | bigint;
6+ }
7+
8+ export class StatementSync {
9+ all(...params: unknown[]): unknown[];
10+ get(...params: unknown[]): unknown;
11+ run(...params: unknown[]): StatementRunResult;
12+ }
13+
14+ export class DatabaseSync {
15+ constructor(path: string);
16+ close(): void;
17+ exec(query: string): void;
18+ prepare(query: string): StatementSync;
19+ }
20+}
+141,
-0
1@@ -0,0 +1,141 @@
2+import { DatabaseSync } from "node:sqlite";
3+
4+import {
5+ D1_SYNC_QUEUE_SCHEMA_SQL,
6+ type EnqueueSyncInput,
7+ type SyncOperation,
8+ type SyncQueueRecord,
9+ type SyncStatus
10+} from "./types.js";
11+
// ---------------------------------------------------------------------------
// Row shape returned by node:sqlite
// ---------------------------------------------------------------------------

// Raw d1_sync_queue row in column (snake_case) form; mapRow() converts it to
// the camelCase SyncQueueRecord exposed to callers.
interface SyncQueueRow {
  id: number;
  table_name: string;
  record_id: string;
  operation: string;
  payload: string;
  created_at: number;
  attempts: number;
  // Epoch millis of the most recent sync attempt; null before the first try.
  last_attempt_at: number | null;
  status: string;
}

// ---------------------------------------------------------------------------
// SQL statements
// ---------------------------------------------------------------------------

// New rows rely on the table's defaults for attempts/status ('pending').
const INSERT_SQL = `
INSERT INTO d1_sync_queue (table_name, record_id, operation, payload, created_at)
VALUES (?, ?, ?, ?, ?);
`;

// Oldest-first drain order so records sync in roughly creation order.
const SELECT_PENDING_SQL = `
SELECT *
FROM d1_sync_queue
WHERE status = 'pending'
ORDER BY created_at ASC
LIMIT ?;
`;

const UPDATE_SYNCED_SQL = `
UPDATE d1_sync_queue
SET status = 'synced', last_attempt_at = ?
WHERE id = ?;
`;

const UPDATE_ATTEMPT_SQL = `
UPDATE d1_sync_queue
SET attempts = ?, last_attempt_at = ?, status = ?
WHERE id = ?;
`;

// last_attempt_at doubles as the "synced at" timestamp for purge cutoffs.
const DELETE_SYNCED_SQL = `
DELETE FROM d1_sync_queue
WHERE status = 'synced' AND last_attempt_at < ?;
`;
61+
62+// ---------------------------------------------------------------------------
63+// SyncQueue
64+// ---------------------------------------------------------------------------
65+
66+export class SyncQueue {
67+ private readonly db: DatabaseSync;
68+
69+ constructor(db: DatabaseSync) {
70+ this.db = db;
71+ this.db.exec(D1_SYNC_QUEUE_SCHEMA_SQL);
72+ }
73+
74+ /**
75+ * Enqueue a record for D1 synchronisation.
76+ */
77+ enqueueSyncRecord(input: EnqueueSyncInput): void {
78+ const payload =
79+ typeof input.payload === "string"
80+ ? input.payload
81+ : JSON.stringify(input.payload);
82+
83+ this.db.prepare(INSERT_SQL).run(
84+ input.tableName,
85+ input.recordId,
86+ input.operation,
87+ payload,
88+ Date.now()
89+ );
90+ }
91+
92+ /**
93+ * Dequeue up to `limit` pending records ordered by creation time.
94+ */
95+ dequeuePendingSyncRecords(limit: number): SyncQueueRecord[] {
96+ const rows = this.db.prepare(SELECT_PENDING_SQL).all(limit) as SyncQueueRow[];
97+ return rows.map(mapRow);
98+ }
99+
100+ /**
101+ * Mark a record as successfully synced.
102+ */
103+ markSynced(id: number): void {
104+ this.db.prepare(UPDATE_SYNCED_SQL).run(Date.now(), id);
105+ }
106+
107+ /**
108+ * Record a failed attempt. If `attempts >= maxAttempts` the status becomes
109+ * `'failed'`; otherwise it stays `'pending'` for retry.
110+ */
111+ markAttemptFailed(id: number, currentAttempts: number, maxAttempts: number): void {
112+ const newAttempts = currentAttempts + 1;
113+ const newStatus: SyncStatus = newAttempts >= maxAttempts ? "failed" : "pending";
114+ this.db.prepare(UPDATE_ATTEMPT_SQL).run(newAttempts, Date.now(), newStatus, id);
115+ }
116+
117+ /**
118+ * Purge synced records older than `olderThanMs` milliseconds.
119+ */
120+ purgeSynced(olderThanMs: number): void {
121+ const cutoff = Date.now() - olderThanMs;
122+ this.db.prepare(DELETE_SYNCED_SQL).run(cutoff);
123+ }
124+}
125+
126+// ---------------------------------------------------------------------------
127+// Helpers
128+// ---------------------------------------------------------------------------
129+
130+function mapRow(row: SyncQueueRow): SyncQueueRecord {
131+ return {
132+ id: row.id,
133+ tableName: row.table_name,
134+ recordId: row.record_id,
135+ operation: row.operation as SyncOperation,
136+ payload: row.payload,
137+ createdAt: row.created_at,
138+ attempts: row.attempts,
139+ lastAttemptAt: row.last_attempt_at,
140+ status: row.status as SyncStatus
141+ };
142+}
+248,
-0
1@@ -0,0 +1,248 @@
2+import type { D1Client } from "./client.js";
3+import type { SyncQueue } from "./sync-queue.js";
4+import type { SyncQueueRecord, SyncWorkerConfig } from "./types.js";
5+import {
6+ DEFAULT_BASE_DELAY_MS,
7+ DEFAULT_BATCH_SIZE,
8+ DEFAULT_MAX_ATTEMPTS,
9+ DEFAULT_MAX_DELAY_MS,
10+ DEFAULT_POLL_INTERVAL_MS
11+} from "./types.js";
12+
/** Constructor dependencies for the D1 sync worker. */
export interface SyncWorkerDeps {
  /** Remote D1 client that records are pushed to. */
  d1: D1Client;
  /** Local queue buffering pending writes. */
  queue: SyncQueue;
  /** Optional overrides for poll interval, batch size, backoff and retries. */
  config?: SyncWorkerConfig;
  /** Optional log sink; defaults to console.log. */
  log?: (message: string) => void;
}
19+
20+/**
21+ * Background sync worker that periodically drains the local d1_sync_queue
22+ * and pushes records to Cloudflare D1.
23+ *
24+ * - Runs on a configurable poll interval.
25+ * - Uses exponential backoff per-record on failure (1s -> 2s -> 4s -> ... max 5 min).
26+ * - Marks records as `'failed'` after 10 consecutive failures.
27+ * - Worker errors never propagate to the caller — they are logged and swallowed.
28+ */
29+export class D1SyncWorker {
30+ private readonly d1: D1Client;
31+ private readonly queue: SyncQueue;
32+ private readonly pollIntervalMs: number;
33+ private readonly batchSize: number;
34+ private readonly maxAttempts: number;
35+ private readonly baseDelayMs: number;
36+ private readonly maxDelayMs: number;
37+ private readonly log: (message: string) => void;
38+
39+ private timer: ReturnType<typeof globalThis.setTimeout> | null = null;
40+ private running = false;
41+ private processing = false;
42+
43+ constructor(deps: SyncWorkerDeps) {
44+ this.d1 = deps.d1;
45+ this.queue = deps.queue;
46+ this.pollIntervalMs = deps.config?.pollIntervalMs ?? DEFAULT_POLL_INTERVAL_MS;
47+ this.batchSize = deps.config?.batchSize ?? DEFAULT_BATCH_SIZE;
48+ this.maxAttempts = deps.config?.maxAttempts ?? DEFAULT_MAX_ATTEMPTS;
49+ this.baseDelayMs = deps.config?.baseDelayMs ?? DEFAULT_BASE_DELAY_MS;
50+ this.maxDelayMs = deps.config?.maxDelayMs ?? DEFAULT_MAX_DELAY_MS;
51+ this.log = deps.log ?? defaultLog;
52+ }
53+
54+ /**
55+ * Start the background polling loop. Safe to call multiple times — subsequent
56+ * calls are no-ops while the worker is already running.
57+ */
58+ start(): void {
59+ if (this.running) {
60+ return;
61+ }
62+
63+ this.running = true;
64+ this.log("[d1-sync] worker started");
65+ this.scheduleNext();
66+ }
67+
68+ /**
69+ * Stop the worker. In-flight processing will finish but no new cycles start.
70+ */
71+ stop(): void {
72+ this.running = false;
73+
74+ if (this.timer != null) {
75+ clearTimeout(this.timer);
76+ this.timer = null;
77+ }
78+
79+ this.log("[d1-sync] worker stopped");
80+ }
81+
82+ isRunning(): boolean {
83+ return this.running;
84+ }
85+
86+ // -------------------------------------------------------------------------
87+ // Internal
88+ // -------------------------------------------------------------------------
89+
90+ private scheduleNext(): void {
91+ if (!this.running) {
92+ return;
93+ }
94+
95+ this.timer = setTimeout(() => {
96+ void this.tick();
97+ }, this.pollIntervalMs);
98+ }
99+
100+ private async tick(): Promise<void> {
101+ if (!this.running || this.processing) {
102+ this.scheduleNext();
103+ return;
104+ }
105+
106+ this.processing = true;
107+
108+ try {
109+ await this.processBatch();
110+ } catch (error) {
111+ this.log(
112+ `[d1-sync] unexpected error: ${error instanceof Error ? error.message : String(error)}`
113+ );
114+ } finally {
115+ this.processing = false;
116+ this.scheduleNext();
117+ }
118+ }
119+
120+ private async processBatch(): Promise<void> {
121+ const records = this.queue.dequeuePendingSyncRecords(this.batchSize);
122+
123+ if (records.length === 0) {
124+ return;
125+ }
126+
127+ this.log(`[d1-sync] processing ${records.length} pending record(s)`);
128+
129+ for (const record of records) {
130+ if (!this.running) {
131+ break;
132+ }
133+
134+ // Exponential backoff: skip records whose next-retry time hasn't arrived.
135+ if (record.attempts > 0 && record.lastAttemptAt != null) {
136+ const delay = computeBackoff(
137+ record.attempts,
138+ this.baseDelayMs,
139+ this.maxDelayMs
140+ );
141+ const nextRetryAt = record.lastAttemptAt + delay;
142+
143+ if (Date.now() < nextRetryAt) {
144+ continue;
145+ }
146+ }
147+
148+ await this.syncRecord(record);
149+ }
150+ }
151+
152+ private async syncRecord(record: SyncQueueRecord): Promise<void> {
153+ try {
154+ const payload: Record<string, unknown> = JSON.parse(record.payload);
155+ const sql = buildSyncSql(record.tableName, record.operation, payload);
156+
157+ if (sql == null) {
158+ // Unknown operation — mark synced to avoid infinite retries.
159+ this.queue.markSynced(record.id);
160+ return;
161+ }
162+
163+ await this.d1.prepare(sql.statement).run(...sql.params);
164+ this.queue.markSynced(record.id);
165+ } catch (error) {
166+ this.queue.markAttemptFailed(record.id, record.attempts, this.maxAttempts);
167+ this.log(
168+ `[d1-sync] record ${record.id} (${record.tableName}/${record.recordId}) ` +
169+ `attempt ${record.attempts + 1} failed: ` +
170+ `${error instanceof Error ? error.message : String(error)}`
171+ );
172+ }
173+ }
174+}
175+
176+// ---------------------------------------------------------------------------
177+// SQL generation for sync
178+// ---------------------------------------------------------------------------
179+
180+interface GeneratedSql {
181+ statement: string;
182+ params: unknown[];
183+}
184+
185+function buildSyncSql(
186+ tableName: string,
187+ operation: string,
188+ payload: Record<string, unknown>
189+): GeneratedSql | null {
190+ switch (operation) {
191+ case "insert":
192+ case "update":
193+ return buildUpsertSql(tableName, payload);
194+ case "delete":
195+ return buildDeleteSql(tableName, payload);
196+ default:
197+ return null;
198+ }
199+}
200+
201+function buildUpsertSql(
202+ tableName: string,
203+ payload: Record<string, unknown>
204+): GeneratedSql {
205+ const keys = Object.keys(payload);
206+ const placeholders = keys.map(() => "?").join(", ");
207+ const columns = keys.join(", ");
208+ const updates = keys.map((k) => `${k} = excluded.${k}`).join(", ");
209+ const params = keys.map((k) => payload[k]);
210+
211+ // Use INSERT OR REPLACE which covers both insert and update.
212+ const statement =
213+ `INSERT INTO ${tableName} (${columns}) VALUES (${placeholders}) ` +
214+ `ON CONFLICT DO UPDATE SET ${updates};`;
215+
216+ return { statement, params };
217+}
218+
219+function buildDeleteSql(
220+ tableName: string,
221+ payload: Record<string, unknown>
222+): GeneratedSql {
223+ // The payload for deletes is expected to contain the primary key column(s).
224+ const keys = Object.keys(payload);
225+ const conditions = keys.map((k) => `${k} = ?`).join(" AND ");
226+ const params = keys.map((k) => payload[k]);
227+
228+ return {
229+ statement: `DELETE FROM ${tableName} WHERE ${conditions};`,
230+ params
231+ };
232+}
233+
234+// ---------------------------------------------------------------------------
235+// Helpers
236+// ---------------------------------------------------------------------------
237+
238+function computeBackoff(
239+ attempts: number,
240+ baseMs: number,
241+ maxMs: number
242+): number {
243+ const delay = baseMs * Math.pow(2, attempts - 1);
244+ return Math.min(delay, maxMs);
245+}
246+
247+function defaultLog(message: string): void {
248+ console.log(message);
249+}
+126,
-0
1@@ -0,0 +1,126 @@
2+// ---------------------------------------------------------------------------
3+// D1 Client configuration
4+// ---------------------------------------------------------------------------
5+
/**
 * Connection settings for the Cloudflare D1 HTTP API client.
 * The three identifiers are required; the remaining fields have defaults.
 */
export interface D1ClientConfig {
  /** Cloudflare account ID that owns the database. */
  accountId: string;
  /** D1 database identifier. */
  databaseId: string;
  /** API token used to authenticate against the D1 REST API. */
  apiToken: string;
  /** Optional custom fetch implementation (defaults to globalThis.fetch). */
  fetchImpl?: typeof globalThis.fetch;
  /** HTTP request timeout in milliseconds (default 30 000). */
  timeoutMs?: number;
}
15+
16+// ---------------------------------------------------------------------------
17+// D1 REST API response types
18+// ---------------------------------------------------------------------------
19+
/** One error entry from the Cloudflare v4 API response envelope. */
export interface D1ApiError {
  code: number;
  message: string;
}

/** Per-query execution metadata reported by D1. */
export interface D1QueryMeta {
  /** Number of rows modified by the statement. */
  changes: number;
  changed_db: boolean;
  /** Statement execution time as reported by D1. */
  duration: number;
  last_row_id: number;
  rows_read: number;
  rows_written: number;
  /** Database size after the statement ran. */
  size_after: number;
}

/** Result of a single SQL statement inside a D1 API response. */
export interface D1QueryResult<T = Record<string, unknown>> {
  results: T[];
  meta: D1QueryMeta;
  success: boolean;
}

/**
 * Top-level Cloudflare v4 API envelope for a D1 query call.
 * NOTE(review): the live API may report `messages` as objects rather than
 * bare strings — confirm against an actual response before relying on it.
 */
export interface D1ApiResponse<T = Record<string, unknown>> {
  errors: D1ApiError[];
  messages: string[];
  result: D1QueryResult<T>[];
  success: boolean;
}
47+
48+// ---------------------------------------------------------------------------
49+// Prepared statement interface
50+// ---------------------------------------------------------------------------
51+
/** Prepared-statement handle returned by the client's prepare(). */
export interface D1PreparedStatement {
  /** Execute the statement and return the change count. */
  run(...params: unknown[]): Promise<{ changes: number }>;
  /** Execute and return the first row, or undefined when there is none. */
  get<T = Record<string, unknown>>(...params: unknown[]): Promise<T | undefined>;
  /** Execute and return every row. */
  all<T = Record<string, unknown>>(...params: unknown[]): Promise<T[]>;
}
57+
58+// ---------------------------------------------------------------------------
59+// Sync queue types
60+// ---------------------------------------------------------------------------
61+
/** Mutation kind recorded in the local sync queue. */
export type SyncOperation = "insert" | "update" | "delete";
/** Lifecycle state of a queued record. */
export type SyncStatus = "pending" | "synced" | "failed";

/**
 * One row of the local d1_sync_queue table, exposed as the camelCase view of
 * the snake_case columns defined in D1_SYNC_QUEUE_SCHEMA_SQL.
 */
export interface SyncQueueRecord {
  id: number;
  /** Remote D1 table the mutation targets. */
  tableName: string;
  /** Identifier of the affected row (used for logging/deduplication). */
  recordId: string;
  operation: SyncOperation;
  /** JSON-serialized column map for the mutation. */
  payload: string;
  /** Enqueue timestamp — presumably epoch ms; confirm in SyncQueue.enqueue. */
  createdAt: number;
  /** Number of sync attempts made so far. */
  attempts: number;
  /** Epoch-ms time of the most recent attempt, or null before the first. */
  lastAttemptAt: number | null;
  status: SyncStatus;
}

/** Input accepted when enqueuing; payload is stored as JSON text. */
export interface EnqueueSyncInput {
  tableName: string;
  recordId: string;
  operation: SyncOperation;
  payload: unknown;
}
83+
84+// ---------------------------------------------------------------------------
85+// Sync worker configuration
86+// ---------------------------------------------------------------------------
87+
/**
 * Tuning knobs for the background sync worker; every field falls back to the
 * matching DEFAULT_* constant below when omitted.
 */
export interface SyncWorkerConfig {
  /** Polling interval in milliseconds (default 10 000). */
  pollIntervalMs?: number;
  /** Maximum number of records to process per batch (default 50). */
  batchSize?: number;
  /** Maximum retry attempts before marking as failed (default 10). */
  maxAttempts?: number;
  /** Base delay for exponential backoff in milliseconds (default 1000). */
  baseDelayMs?: number;
  /** Maximum backoff delay in milliseconds (default 300 000 = 5 minutes). */
  maxDelayMs?: number;
}

// Defaults applied when the corresponding config field is omitted.
export const DEFAULT_POLL_INTERVAL_MS = 10_000;
export const DEFAULT_BATCH_SIZE = 50;
export const DEFAULT_MAX_ATTEMPTS = 10;
export const DEFAULT_BASE_DELAY_MS = 1_000;
export const DEFAULT_MAX_DELAY_MS = 300_000;
// HTTP timeout default for D1ClientConfig.timeoutMs.
export const DEFAULT_TIMEOUT_MS = 30_000;
107+
108+// ---------------------------------------------------------------------------
109+// Sync queue schema
110+// ---------------------------------------------------------------------------
111+
/**
 * DDL for the local SQLite buffer table holding writes awaiting upload to D1.
 * Columns are snake_case here; SyncQueueRecord exposes the camelCase view.
 * The (status, created_at) index presumably serves the worker's scan for
 * pending records in insertion order — confirm against SyncQueue.dequeue.
 */
export const D1_SYNC_QUEUE_SCHEMA_SQL = `
CREATE TABLE IF NOT EXISTS d1_sync_queue (
  id INTEGER PRIMARY KEY AUTOINCREMENT,
  table_name TEXT NOT NULL,
  record_id TEXT NOT NULL,
  operation TEXT NOT NULL,
  payload TEXT NOT NULL,
  created_at INTEGER NOT NULL,
  attempts INTEGER NOT NULL DEFAULT 0,
  last_attempt_at INTEGER,
  status TEXT NOT NULL DEFAULT 'pending'
);

CREATE INDEX IF NOT EXISTS idx_d1_sync_queue_status
  ON d1_sync_queue(status, created_at);
`;
+9,
-0
1@@ -0,0 +1,9 @@
2+{
3+ "extends": "../../tsconfig.base.json",
4+ "compilerOptions": {
5+ "declaration": true,
6+ "rootDir": "src",
7+ "outDir": "dist"
8+ },
9+ "include": ["src/**/*.ts", "src/**/*.d.ts"]
10+}
+5,
-0
1@@ -21,6 +21,9 @@ importers:
2 '@baa-conductor/artifact-db':
3 specifier: workspace:*
4 version: link:../../packages/artifact-db
5+ '@baa-conductor/d1-client':
6+ specifier: workspace:*
7+ version: link:../../packages/d1-client
8 '@baa-conductor/db':
9 specifier: workspace:*
10 version: link:../../packages/db
11@@ -42,6 +45,8 @@ importers:
12
13 packages/codex-exec: {}
14
15+ packages/d1-client: {}
16+
17 packages/db: {}
18
19 packages/git-tools: {}
+29,
-5
1@@ -2,7 +2,7 @@
2
3 ## 状态
4
5-- 当前状态:`待开始`
6+- 当前状态:`已完成`
7 - 规模预估:`M`
8 - 依赖任务:`T-S039`(需要表结构,但代码可并行开发)
9 - 建议执行者:`Codex`(新 package,参考旧版 D1Client 重写为 TypeScript async)
10@@ -167,21 +167,45 @@
11
12 ### 开始执行
13
14-- 执行者:
15-- 开始时间:
16+- 执行者:Claude Code (Opus 4.6)
17+- 开始时间:2026-03-29
18 - 状态变更:`待开始` → `进行中`
19
20 ### 完成摘要
21
22-- 完成时间:
23+- 完成时间:2026-03-29
24 - 状态变更:`进行中` → `已完成`
25 - 修改了哪些文件:
26+ - `packages/d1-client/` — 新建 package(6 个源文件 + 1 个测试文件 + 1 个 SQL 建库脚本)
27+ - `src/types.ts` — D1 客户端、同步队列、sync worker 类型定义 + sync_queue 建表 SQL
28+ - `src/client.ts` — async D1 HTTP API 客户端(fetch 实现)+ `createD1Client` 工厂
29+ - `src/sync-queue.ts` — SyncQueue 类,操作本地 d1_sync_queue 表
30+ - `src/sync-worker.ts` — D1SyncWorker 后台同步 worker,指数退避重试
31+ - `src/index.ts` — 导出 + `createD1SyncWorker` 便捷工厂
32+ - `src/node-shims.d.ts` — node:sqlite 类型声明
33+ - `src/d1-setup.sql` — D1 远程数据库建表脚本
34+ - `src/index.test.js` — 11 个单元测试
35+ - `apps/conductor-daemon/src/index.ts` — 导入 d1-client,ConductorRuntime 构造时创建 sync worker,start/stop 时启停
36+ - `apps/conductor-daemon/package.json` — 新增 @baa-conductor/d1-client 依赖和 build 步骤
37 - 核心实现思路:
38+ - D1Client 用 fetch + AbortController 调 Cloudflare D1 REST API,接口兼容旧版(exec / prepare.run / prepare.get / prepare.all),全异步
39+ - createD1Client 工厂从环境变量创建客户端,缺少任何变量时返回 null(静默跳过)
40+ - SyncQueue 直接操作本地 SQLite d1_sync_queue 表,提供 enqueue / dequeue / markSynced / markAttemptFailed / purgeSynced
41+ - D1SyncWorker 定时扫描 pending 记录,每条记录独立推送到 D1(INSERT OR REPLACE),失败按指数退避(1s→2s→4s...最大5分钟),10 次后标记 failed
42+ - createD1SyncWorker 便捷工厂封装了 DatabaseSync 创建 + SyncQueue 初始化 + D1Client 创建,conductor-daemon 只需一行调用
43+ - ConductorRuntime 构造时通过 createD1SyncWorker 创建 worker(D1 未配置时返回 null),start() 时启动,stop() 时停止
44 - 跑了哪些测试:
45+ - `pnpm -F @baa-conductor/d1-client test`:11 个测试全部通过
46+ - `pnpm build`:全量 workspace build 成功
47+ - conductor-daemon 测试中的失败是 main 分支上已存在的问题(localApiBase 相关),与本次修改无关
48
49 ### 执行过程中遇到的问题
50
51-> 记录执行过程中遇到的阻塞、环境问题、临时绕过方案等。合并时由合并者判断是否需要修复或建新任务。
52+- conductor-daemon 没有 node:sqlite 类型声明(node-shims.d.ts),不能直接 import DatabaseSync。解决方案:在 d1-client 中提供 createD1SyncWorker 工厂函数,封装 DatabaseSync 依赖,daemon 只导入工厂函数
53+- conductor-daemon 的 ConductorRuntime 构造函数不是 async,不能用 dynamic import。解决方案:在 d1-client 的 index.ts 中用顶层 static import
54
55 ### 剩余风险
56
57+- sync worker 目前不会自动清理 synced 记录(purgeSynced 方法已实现但未自动调用),长期运行可能积累大量已同步记录。可在后续版本中添加定期清理逻辑
58+- 尚未与 artifact-db 的 insertMessage/insertExecution 集成(自动往 sync_queue 插入记录)——这需要在 T-S039 的 ArtifactStore 中添加 hook 或在调用方手动 enqueue
59+