baa-conductor

git clone 

baa-conductor / packages / d1-client / src
codex@macbookpro  ·  2026-04-01

index.test.js

  1import { describe, it } from "node:test";
  2import assert from "node:assert/strict";
  3import { tmpdir } from "node:os";
  4import { join } from "node:path";
  5import { mkdtempSync, readFileSync, rmSync } from "node:fs";
  6import { DatabaseSync } from "node:sqlite";
  7
  8import { D1Client, createD1Client, SyncQueue, D1SyncWorker, createD1SyncWorker } from "../dist/index.js";
  9
 10describe("createD1Client", () => {
 11  it("returns null when env vars are missing", () => {
 12    assert.strictEqual(createD1Client({}), null);
 13    assert.strictEqual(createD1Client({ D1_ACCOUNT_ID: "a" }), null);
 14    assert.strictEqual(createD1Client({ D1_ACCOUNT_ID: "a", D1_DATABASE_ID: "b" }), null);
 15  });
 16
 17  it("returns a D1Client when all env vars are set", () => {
 18    const client = createD1Client({
 19      D1_ACCOUNT_ID: "acc",
 20      D1_DATABASE_ID: "db",
 21      CLOUDFLARE_API_TOKEN: "tok"
 22    });
 23    assert.ok(client instanceof D1Client);
 24  });
 25
 26  it("trims whitespace-only env vars and returns null", () => {
 27    const client = createD1Client({
 28      D1_ACCOUNT_ID: "  ",
 29      D1_DATABASE_ID: "db",
 30      CLOUDFLARE_API_TOKEN: "tok"
 31    });
 32    assert.strictEqual(client, null);
 33  });
 34});
 35
 36describe("D1Client", () => {
 37  it("prepare returns an object with run, get, all methods", () => {
 38    const client = new D1Client({
 39      accountId: "acc",
 40      databaseId: "db",
 41      apiToken: "tok"
 42    });
 43    const stmt = client.prepare("SELECT 1");
 44    assert.strictEqual(typeof stmt.run, "function");
 45    assert.strictEqual(typeof stmt.get, "function");
 46    assert.strictEqual(typeof stmt.all, "function");
 47  });
 48});
 49
 50describe("SyncQueue", () => {
 51  it("enqueue and dequeue sync records", () => {
 52    const tmpDir = mkdtempSync(join(tmpdir(), "d1-test-"));
 53    try {
 54      const db = new DatabaseSync(join(tmpDir, "test.db"));
 55      const queue = new SyncQueue(db);
 56
 57      queue.enqueueSyncRecord({
 58        tableName: "messages",
 59        recordId: "msg_001",
 60        operation: "insert",
 61        payload: { id: "msg_001", text: "hello" }
 62      });
 63
 64      const records = queue.dequeuePendingSyncRecords(10);
 65      assert.strictEqual(records.length, 1);
 66      assert.strictEqual(records[0].tableName, "messages");
 67      assert.strictEqual(records[0].recordId, "msg_001");
 68      assert.strictEqual(records[0].operation, "insert");
 69      assert.strictEqual(records[0].status, "pending");
 70      assert.strictEqual(records[0].attempts, 0);
 71
 72      const payload = JSON.parse(records[0].payload);
 73      assert.strictEqual(payload.id, "msg_001");
 74
 75      db.close();
 76    } finally {
 77      rmSync(tmpDir, { recursive: true, force: true });
 78    }
 79  });
 80
 81  it("markSynced updates status", () => {
 82    const tmpDir = mkdtempSync(join(tmpdir(), "d1-test-"));
 83    try {
 84      const db = new DatabaseSync(join(tmpDir, "test.db"));
 85      const queue = new SyncQueue(db);
 86
 87      queue.enqueueSyncRecord({
 88        tableName: "messages",
 89        recordId: "msg_002",
 90        operation: "insert",
 91        payload: { id: "msg_002" }
 92      });
 93
 94      const records = queue.dequeuePendingSyncRecords(10);
 95      queue.markSynced(records[0].id);
 96
 97      const pending = queue.dequeuePendingSyncRecords(10);
 98      assert.strictEqual(pending.length, 0);
 99
100      db.close();
101    } finally {
102      rmSync(tmpDir, { recursive: true, force: true });
103    }
104  });
105
106  it("markAttemptFailed increments attempts and marks failed after max", () => {
107    const tmpDir = mkdtempSync(join(tmpdir(), "d1-test-"));
108    try {
109      const db = new DatabaseSync(join(tmpDir, "test.db"));
110      const queue = new SyncQueue(db);
111
112      queue.enqueueSyncRecord({
113        tableName: "sessions",
114        recordId: "s_001",
115        operation: "update",
116        payload: { id: "s_001" }
117      });
118
119      let records = queue.dequeuePendingSyncRecords(10);
120      const id = records[0].id;
121
122      // Fail 9 times — should stay pending
123      for (let i = 0; i < 9; i++) {
124        queue.markAttemptFailed(id, i, 10);
125      }
126
127      records = queue.dequeuePendingSyncRecords(10);
128      assert.strictEqual(records.length, 1);
129      assert.strictEqual(records[0].attempts, 9);
130
131      // Fail once more — should be marked failed
132      queue.markAttemptFailed(id, 9, 10);
133
134      records = queue.dequeuePendingSyncRecords(10);
135      assert.strictEqual(records.length, 0);
136
137      db.close();
138    } finally {
139      rmSync(tmpDir, { recursive: true, force: true });
140    }
141  });
142
143  it("purgeSynced removes old synced records", () => {
144    const tmpDir = mkdtempSync(join(tmpdir(), "d1-test-"));
145    try {
146      const db = new DatabaseSync(join(tmpDir, "test.db"));
147      const queue = new SyncQueue(db);
148
149      queue.enqueueSyncRecord({
150        tableName: "messages",
151        recordId: "msg_003",
152        operation: "insert",
153        payload: { id: "msg_003" }
154      });
155
156      const records = queue.dequeuePendingSyncRecords(10);
157      queue.markSynced(records[0].id);
158
159      // Purge records older than 0ms (everything synced)
160      queue.purgeSynced(0);
161
162      // Should have been purged
163      const remaining = queue.dequeuePendingSyncRecords(10);
164      assert.strictEqual(remaining.length, 0);
165
166      db.close();
167    } finally {
168      rmSync(tmpDir, { recursive: true, force: true });
169    }
170  });
171});
172
173describe("D1SyncWorker", () => {
174  it("start and stop lifecycle", () => {
175    const tmpDir = mkdtempSync(join(tmpdir(), "d1-test-"));
176    try {
177      const db = new DatabaseSync(join(tmpDir, "test.db"));
178      const queue = new SyncQueue(db);
179      const client = new D1Client({
180        accountId: "acc",
181        databaseId: "db",
182        apiToken: "tok"
183      });
184
185      const logs = [];
186      const worker = new D1SyncWorker({
187        d1: client,
188        queue,
189        config: { pollIntervalMs: 100 },
190        log: (msg) => logs.push(msg)
191      });
192
193      assert.strictEqual(worker.isRunning(), false);
194      worker.start();
195      assert.strictEqual(worker.isRunning(), true);
196
197      // Double start is safe
198      worker.start();
199      assert.strictEqual(worker.isRunning(), true);
200
201      worker.stop();
202      assert.strictEqual(worker.isRunning(), false);
203      assert.ok(logs.some((l) => l.includes("started")));
204      assert.ok(logs.some((l) => l.includes("stopped")));
205
206      db.close();
207    } finally {
208      rmSync(tmpDir, { recursive: true, force: true });
209    }
210  });
211
212  it("syncs records for whitelisted tables and columns", async () => {
213    const tmpDir = mkdtempSync(join(tmpdir(), "d1-test-"));
214
215    try {
216      const db = new DatabaseSync(join(tmpDir, "test.db"));
217      const queue = new SyncQueue(db);
218      const prepared = [];
219      const worker = new D1SyncWorker({
220        d1: {
221          prepare(statement) {
222            const entry = {
223              statement,
224              params: []
225            };
226            prepared.push(entry);
227
228            return {
229              async run(...params) {
230                entry.params = params;
231              }
232            };
233          }
234        },
235        queue,
236        log: () => {}
237      });
238
239      queue.enqueueSyncRecord({
240        tableName: "messages",
241        recordId: "msg_004",
242        operation: "insert",
243        payload: {
244          id: "msg_004",
245          platform: "chatgpt",
246          role: "assistant",
247          raw_text: "hello",
248          observed_at: 123,
249          static_path: "/tmp/msg_004.txt",
250          created_at: 123
251        }
252      });
253
254      const [record] = queue.dequeuePendingSyncRecords(10);
255      await worker.syncRecord(record);
256
257      assert.equal(prepared.length, 1);
258      assert.match(prepared[0].statement, /^INSERT INTO messages \(/);
259      assert.deepEqual(prepared[0].params, [
260        "msg_004",
261        "chatgpt",
262        "assistant",
263        "hello",
264        123,
265        "/tmp/msg_004.txt",
266        123
267      ]);
268      assert.equal(queue.dequeuePendingSyncRecords(10).length, 0);
269
270      db.close();
271    } finally {
272      rmSync(tmpDir, { recursive: true, force: true });
273    }
274  });
275
276  it("syncs renewal storage tables with whitelisted columns", async () => {
277    const tmpDir = mkdtempSync(join(tmpdir(), "d1-test-"));
278
279    try {
280      const db = new DatabaseSync(join(tmpDir, "test.db"));
281      const queue = new SyncQueue(db);
282      const prepared = [];
283      const worker = new D1SyncWorker({
284        d1: {
285          prepare(statement) {
286            const entry = {
287              statement,
288              params: []
289            };
290            prepared.push(entry);
291
292            return {
293              async run(...params) {
294                entry.params = params;
295              }
296            };
297          }
298        },
299        queue,
300        log: () => {}
301      });
302
303      queue.enqueueSyncRecord({
304        tableName: "local_conversations",
305        recordId: "lc_001",
306        operation: "insert",
307        payload: {
308          local_conversation_id: "lc_001",
309          platform: "claude",
310          automation_status: "auto",
311          last_non_paused_automation_status: "auto",
312          pause_reason: "execution_failure",
313          last_error: "boom",
314          execution_state: "idle",
315          consecutive_failure_count: 2,
316          repeated_message_count: 1,
317          repeated_renewal_count: 0,
318          last_message_fingerprint: "msg-hash",
319          last_renewal_fingerprint: "renew-hash",
320          title: "Renewal thread",
321          summary: "Latest summary",
322          last_message_id: "msg_001",
323          last_message_at: 122,
324          cooldown_until: 140,
325          paused_at: 141,
326          created_at: 123,
327          updated_at: 124
328        }
329      });
330      queue.enqueueSyncRecord({
331        tableName: "renewal_jobs",
332        recordId: "job_001",
333        operation: "update",
334        payload: {
335          job_id: "job_001",
336          local_conversation_id: "lc_001",
337          message_id: "msg_001",
338          status: "running",
339          payload: "[renew]",
340          payload_kind: "text",
341          target_snapshot: "{\"clientId\":\"firefox-claude\"}",
342          attempt_count: 1,
343          max_attempts: 3,
344          next_attempt_at: null,
345          last_attempt_at: 456,
346          last_error: null,
347          log_path: "logs/renewal/2026-03-28.jsonl",
348          started_at: 456,
349          finished_at: null,
350          created_at: 123,
351          updated_at: 456
352        }
353      });
354      queue.enqueueSyncRecord({
355        tableName: "browser_request_policy_state",
356        recordId: "global",
357        operation: "update",
358        payload: {
359          state_key: "global",
360          value_json: "{\"version\":1,\"platforms\":[],\"targets\":[]}",
361          updated_at: 789
362        }
363      });
364
365      const records = queue.dequeuePendingSyncRecords(10);
366      await worker.syncRecord(records[0]);
367      await worker.syncRecord(records[1]);
368      await worker.syncRecord(records[2]);
369
370      assert.equal(prepared.length, 3);
371      assert.match(prepared[0].statement, /^INSERT INTO local_conversations \(/);
372      assert.deepEqual(prepared[0].params, [
373        "lc_001",
374        "claude",
375        "auto",
376        "auto",
377        "execution_failure",
378        "boom",
379        "idle",
380        2,
381        1,
382        0,
383        "msg-hash",
384        "renew-hash",
385        "Renewal thread",
386        "Latest summary",
387        "msg_001",
388        122,
389        140,
390        141,
391        123,
392        124
393      ]);
394      assert.match(prepared[1].statement, /^INSERT INTO renewal_jobs \(/);
395      assert.deepEqual(prepared[1].params, [
396        "job_001",
397        "lc_001",
398        "msg_001",
399        "running",
400        "[renew]",
401        "text",
402        "{\"clientId\":\"firefox-claude\"}",
403        1,
404        3,
405        null,
406        456,
407        null,
408        "logs/renewal/2026-03-28.jsonl",
409        456,
410        null,
411        123,
412        456
413      ]);
414      assert.match(prepared[2].statement, /^INSERT INTO browser_request_policy_state \(/);
415      assert.deepEqual(prepared[2].params, [
416        "global",
417        "{\"version\":1,\"platforms\":[],\"targets\":[]}",
418        789
419      ]);
420      assert.equal(queue.dequeuePendingSyncRecords(10).length, 0);
421
422      db.close();
423    } finally {
424      rmSync(tmpDir, { recursive: true, force: true });
425    }
426  });
427
428  it("preserves created_at for renewal storage rows during D1 upsert conflicts", async () => {
429    const tmpDir = mkdtempSync(join(tmpdir(), "d1-test-"));
430    let queueDb = null;
431    let remoteDb = null;
432
433    try {
434      queueDb = new DatabaseSync(join(tmpDir, "queue.db"));
435      remoteDb = new DatabaseSync(join(tmpDir, "remote.db"));
436      remoteDb.exec(readFileSync(new URL("./d1-setup.sql", import.meta.url), "utf8"));
437
438      const queue = new SyncQueue(queueDb);
439      const worker = new D1SyncWorker({
440        d1: {
441          prepare(statement) {
442            const prepared = remoteDb.prepare(statement);
443            return {
444              async run(...params) {
445                prepared.run(...params);
446              }
447            };
448          }
449        },
450        queue,
451        log: () => {}
452      });
453
454      queue.enqueueSyncRecord({
455        tableName: "local_conversations",
456        recordId: "lc_immutable",
457        operation: "insert",
458        payload: {
459          local_conversation_id: "lc_immutable",
460          platform: "claude",
461          automation_status: "manual",
462          title: "Before",
463          created_at: 100,
464          updated_at: 110
465        }
466      });
467      queue.enqueueSyncRecord({
468        tableName: "messages",
469        recordId: "msg_immutable",
470        operation: "insert",
471        payload: {
472          id: "msg_immutable",
473          platform: "claude",
474          conversation_id: "conv_immutable",
475          role: "assistant",
476          raw_text: "hello",
477          observed_at: 200,
478          static_path: "/tmp/msg_immutable.txt",
479          created_at: 200
480        }
481      });
482      queue.enqueueSyncRecord({
483        tableName: "conversation_links",
484        recordId: "link_immutable",
485        operation: "insert",
486        payload: {
487          link_id: "link_immutable",
488          local_conversation_id: "lc_immutable",
489          platform: "claude",
490          remote_conversation_id: "conv_immutable",
491          page_title: "Before",
492          observed_at: 210,
493          created_at: 210,
494          updated_at: 220
495        }
496      });
497      queue.enqueueSyncRecord({
498        tableName: "renewal_jobs",
499        recordId: "job_immutable",
500        operation: "insert",
501        payload: {
502          job_id: "job_immutable",
503          local_conversation_id: "lc_immutable",
504          message_id: "msg_immutable",
505          status: "pending",
506          payload: "[renew]",
507          payload_kind: "text",
508          attempt_count: 0,
509          max_attempts: 3,
510          next_attempt_at: 300,
511          created_at: 300,
512          updated_at: 310
513        }
514      });
515
516      for (const record of queue.dequeuePendingSyncRecords(10)) {
517        await worker.syncRecord(record);
518      }
519
520      queue.enqueueSyncRecord({
521        tableName: "local_conversations",
522        recordId: "lc_immutable",
523        operation: "update",
524        payload: {
525          local_conversation_id: "lc_immutable",
526          platform: "claude",
527          automation_status: "auto",
528          title: "After",
529          created_at: 999,
530          updated_at: 410
531        }
532      });
533      queue.enqueueSyncRecord({
534        tableName: "conversation_links",
535        recordId: "link_immutable",
536        operation: "update",
537        payload: {
538          link_id: "link_immutable",
539          local_conversation_id: "lc_immutable",
540          platform: "claude",
541          remote_conversation_id: "conv_immutable",
542          page_title: "After",
543          observed_at: 420,
544          created_at: 999,
545          updated_at: 430
546        }
547      });
548      queue.enqueueSyncRecord({
549        tableName: "renewal_jobs",
550        recordId: "job_immutable",
551        operation: "update",
552        payload: {
553          job_id: "job_immutable",
554          local_conversation_id: "lc_immutable",
555          message_id: "msg_immutable",
556          status: "running",
557          payload: "[renew] updated",
558          payload_kind: "text",
559          attempt_count: 1,
560          max_attempts: 3,
561          next_attempt_at: null,
562          last_attempt_at: 430,
563          created_at: 999,
564          updated_at: 440
565        }
566      });
567
568      for (const record of queue.dequeuePendingSyncRecords(10)) {
569        await worker.syncRecord(record);
570      }
571
572      const localConversation = remoteDb.prepare(`
573        SELECT created_at, updated_at, automation_status, title
574        FROM local_conversations
575        WHERE local_conversation_id = ?;
576      `).get("lc_immutable");
577      const conversationLink = remoteDb.prepare(`
578        SELECT created_at, updated_at, page_title
579        FROM conversation_links
580        WHERE link_id = ?;
581      `).get("link_immutable");
582      const renewalJob = remoteDb.prepare(`
583        SELECT created_at, updated_at, status, payload
584        FROM renewal_jobs
585        WHERE job_id = ?;
586      `).get("job_immutable");
587
588      assert.equal(localConversation?.created_at, 100);
589      assert.equal(localConversation?.updated_at, 410);
590      assert.equal(localConversation?.automation_status, "auto");
591      assert.equal(localConversation?.title, "After");
592
593      assert.equal(conversationLink?.created_at, 210);
594      assert.equal(conversationLink?.updated_at, 430);
595      assert.equal(conversationLink?.page_title, "After");
596
597      assert.equal(renewalJob?.created_at, 300);
598      assert.equal(renewalJob?.updated_at, 440);
599      assert.equal(renewalJob?.status, "running");
600      assert.equal(renewalJob?.payload, "[renew] updated");
601    } finally {
602      queueDb?.close();
603      remoteDb?.close();
604      rmSync(tmpDir, { recursive: true, force: true });
605    }
606  });
607
608  it("rejects records for non-whitelisted tables", async () => {
609    const tmpDir = mkdtempSync(join(tmpdir(), "d1-test-"));
610
611    try {
612      const db = new DatabaseSync(join(tmpDir, "test.db"));
613      const queue = new SyncQueue(db);
614      const prepared = [];
615      const worker = new D1SyncWorker({
616        d1: {
617          prepare(statement) {
618            prepared.push(statement);
619            return {
620              async run() {}
621            };
622          }
623        },
624        queue,
625        log: () => {}
626      });
627
628      queue.enqueueSyncRecord({
629        tableName: "messages; DROP TABLE messages; --",
630        recordId: "msg_005",
631        operation: "insert",
632        payload: {
633          id: "msg_005",
634          raw_text: "hello"
635        }
636      });
637
638      const [record] = queue.dequeuePendingSyncRecords(10);
639      await worker.syncRecord(record);
640
641      assert.equal(prepared.length, 0);
642      assert.equal(queue.dequeuePendingSyncRecords(10).length, 0);
643
644      db.close();
645    } finally {
646      rmSync(tmpDir, { recursive: true, force: true });
647    }
648  });
649
650  it("rejects records with non-whitelisted columns", async () => {
651    const tmpDir = mkdtempSync(join(tmpdir(), "d1-test-"));
652
653    try {
654      const db = new DatabaseSync(join(tmpDir, "test.db"));
655      const queue = new SyncQueue(db);
656      const prepared = [];
657      const worker = new D1SyncWorker({
658        d1: {
659          prepare(statement) {
660            prepared.push(statement);
661            return {
662              async run() {}
663            };
664          }
665        },
666        queue,
667        log: () => {}
668      });
669
670      queue.enqueueSyncRecord({
671        tableName: "messages",
672        recordId: "msg_006",
673        operation: "update",
674        payload: {
675          id: "msg_006",
676          raw_text: "hello",
677          hacked_column: "nope"
678        }
679      });
680
681      const [record] = queue.dequeuePendingSyncRecords(10);
682      await worker.syncRecord(record);
683
684      assert.equal(prepared.length, 0);
685      assert.equal(queue.dequeuePendingSyncRecords(10).length, 0);
686
687      db.close();
688    } finally {
689      rmSync(tmpDir, { recursive: true, force: true });
690    }
691  });
692});
693
694describe("createD1SyncWorker", () => {
695  it("returns null when D1 env vars are missing", () => {
696    const tmpDir = mkdtempSync(join(tmpdir(), "d1-test-"));
697    try {
698      const result = createD1SyncWorker({
699        env: {},
700        databasePath: join(tmpDir, "test.db")
701      });
702      assert.strictEqual(result, null);
703    } finally {
704      rmSync(tmpDir, { recursive: true, force: true });
705    }
706  });
707
708  it("returns a worker when D1 env vars are set", () => {
709    const tmpDir = mkdtempSync(join(tmpdir(), "d1-test-"));
710    try {
711      const worker = createD1SyncWorker({
712        env: {
713          D1_ACCOUNT_ID: "acc",
714          D1_DATABASE_ID: "db",
715          CLOUDFLARE_API_TOKEN: "tok"
716        },
717        databasePath: join(tmpDir, "test.db")
718      });
719      assert.ok(worker instanceof D1SyncWorker);
720    } finally {
721      rmSync(tmpDir, { recursive: true, force: true });
722    }
723  });
724});
725
726describe("d1-setup.sql", () => {
727  it("contains renewal storage tables", () => {
728    const setupSql = readFileSync(new URL("./d1-setup.sql", import.meta.url), "utf8");
729
730    assert.match(setupSql, /CREATE TABLE IF NOT EXISTS local_conversations/u);
731    assert.match(setupSql, /CREATE TABLE IF NOT EXISTS conversation_links/u);
732    assert.match(setupSql, /CREATE TABLE IF NOT EXISTS renewal_jobs/u);
733    assert.match(setupSql, /CREATE TABLE IF NOT EXISTS browser_request_policy_state/u);
734    assert.match(setupSql, /idx_conversation_links_null_route/u);
735    assert.match(setupSql, /idx_conversation_links_null_page/u);
736    assert.match(setupSql, /idx_renewal_jobs_status_due/u);
737    assert.match(setupSql, /idx_browser_request_policy_state_updated/u);
738  });
739});