baa-conductor

git clone 

commit
95a1772
parent
8809f1c
author
im_wower
date
2026-03-29 02:13:39 +0800 CST
fix: harden sync SQL and transport shutdown
9 files changed,  +582, -35
M apps/claude-coded/src/daemon.ts
+10, -1
 1@@ -27,18 +27,27 @@ export interface ClaudeCodedChildProcessLike {
 2     on(event: "data", listener: (chunk: string | Uint8Array) => void): unknown;
 3     on(event: "end", listener: () => void): unknown;
 4     on(event: "error", listener: (error: Error) => void): unknown;
 5+    off?(event: "data", listener: (chunk: string | Uint8Array) => void): unknown;
 6+    off?(event: "end", listener: () => void): unknown;
 7+    off?(event: "error", listener: (error: Error) => void): unknown;
 8     setEncoding?(encoding: string): unknown;
 9   };
10   stdout?: {
11     on(event: "data", listener: (chunk: string | Uint8Array) => void): unknown;
12     on(event: "end", listener: () => void): unknown;
13     on(event: "error", listener: (error: Error) => void): unknown;
14+    off?(event: "data", listener: (chunk: string | Uint8Array) => void): unknown;
15+    off?(event: "end", listener: () => void): unknown;
16+    off?(event: "error", listener: (error: Error) => void): unknown;
17     setEncoding?(encoding: string): unknown;
18   };
19   kill(signal?: string): boolean;
20   on(event: "error", listener: (error: Error) => void): unknown;
21   on(event: "exit", listener: (code: number | null, signal: string | null) => void): unknown;
22   on(event: "spawn", listener: () => void): unknown;
23+  off?(event: "error", listener: (error: Error) => void): unknown;
24+  off?(event: "exit", listener: (code: number | null, signal: string | null) => void): unknown;
25+  off?(event: "spawn", listener: () => void): unknown;
26   once(event: "error", listener: (error: Error) => void): unknown;
27   once(event: "exit", listener: (code: number | null, signal: string | null) => void): unknown;
28   once(event: "spawn", listener: () => void): unknown;
29@@ -479,7 +488,7 @@ export class ClaudeCodedDaemon {
30       onCloseDiagnostic: (diagnostic) => {
31         this.recordEvent({
32           level: "warn",
33-          type: "transport.closed",
34+          type: diagnostic.source === "stderr.error" ? "transport.stderr.error" : "transport.closed",
35           message: diagnostic.message,
36           detail: {
37             source: diagnostic.source,
A apps/claude-coded/src/stream-json-transport.test.js
+89, -0
 1@@ -0,0 +1,89 @@
 2+import assert from "node:assert/strict";
 3+import { EventEmitter } from "node:events";
 4+import test from "node:test";
 5+
 6+import { createStreamJsonTransport } from "../dist/index.js";
 7+
 8+class MockReadable extends EventEmitter {
 9+  setEncoding() {}
10+}
11+
12+class MockWritable {
13+  endCallCount = 0;
14+  writes = [];
15+
16+  end(chunk) {
17+    this.endCallCount += 1;
18+    if (chunk != null) {
19+      this.writes.push(chunk);
20+    }
21+  }
22+
23+  write(chunk) {
24+    this.writes.push(chunk);
25+    return true;
26+  }
27+}
28+
29+class MockProcess extends EventEmitter {
30+  constructor() {
31+    super();
32+    this.stdin = new MockWritable();
33+    this.stdout = new MockReadable();
34+    this.stderr = new MockReadable();
35+  }
36+}
37+
38+test("createStreamJsonTransport removes stdio listeners on close", () => {
39+  const child = new MockProcess();
40+  const transport = createStreamJsonTransport({ process: child });
41+
42+  transport.connect();
43+
44+  assert.equal(child.stdout.listenerCount("data"), 1);
45+  assert.equal(child.stdout.listenerCount("end"), 1);
46+  assert.equal(child.stdout.listenerCount("error"), 1);
47+  assert.equal(child.stderr.listenerCount("data"), 1);
48+  assert.equal(child.stderr.listenerCount("error"), 1);
49+  assert.equal(child.listenerCount("error"), 1);
50+  assert.equal(child.listenerCount("exit"), 1);
51+
52+  transport.close();
53+
54+  assert.equal(child.stdout.listenerCount("data"), 0);
55+  assert.equal(child.stdout.listenerCount("end"), 0);
56+  assert.equal(child.stdout.listenerCount("error"), 0);
57+  assert.equal(child.stderr.listenerCount("data"), 0);
58+  assert.equal(child.stderr.listenerCount("error"), 0);
59+  assert.equal(child.listenerCount("error"), 0);
60+  assert.equal(child.listenerCount("exit"), 0);
61+  assert.equal(child.stdin.endCallCount, 1);
62+  assert.equal(transport.closed, true);
63+  assert.equal(transport.connected, false);
64+});
65+
66+test("createStreamJsonTransport captures stderr stream errors without closing", () => {
67+  const child = new MockProcess();
68+  const diagnostics = [];
69+  const stderrMessages = [];
70+  const transport = createStreamJsonTransport({
71+    process: child,
72+    onCloseDiagnostic: (diagnostic) => diagnostics.push(diagnostic),
73+    onStderr: (text) => stderrMessages.push(text)
74+  });
75+
76+  transport.connect();
77+  child.stderr.emit("error", new Error("stderr failed"));
78+
79+  assert.equal(transport.closed, false);
80+  assert.equal(transport.connected, true);
81+  assert.deepEqual(stderrMessages, [
82+    "Claude Code stderr stream error: stderr failed"
83+  ]);
84+  assert.deepEqual(diagnostics, [
85+    {
86+      source: "stderr.error",
87+      message: "Claude Code stderr stream error: stderr failed"
88+    }
89+  ]);
90+});
M apps/claude-coded/src/stream-json-transport.ts
+69, -20
  1@@ -7,6 +7,9 @@ export interface StreamJsonReadableStream {
  2   on(event: "data", listener: (chunk: string | Uint8Array) => void): unknown;
  3   on(event: "end", listener: () => void): unknown;
  4   on(event: "error", listener: (error: Error) => void): unknown;
  5+  off?(event: "data", listener: (chunk: string | Uint8Array) => void): unknown;
  6+  off?(event: "end", listener: () => void): unknown;
  7+  off?(event: "error", listener: (error: Error) => void): unknown;
  8   setEncoding?(encoding: string): unknown;
  9 }
 10 
 11@@ -17,11 +20,14 @@ export interface StreamJsonProcessLike {
 12   stdout?: StreamJsonReadableStream;
 13   on(event: "error", listener: (error: Error) => void): unknown;
 14   on(event: "exit", listener: (code: number | null, signal: string | null) => void): unknown;
 15+  off?(event: "error", listener: (error: Error) => void): unknown;
 16+  off?(event: "exit", listener: (code: number | null, signal: string | null) => void): unknown;
 17 }
 18 
 19 export type StreamJsonCloseSource =
 20   | "process.error"
 21   | "process.exit"
 22+  | "stderr.error"
 23   | "stdout.end"
 24   | "stdout.error";
 25 
 26@@ -56,6 +62,16 @@ export function createStreamJsonTransport(config: StreamJsonTransportConfig): St
 27   let buffer = "";
 28   let closed = false;
 29   let connected = false;
 30+  let detachListeners: (() => void) | null = null;
 31+  const decoder = new TextDecoder();
 32+
 33+  const decodeChunk = (chunk: string | Uint8Array): string =>
 34+    typeof chunk === "string" ? chunk : decoder.decode(chunk);
 35+
 36+  const cleanupListeners = (): void => {
 37+    detachListeners?.();
 38+    detachListeners = null;
 39+  };
 40 
 41   const emitBufferedMessages = (): void => {
 42     while (true) {
 43@@ -86,6 +102,10 @@ export function createStreamJsonTransport(config: StreamJsonTransportConfig): St
 44     }
 45   };
 46 
 47+  const emitDiagnostic = (diagnostic: StreamJsonCloseDiagnostic): void => {
 48+    config.onCloseDiagnostic?.(diagnostic);
 49+  };
 50+
 51   const closeTransport = (
 52     error: Error,
 53     diagnostic: Omit<StreamJsonCloseDiagnostic, "message">
 54@@ -96,7 +116,9 @@ export function createStreamJsonTransport(config: StreamJsonTransportConfig): St
 55 
 56     closed = true;
 57     connected = false;
 58-    config.onCloseDiagnostic?.({
 59+    cleanupListeners();
 60+    buffer = "";
 61+    emitDiagnostic({
 62       ...diagnostic,
 63       message: error.message
 64     });
 65@@ -127,35 +149,37 @@ export function createStreamJsonTransport(config: StreamJsonTransportConfig): St
 66 
 67       connected = true;
 68       stdout.setEncoding?.("utf8");
 69-      stdout.on("data", (chunk) => {
 70-        buffer += typeof chunk === "string" ? chunk : new TextDecoder().decode(chunk);
 71+      const handleStdoutData = (chunk: string | Uint8Array): void => {
 72+        buffer += decodeChunk(chunk);
 73         emitBufferedMessages();
 74-      });
 75-      stdout.on("end", () => {
 76+      };
 77+      const handleStdoutEnd = (): void => {
 78         closeTransport(new Error("Claude Code stdout ended."), {
 79           source: "stdout.end"
 80         });
 81-      });
 82-      stdout.on("error", (error) => {
 83+      };
 84+      const handleStdoutError = (error: Error): void => {
 85         closeTransport(error, {
 86           source: "stdout.error"
 87         });
 88-      });
 89-
 90-      if (stderr != null) {
 91-        stderr.setEncoding?.("utf8");
 92-        stderr.on("data", (chunk) => {
 93-          const text = typeof chunk === "string" ? chunk : new TextDecoder().decode(chunk);
 94-          config.onStderr?.(text);
 95+      };
 96+      const handleStderrData = (chunk: string | Uint8Array): void => {
 97+        config.onStderr?.(decodeChunk(chunk));
 98+      };
 99+      const handleStderrError = (error: Error): void => {
100+        const message = `Claude Code stderr stream error: ${error.message}`;
101+        config.onStderr?.(message);
102+        emitDiagnostic({
103+          source: "stderr.error",
104+          message
105         });
106-      }
107-
108-      config.process.on("error", (error) => {
109+      };
110+      const handleProcessError = (error: Error): void => {
111         closeTransport(error, {
112           source: "process.error"
113         });
114-      });
115-      config.process.on("exit", (code, signal) => {
116+      };
117+      const handleProcessExit = (code: number | null, signal: string | null): void => {
118         closeTransport(
119           new Error(
120             `Claude Code child exited (code=${String(code)}, signal=${String(signal)}).`
121@@ -166,7 +190,30 @@ export function createStreamJsonTransport(config: StreamJsonTransportConfig): St
122             source: "process.exit"
123           }
124         );
125-      });
126+      };
127+
128+      stdout.on("data", handleStdoutData);
129+      stdout.on("end", handleStdoutEnd);
130+      stdout.on("error", handleStdoutError);
131+
132+      if (stderr != null) {
133+        stderr.setEncoding?.("utf8");
134+        stderr.on("data", handleStderrData);
135+        stderr.on("error", handleStderrError);
136+      }
137+
138+      config.process.on("error", handleProcessError);
139+      config.process.on("exit", handleProcessExit);
140+
141+      detachListeners = () => {
142+        stdout.off?.("data", handleStdoutData);
143+        stdout.off?.("end", handleStdoutEnd);
144+        stdout.off?.("error", handleStdoutError);
145+        stderr?.off?.("data", handleStderrData);
146+        stderr?.off?.("error", handleStderrError);
147+        config.process.off?.("error", handleProcessError);
148+        config.process.off?.("exit", handleProcessExit);
149+      };
150     },
151 
152     send(message: Record<string, unknown>): void {
153@@ -189,6 +236,8 @@ export function createStreamJsonTransport(config: StreamJsonTransportConfig): St
154 
155       closed = true;
156       connected = false;
157+      cleanupListeners();
158+      buffer = "";
159 
160       try {
161         config.process.stdin?.end();
M apps/conductor-daemon/src/index.test.js
+149, -0
  1@@ -4353,6 +4353,155 @@ test("ConductorRuntime exposes a minimal runtime snapshot for CLI and status sur
  2   });
  3 });
  4 
  5+function createStubRuntimeDaemon(overrides = {}) {
  6+  return {
  7+    start: async () => createStubRuntimeDaemonSnapshot(),
  8+    stop: async () => createStubRuntimeDaemonSnapshot(),
  9+    getStatusSnapshot: () => createStubRuntimeDaemonSnapshot(),
 10+    getStartupChecklist: () => [],
 11+    describeIdentity: () => "mini-main@mini(primary)",
 12+    getLoopStatus: () => ({
 13+      heartbeat: false,
 14+      lease: false
 15+    }),
 16+    ...overrides
 17+  };
 18+}
 19+
 20+function createStubRuntimeDaemonSnapshot() {
 21+  return {
 22+    nodeId: "mini-main",
 23+    host: "mini",
 24+    role: "primary",
 25+    leaseState: "standby",
 26+    schedulerEnabled: false,
 27+    currentLeaderId: null,
 28+    currentTerm: null,
 29+    leaseExpiresAt: null,
 30+    lastHeartbeatAt: null,
 31+    lastLeaseOperation: null,
 32+    nextLeaseOperation: "acquire",
 33+    consecutiveRenewFailures: 0,
 34+    lastError: null
 35+  };
 36+}
 37+
 38+test("ConductorRuntime awaits daemon shutdown when local API startup fails", async () => {
 39+  const stateDir = mkdtempSync(join(tmpdir(), "baa-conductor-runtime-start-stop-await-"));
 40+  const runtime = new ConductorRuntime(
 41+    {
 42+      nodeId: "mini-main",
 43+      host: "mini",
 44+      role: "primary",
 45+      controlApiBase: "https://control.example.test",
 46+      localApiBase: "http://127.0.0.1:0",
 47+      sharedToken: "replace-me",
 48+      paths: {
 49+        runsDir: "/tmp/runs",
 50+        stateDir
 51+      }
 52+    },
 53+    {
 54+      autoStartLoops: false,
 55+      now: () => 100
 56+    }
 57+  );
 58+
 59+  let daemonStopResolved = false;
 60+
 61+  runtime.localControlPlaneInitialized = true;
 62+  runtime.daemon = createStubRuntimeDaemon({
 63+    stop: async () => {
 64+      await new Promise((resolve) => setTimeout(resolve, 0));
 65+      daemonStopResolved = true;
 66+      return createStubRuntimeDaemonSnapshot();
 67+    }
 68+  });
 69+  runtime.localApiServer = {
 70+    start: async () => {
 71+      throw new Error("local api start failed");
 72+    },
 73+    stop: async () => {},
 74+    getBaseUrl: () => "http://127.0.0.1:0",
 75+    getFirefoxWebSocketUrl: () => null
 76+  };
 77+
 78+  try {
 79+    await assert.rejects(runtime.start(), /local api start failed/u);
 80+    assert.equal(daemonStopResolved, true);
 81+    assert.equal(runtime.getRuntimeSnapshot().runtime.started, false);
 82+  } finally {
 83+    rmSync(stateDir, {
 84+      force: true,
 85+      recursive: true
 86+    });
 87+  }
 88+});
 89+
 90+test("ConductorRuntime.stop waits for daemon shutdown before closing the local API server", async () => {
 91+  const stateDir = mkdtempSync(join(tmpdir(), "baa-conductor-runtime-stop-await-"));
 92+  const runtime = new ConductorRuntime(
 93+    {
 94+      nodeId: "mini-main",
 95+      host: "mini",
 96+      role: "primary",
 97+      controlApiBase: "https://control.example.test",
 98+      localApiBase: "http://127.0.0.1:0",
 99+      sharedToken: "replace-me",
100+      paths: {
101+        runsDir: "/tmp/runs",
102+        stateDir
103+      }
104+    },
105+    {
106+      autoStartLoops: false,
107+      now: () => 100
108+    }
109+  );
110+
111+  let daemonStopResolved = false;
112+  let releaseDaemonStop = null;
113+  let localApiStopSawDaemonResolved = false;
114+
115+  runtime.started = true;
116+  runtime.daemon = createStubRuntimeDaemon({
117+    stop: async () => {
118+      await new Promise((resolve) => {
119+        releaseDaemonStop = () => {
120+          daemonStopResolved = true;
121+          resolve();
122+        };
123+      });
124+      return createStubRuntimeDaemonSnapshot();
125+    }
126+  });
127+  runtime.localApiServer = {
128+    start: async () => {},
129+    stop: async () => {
130+      localApiStopSawDaemonResolved = daemonStopResolved;
131+    },
132+    getBaseUrl: () => "http://127.0.0.1:0",
133+    getFirefoxWebSocketUrl: () => null
134+  };
135+
136+  try {
137+    const stopPromise = runtime.stop();
138+    await new Promise((resolve) => setTimeout(resolve, 0));
139+    assert.equal(localApiStopSawDaemonResolved, false);
140+
141+    releaseDaemonStop();
142+    const stoppedSnapshot = await stopPromise;
143+
144+    assert.equal(stoppedSnapshot.runtime.started, false);
145+    assert.equal(localApiStopSawDaemonResolved, true);
146+  } finally {
147+    rmSync(stateDir, {
148+      force: true,
149+      recursive: true
150+    });
151+  }
152+});
153+
154 test("ConductorRuntime.stop closes active Firefox bridge clients and releases the local API listener", async () => {
155   const stateDir = mkdtempSync(join(tmpdir(), "baa-conductor-runtime-stop-"));
156   const runtime = new ConductorRuntime(
M apps/conductor-daemon/src/index.ts
+2, -2
 1@@ -2199,7 +2199,7 @@ export class ConductorRuntime {
 2         await this.localApiServer?.start();
 3       } catch (error) {
 4         this.started = false;
 5-        this.daemon.stop();
 6+        await this.daemon.stop();
 7         throw error;
 8       }
 9 
10@@ -2212,7 +2212,7 @@ export class ConductorRuntime {
11   async stop(): Promise<ConductorRuntimeSnapshot> {
12     this.started = false;
13     this.d1SyncWorker?.stop();
14-    this.daemon.stop();
15+    await this.daemon.stop();
16     await this.localApiServer?.stop();
17 
18     return this.getRuntimeSnapshot();
M packages/d1-client/src/d1-setup.sql
+2, -1
 1@@ -50,7 +50,8 @@ CREATE TABLE IF NOT EXISTS executions (
 2   http_status     INTEGER,
 3   executed_at     INTEGER NOT NULL,
 4   static_path     TEXT NOT NULL,
 5-  created_at      INTEGER NOT NULL
 6+  created_at      INTEGER NOT NULL,
 7+  FOREIGN KEY (message_id) REFERENCES messages(id)
 8 );
 9 
10 CREATE INDEX IF NOT EXISTS idx_executions_message
M packages/d1-client/src/index.test.js
+149, -0
  1@@ -208,6 +208,155 @@ describe("D1SyncWorker", () => {
  2       rmSync(tmpDir, { recursive: true, force: true });
  3     }
  4   });
  5+
  6+  it("syncs records for whitelisted tables and columns", async () => {
  7+    const tmpDir = mkdtempSync(join(tmpdir(), "d1-test-"));
  8+
  9+    try {
 10+      const db = new DatabaseSync(join(tmpDir, "test.db"));
 11+      const queue = new SyncQueue(db);
 12+      const prepared = [];
 13+      const worker = new D1SyncWorker({
 14+        d1: {
 15+          prepare(statement) {
 16+            const entry = {
 17+              statement,
 18+              params: []
 19+            };
 20+            prepared.push(entry);
 21+
 22+            return {
 23+              async run(...params) {
 24+                entry.params = params;
 25+              }
 26+            };
 27+          }
 28+        },
 29+        queue,
 30+        log: () => {}
 31+      });
 32+
 33+      queue.enqueueSyncRecord({
 34+        tableName: "messages",
 35+        recordId: "msg_004",
 36+        operation: "insert",
 37+        payload: {
 38+          id: "msg_004",
 39+          platform: "chatgpt",
 40+          role: "assistant",
 41+          raw_text: "hello",
 42+          observed_at: 123,
 43+          static_path: "/tmp/msg_004.txt",
 44+          created_at: 123
 45+        }
 46+      });
 47+
 48+      const [record] = queue.dequeuePendingSyncRecords(10);
 49+      await worker.syncRecord(record);
 50+
 51+      assert.equal(prepared.length, 1);
 52+      assert.match(prepared[0].statement, /^INSERT INTO messages \(/);
 53+      assert.deepEqual(prepared[0].params, [
 54+        "msg_004",
 55+        "chatgpt",
 56+        "assistant",
 57+        "hello",
 58+        123,
 59+        "/tmp/msg_004.txt",
 60+        123
 61+      ]);
 62+      assert.equal(queue.dequeuePendingSyncRecords(10).length, 0);
 63+
 64+      db.close();
 65+    } finally {
 66+      rmSync(tmpDir, { recursive: true, force: true });
 67+    }
 68+  });
 69+
 70+  it("rejects records for non-whitelisted tables", async () => {
 71+    const tmpDir = mkdtempSync(join(tmpdir(), "d1-test-"));
 72+
 73+    try {
 74+      const db = new DatabaseSync(join(tmpDir, "test.db"));
 75+      const queue = new SyncQueue(db);
 76+      const prepared = [];
 77+      const worker = new D1SyncWorker({
 78+        d1: {
 79+          prepare(statement) {
 80+            prepared.push(statement);
 81+            return {
 82+              async run() {}
 83+            };
 84+          }
 85+        },
 86+        queue,
 87+        log: () => {}
 88+      });
 89+
 90+      queue.enqueueSyncRecord({
 91+        tableName: "messages; DROP TABLE messages; --",
 92+        recordId: "msg_005",
 93+        operation: "insert",
 94+        payload: {
 95+          id: "msg_005",
 96+          raw_text: "hello"
 97+        }
 98+      });
 99+
100+      const [record] = queue.dequeuePendingSyncRecords(10);
101+      await worker.syncRecord(record);
102+
103+      assert.equal(prepared.length, 0);
104+      assert.equal(queue.dequeuePendingSyncRecords(10).length, 0);
105+
106+      db.close();
107+    } finally {
108+      rmSync(tmpDir, { recursive: true, force: true });
109+    }
110+  });
111+
112+  it("rejects records with non-whitelisted columns", async () => {
113+    const tmpDir = mkdtempSync(join(tmpdir(), "d1-test-"));
114+
115+    try {
116+      const db = new DatabaseSync(join(tmpDir, "test.db"));
117+      const queue = new SyncQueue(db);
118+      const prepared = [];
119+      const worker = new D1SyncWorker({
120+        d1: {
121+          prepare(statement) {
122+            prepared.push(statement);
123+            return {
124+              async run() {}
125+            };
126+          }
127+        },
128+        queue,
129+        log: () => {}
130+      });
131+
132+      queue.enqueueSyncRecord({
133+        tableName: "messages",
134+        recordId: "msg_006",
135+        operation: "update",
136+        payload: {
137+          id: "msg_006",
138+          raw_text: "hello",
139+          hacked_column: "nope"
140+        }
141+      });
142+
143+      const [record] = queue.dequeuePendingSyncRecords(10);
144+      await worker.syncRecord(record);
145+
146+      assert.equal(prepared.length, 0);
147+      assert.equal(queue.dequeuePendingSyncRecords(10).length, 0);
148+
149+      db.close();
150+    } finally {
151+      rmSync(tmpDir, { recursive: true, force: true });
152+    }
153+  });
154 });
155 
156 describe("createD1SyncWorker", () => {
M packages/d1-client/src/sync-worker.ts
+85, -7
  1@@ -11,6 +11,53 @@ import {
  2 
  3 const DEFAULT_PURGE_KEEP_COUNT = 1000;
  4 
  5+const SYNC_COLUMN_WHITELIST = {
  6+  messages: new Set([
  7+    "id",
  8+    "platform",
  9+    "conversation_id",
 10+    "role",
 11+    "raw_text",
 12+    "summary",
 13+    "observed_at",
 14+    "static_path",
 15+    "page_url",
 16+    "page_title",
 17+    "organization_id",
 18+    "created_at"
 19+  ]),
 20+  executions: new Set([
 21+    "instruction_id",
 22+    "message_id",
 23+    "target",
 24+    "tool",
 25+    "params",
 26+    "params_kind",
 27+    "result_ok",
 28+    "result_data",
 29+    "result_summary",
 30+    "result_error",
 31+    "http_status",
 32+    "executed_at",
 33+    "static_path",
 34+    "created_at"
 35+  ]),
 36+  sessions: new Set([
 37+    "id",
 38+    "platform",
 39+    "conversation_id",
 40+    "started_at",
 41+    "last_activity_at",
 42+    "message_count",
 43+    "execution_count",
 44+    "summary",
 45+    "static_path",
 46+    "created_at"
 47+  ])
 48+} as const;
 49+
 50+type SyncTableName = keyof typeof SYNC_COLUMN_WHITELIST;
 51+
 52 export interface SyncWorkerDeps {
 53   d1: D1Client;
 54   queue: SyncQueue;
 55@@ -173,7 +220,7 @@ export class D1SyncWorker {
 56       const sql = buildSyncSql(record.tableName, record.operation, payload);
 57 
 58       if (sql == null) {
 59-        // Unknown operation — mark synced to avoid infinite retries.
 60+        // Invalid or unsupported sync payloads are marked synced to avoid infinite retries.
 61         this.queue.markSynced(record.id);
 62         return;
 63       }
 64@@ -205,12 +252,18 @@ function buildSyncSql(
 65   operation: string,
 66   payload: Record<string, unknown>
 67 ): GeneratedSql | null {
 68+  const keys = getWhitelistedPayloadKeys(tableName, payload);
 69+
 70+  if (keys == null) {
 71+    return null;
 72+  }
 73+
 74   switch (operation) {
 75     case "insert":
 76     case "update":
 77-      return buildUpsertSql(tableName, payload);
 78+      return buildUpsertSql(tableName, payload, keys);
 79     case "delete":
 80-      return buildDeleteSql(tableName, payload);
 81+      return buildDeleteSql(tableName, payload, keys);
 82     default:
 83       return null;
 84   }
 85@@ -218,9 +271,9 @@ function buildSyncSql(
 86 
 87 function buildUpsertSql(
 88   tableName: string,
 89-  payload: Record<string, unknown>
 90+  payload: Record<string, unknown>,
 91+  keys: readonly string[]
 92 ): GeneratedSql {
 93-  const keys = Object.keys(payload);
 94   const placeholders = keys.map(() => "?").join(", ");
 95   const columns = keys.join(", ");
 96   const updates = keys.map((k) => `${k} = excluded.${k}`).join(", ");
 97@@ -236,10 +289,10 @@ function buildUpsertSql(
 98 
 99 function buildDeleteSql(
100   tableName: string,
101-  payload: Record<string, unknown>
102+  payload: Record<string, unknown>,
103+  keys: readonly string[]
104 ): GeneratedSql {
105   // The payload for deletes is expected to contain the primary key column(s).
106-  const keys = Object.keys(payload);
107   const conditions = keys.map((k) => `${k} = ?`).join(" AND ");
108   const params = keys.map((k) => payload[k]);
109 
110@@ -253,6 +306,31 @@ function buildDeleteSql(
111 // Helpers
112 // ---------------------------------------------------------------------------
113 
114+function getWhitelistedPayloadKeys(
115+  tableName: string,
116+  payload: Record<string, unknown>
117+): string[] | null {
118+  const allowedColumns = SYNC_COLUMN_WHITELIST[tableName as SyncTableName];
119+
120+  if (allowedColumns == null) {
121+    return null;
122+  }
123+
124+  const keys = Object.keys(payload);
125+
126+  if (keys.length === 0) {
127+    return null;
128+  }
129+
130+  for (const key of keys) {
131+    if (!allowedColumns.has(key)) {
132+      return null;
133+    }
134+  }
135+
136+  return keys;
137+}
138+
139 function computeBackoff(
140   attempts: number,
141   baseMs: number,
M tasks/T-BUG-027.md
+27, -4
 1@@ -2,7 +2,7 @@
 2 
 3 ## 状态
 4 
 5-- 当前状态:`待开始`
 6+- 当前状态:`已完成`
 7 - 规模预估:`M`
 8 - 依赖任务:无
 9 - 建议执行者:`Claude`(涉及安全修复和 async 生命周期管理,需要理解多个模块的交互)
10@@ -144,21 +144,44 @@
11 
12 ### 开始执行
13 
14-- 执行者:
15-- 开始时间:
16+- 执行者:`Codex (GPT-5)`
17+- 开始时间:`2026-03-29 02:02:36 +0800`
18 - 状态变更:`待开始` → `进行中`
19 
20 ### 完成摘要
21 
22-- 完成时间:
23+- 完成时间:`2026-03-29 02:12:55 +0800`
24 - 状态变更:`进行中` → `已完成`
25 - 修改了哪些文件:
26+  - `packages/d1-client/src/sync-worker.ts`
27+  - `packages/d1-client/src/d1-setup.sql`
28+  - `packages/d1-client/src/index.test.js`
29+  - `apps/claude-coded/src/stream-json-transport.ts`
30+  - `apps/claude-coded/src/daemon.ts`
31+  - `apps/claude-coded/src/stream-json-transport.test.js`
32+  - `apps/conductor-daemon/src/index.ts`
33+  - `apps/conductor-daemon/src/index.test.js`
34+  - `tasks/T-BUG-027.md`
35 - 核心实现思路:
36+  - 为 D1 sync SQL 生成增加显式表名/列名白名单,遇到非白名单表或列直接拒绝同步,避免从 sync queue payload 拼接 SQL 标识符。
37+  - 让 D1 schema 的 `executions.message_id` 与本地 artifact-db schema 对齐,补上外键约束。
38+  - 为 `stream-json-transport` 保存并清理 stdio/process 监听器,在 `close()` 和异常关闭路径都解绑;同时为 `stderr` 增加 `error` handler,并把诊断上报到 daemon 事件流。
39+  - 在 `ConductorRuntime.start()` 的本地 API 启动失败路径和 `stop()` 路径都 `await this.daemon.stop()`,确保子进程停止完成后再继续收尾。
40 - 跑了哪些测试:
41+  - `cd /Users/george/code/baa-conductor-security-and-resource-leaks && pnpm --filter @baa-conductor/d1-client test`
42+  - `cd /Users/george/code/baa-conductor-security-and-resource-leaks && pnpm --filter @baa-conductor/claude-coded build`
43+  - `cd /Users/george/code/baa-conductor-security-and-resource-leaks && node --test apps/claude-coded/src/stream-json-transport.test.js`
44+  - `cd /Users/george/code/baa-conductor-security-and-resource-leaks && pnpm --filter @baa-conductor/conductor-daemon test`
45+  - `cd /Users/george/code/baa-conductor-security-and-resource-leaks && pnpm build`
46+  - `cd /Users/george/code/baa-conductor-security-and-resource-leaks && pnpm test`
47 
48 ### 执行过程中遇到的问题
49 
50 > 记录执行过程中遇到的阻塞、环境问题、临时绕过方案等。合并时由合并者判断是否需要修复或建新任务。
51 
52+- 新建 worktree 后缺少依赖,`pnpm exec tsc` 无法找到 `tsc`。已在独立 worktree 中执行 `pnpm install` 后继续开发和验证。
53+- 仓库级 `pnpm test` 默认不覆盖 `@baa-conductor/d1-client` 和 `@baa-conductor/claude-coded`,因此额外单独执行了这两处的定向测试。
54+
55 ### 剩余风险
56 
57+- `claude-coded` 的 transport 回归测试目前通过单独的 `node --test` 执行,尚未接入仓库级 `verify-workspace.mjs test` 流程。