codex@macbookpro
·
2026-04-01
index.test.js
1import { describe, it } from "node:test";
2import assert from "node:assert/strict";
3import { tmpdir } from "node:os";
4import { join } from "node:path";
5import { mkdtempSync, readFileSync, rmSync } from "node:fs";
6import { DatabaseSync } from "node:sqlite";
7
8import { D1Client, createD1Client, SyncQueue, D1SyncWorker, createD1SyncWorker } from "../dist/index.js";
9
/**
 * createD1Client factory tests.
 *
 * The cases below assert that the factory returns `null` unless
 * D1_ACCOUNT_ID, D1_DATABASE_ID and CLOUDFLARE_API_TOKEN are all supplied
 * with non-blank values, and a D1Client instance otherwise.
 */
describe("createD1Client", () => {
  it("returns null when env vars are missing", () => {
    // Each env omits at least one of the three required variables.
    const incompleteEnvs = [
      {},
      { D1_ACCOUNT_ID: "a" },
      { D1_ACCOUNT_ID: "a", D1_DATABASE_ID: "b" }
    ];

    for (const env of incompleteEnvs) {
      assert.strictEqual(createD1Client(env), null);
    }
  });

  it("returns a D1Client when all env vars are set", () => {
    const completeEnv = {
      D1_ACCOUNT_ID: "acc",
      D1_DATABASE_ID: "db",
      CLOUDFLARE_API_TOKEN: "tok"
    };

    assert.ok(createD1Client(completeEnv) instanceof D1Client);
  });

  it("trims whitespace-only env vars and returns null", () => {
    // The account id is present but blank, so it must count as missing.
    const blankAccountEnv = {
      D1_ACCOUNT_ID: " ",
      D1_DATABASE_ID: "db",
      CLOUDFLARE_API_TOKEN: "tok"
    };

    assert.strictEqual(createD1Client(blankAccountEnv), null);
  });
});
35
/**
 * D1Client surface test: a prepared statement must expose the three
 * execution methods (`run`, `get`, `all`) as functions.
 */
describe("D1Client", () => {
  it("prepare returns an object with run, get, all methods", () => {
    const credentials = {
      accountId: "acc",
      databaseId: "db",
      apiToken: "tok"
    };
    const statement = new D1Client(credentials).prepare("SELECT 1");

    for (const methodName of ["run", "get", "all"]) {
      assert.strictEqual(typeof statement[methodName], "function");
    }
  });
});
49
/**
 * SyncQueue tests: enqueue/dequeue round trip, status transitions
 * (synced / failed after max attempts) and purging of synced rows.
 */
describe("SyncQueue", () => {
  /**
   * Runs `run(queue)` against a SyncQueue backed by a fresh SQLite file
   * inside a throwaway temp directory.
   *
   * The database handle is closed in `finally`, before the directory is
   * removed, so a failing assertion neither leaks the handle nor tries to
   * delete a directory that still contains an open database file.
   *
   * @param {(queue: SyncQueue) => void} run - synchronous test body.
   */
  function withSyncQueue(run) {
    const tmpDir = mkdtempSync(join(tmpdir(), "d1-test-"));
    let db = null;

    try {
      db = new DatabaseSync(join(tmpDir, "test.db"));
      run(new SyncQueue(db));
    } finally {
      db?.close();
      rmSync(tmpDir, { recursive: true, force: true });
    }
  }

  it("enqueue and dequeue sync records", () => {
    withSyncQueue((queue) => {
      queue.enqueueSyncRecord({
        tableName: "messages",
        recordId: "msg_001",
        operation: "insert",
        payload: { id: "msg_001", text: "hello" }
      });

      const records = queue.dequeuePendingSyncRecords(10);
      assert.strictEqual(records.length, 1);

      const [record] = records;
      assert.strictEqual(record.tableName, "messages");
      assert.strictEqual(record.recordId, "msg_001");
      assert.strictEqual(record.operation, "insert");
      assert.strictEqual(record.status, "pending");
      assert.strictEqual(record.attempts, 0);

      // The payload is handed back as a JSON string, not as an object.
      const payload = JSON.parse(record.payload);
      assert.strictEqual(payload.id, "msg_001");
    });
  });

  it("markSynced updates status", () => {
    withSyncQueue((queue) => {
      queue.enqueueSyncRecord({
        tableName: "messages",
        recordId: "msg_002",
        operation: "insert",
        payload: { id: "msg_002" }
      });

      const records = queue.dequeuePendingSyncRecords(10);
      assert.strictEqual(records.length, 1);
      queue.markSynced(records[0].id);

      // A synced record must no longer be offered as pending work.
      const pending = queue.dequeuePendingSyncRecords(10);
      assert.strictEqual(pending.length, 0);
    });
  });

  it("markAttemptFailed increments attempts and marks failed after max", () => {
    withSyncQueue((queue) => {
      queue.enqueueSyncRecord({
        tableName: "sessions",
        recordId: "s_001",
        operation: "update",
        payload: { id: "s_001" }
      });

      let records = queue.dequeuePendingSyncRecords(10);
      assert.strictEqual(records.length, 1);
      const id = records[0].id;

      // Fail 9 times — should stay pending. The second argument is the
      // attempt count before this failure; the third is the maximum.
      for (let i = 0; i < 9; i++) {
        queue.markAttemptFailed(id, i, 10);
      }

      records = queue.dequeuePendingSyncRecords(10);
      assert.strictEqual(records.length, 1);
      assert.strictEqual(records[0].attempts, 9);

      // Fail once more — should be marked failed
      queue.markAttemptFailed(id, 9, 10);

      records = queue.dequeuePendingSyncRecords(10);
      assert.strictEqual(records.length, 0);
    });
  });

  it("purgeSynced removes old synced records", () => {
    withSyncQueue((queue) => {
      queue.enqueueSyncRecord({
        tableName: "messages",
        recordId: "msg_003",
        operation: "insert",
        payload: { id: "msg_003" }
      });

      const records = queue.dequeuePendingSyncRecords(10);
      assert.strictEqual(records.length, 1);
      queue.markSynced(records[0].id);

      // Purge records older than 0ms (everything synced)
      queue.purgeSynced(0);

      // NOTE(review): this assertion would also pass without the purge,
      // because the record is already non-pending after markSynced. It only
      // proves purgeSynced does not throw or resurrect rows. Asserting the
      // row count directly would need the queue's table name, which is not
      // visible from this file.
      const remaining = queue.dequeuePendingSyncRecords(10);
      assert.strictEqual(remaining.length, 0);
    });
  });
});
172
/**
 * D1SyncWorker tests: start/stop lifecycle, statement generation for
 * whitelisted tables/columns, created_at preservation on upsert conflicts,
 * and rejection of non-whitelisted tables/columns.
 */
describe("D1SyncWorker", () => {
  /**
   * Runs `run({ queue, tmpDir })` against a SyncQueue backed by a fresh
   * SQLite file inside a throwaway temp directory.
   *
   * The queue database is closed in `finally`, before the directory is
   * removed, so a failing assertion or a rejected sync neither leaks the
   * handle nor deletes a directory containing an open database.
   *
   * @param {(ctx: { queue: SyncQueue, tmpDir: string }) => unknown} run -
   *   test body; may be async.
   * @param {string} [dbFileName="test.db"] - file name of the queue database.
   */
  async function withSyncQueue(run, dbFileName = "test.db") {
    const tmpDir = mkdtempSync(join(tmpdir(), "d1-test-"));
    let db = null;

    try {
      db = new DatabaseSync(join(tmpDir, dbFileName));
      await run({ queue: new SyncQueue(db), tmpDir });
    } finally {
      db?.close();
      rmSync(tmpDir, { recursive: true, force: true });
    }
  }

  /**
   * Builds a stand-in for the D1 client that records every prepared
   * statement together with the parameters later passed to `run`.
   *
   * @returns {{ d1: object, prepared: Array<{ statement: string, params: unknown[] }> }}
   */
  function createRecordingD1() {
    const prepared = [];
    const d1 = {
      prepare(statement) {
        const entry = { statement, params: [] };
        prepared.push(entry);

        return {
          async run(...params) {
            entry.params = params;
          }
        };
      }
    };

    return { d1, prepared };
  }

  it("start and stop lifecycle", async () => {
    await withSyncQueue(({ queue }) => {
      const client = new D1Client({
        accountId: "acc",
        databaseId: "db",
        apiToken: "tok"
      });
      const logs = [];
      const worker = new D1SyncWorker({
        d1: client,
        queue,
        config: { pollIntervalMs: 100 },
        log: (msg) => logs.push(msg)
      });

      try {
        assert.strictEqual(worker.isRunning(), false);
        worker.start();
        assert.strictEqual(worker.isRunning(), true);

        // Double start is safe
        worker.start();
        assert.strictEqual(worker.isRunning(), true);

        worker.stop();
        assert.strictEqual(worker.isRunning(), false);
      } finally {
        // If an assertion above threw after start(), make sure the worker
        // is not left running against a database that is about to be closed.
        if (worker.isRunning()) {
          worker.stop();
        }
      }

      assert.ok(logs.some((l) => l.includes("started")));
      assert.ok(logs.some((l) => l.includes("stopped")));
    });
  });

  it("syncs records for whitelisted tables and columns", async () => {
    await withSyncQueue(async ({ queue }) => {
      const { d1, prepared } = createRecordingD1();
      const worker = new D1SyncWorker({ d1, queue, log: () => {} });

      queue.enqueueSyncRecord({
        tableName: "messages",
        recordId: "msg_004",
        operation: "insert",
        payload: {
          id: "msg_004",
          platform: "chatgpt",
          role: "assistant",
          raw_text: "hello",
          observed_at: 123,
          static_path: "/tmp/msg_004.txt",
          created_at: 123
        }
      });

      const [record] = queue.dequeuePendingSyncRecords(10);
      await worker.syncRecord(record);

      assert.equal(prepared.length, 1);
      assert.match(prepared[0].statement, /^INSERT INTO messages \(/);
      assert.deepEqual(prepared[0].params, [
        "msg_004",
        "chatgpt",
        "assistant",
        "hello",
        123,
        "/tmp/msg_004.txt",
        123
      ]);
      // A successfully synced record must leave the pending set.
      assert.equal(queue.dequeuePendingSyncRecords(10).length, 0);
    });
  });

  it("syncs renewal storage tables with whitelisted columns", async () => {
    await withSyncQueue(async ({ queue }) => {
      const { d1, prepared } = createRecordingD1();
      const worker = new D1SyncWorker({ d1, queue, log: () => {} });

      queue.enqueueSyncRecord({
        tableName: "local_conversations",
        recordId: "lc_001",
        operation: "insert",
        payload: {
          local_conversation_id: "lc_001",
          platform: "claude",
          automation_status: "auto",
          last_non_paused_automation_status: "auto",
          pause_reason: "execution_failure",
          last_error: "boom",
          execution_state: "idle",
          consecutive_failure_count: 2,
          repeated_message_count: 1,
          repeated_renewal_count: 0,
          last_message_fingerprint: "msg-hash",
          last_renewal_fingerprint: "renew-hash",
          title: "Renewal thread",
          summary: "Latest summary",
          last_message_id: "msg_001",
          last_message_at: 122,
          cooldown_until: 140,
          paused_at: 141,
          created_at: 123,
          updated_at: 124
        }
      });
      queue.enqueueSyncRecord({
        tableName: "renewal_jobs",
        recordId: "job_001",
        operation: "update",
        payload: {
          job_id: "job_001",
          local_conversation_id: "lc_001",
          message_id: "msg_001",
          status: "running",
          payload: "[renew]",
          payload_kind: "text",
          target_snapshot: "{\"clientId\":\"firefox-claude\"}",
          attempt_count: 1,
          max_attempts: 3,
          next_attempt_at: null,
          last_attempt_at: 456,
          last_error: null,
          log_path: "logs/renewal/2026-03-28.jsonl",
          started_at: 456,
          finished_at: null,
          created_at: 123,
          updated_at: 456
        }
      });
      queue.enqueueSyncRecord({
        tableName: "browser_request_policy_state",
        recordId: "global",
        operation: "update",
        payload: {
          state_key: "global",
          value_json: "{\"version\":1,\"platforms\":[],\"targets\":[]}",
          updated_at: 789
        }
      });

      const records = queue.dequeuePendingSyncRecords(10);
      // Fail with a clear message if the queue returns fewer records,
      // rather than by passing `undefined` to syncRecord.
      assert.equal(records.length, 3);
      for (const record of records) {
        await worker.syncRecord(record);
      }

      assert.equal(prepared.length, 3);
      assert.match(prepared[0].statement, /^INSERT INTO local_conversations \(/);
      assert.deepEqual(prepared[0].params, [
        "lc_001",
        "claude",
        "auto",
        "auto",
        "execution_failure",
        "boom",
        "idle",
        2,
        1,
        0,
        "msg-hash",
        "renew-hash",
        "Renewal thread",
        "Latest summary",
        "msg_001",
        122,
        140,
        141,
        123,
        124
      ]);
      // The "update" operation is also expected to produce an INSERT
      // statement for renewal_jobs and browser_request_policy_state.
      assert.match(prepared[1].statement, /^INSERT INTO renewal_jobs \(/);
      assert.deepEqual(prepared[1].params, [
        "job_001",
        "lc_001",
        "msg_001",
        "running",
        "[renew]",
        "text",
        "{\"clientId\":\"firefox-claude\"}",
        1,
        3,
        null,
        456,
        null,
        "logs/renewal/2026-03-28.jsonl",
        456,
        null,
        123,
        456
      ]);
      assert.match(prepared[2].statement, /^INSERT INTO browser_request_policy_state \(/);
      assert.deepEqual(prepared[2].params, [
        "global",
        "{\"version\":1,\"platforms\":[],\"targets\":[]}",
        789
      ]);
      assert.equal(queue.dequeuePendingSyncRecords(10).length, 0);
    });
  });

  it("preserves created_at for renewal storage rows during D1 upsert conflicts", async () => {
    await withSyncQueue(async ({ queue, tmpDir }) => {
      // A second local SQLite database stands in for remote D1 so the
      // generated statements are executed for real against the setup schema.
      const remoteDb = new DatabaseSync(join(tmpDir, "remote.db"));

      try {
        remoteDb.exec(readFileSync(new URL("./d1-setup.sql", import.meta.url), "utf8"));

        const worker = new D1SyncWorker({
          d1: {
            prepare(statement) {
              const prepared = remoteDb.prepare(statement);
              return {
                async run(...params) {
                  prepared.run(...params);
                }
              };
            }
          },
          queue,
          log: () => {}
        });

        // First pass: initial inserts with the original created_at values.
        queue.enqueueSyncRecord({
          tableName: "local_conversations",
          recordId: "lc_immutable",
          operation: "insert",
          payload: {
            local_conversation_id: "lc_immutable",
            platform: "claude",
            automation_status: "manual",
            title: "Before",
            created_at: 100,
            updated_at: 110
          }
        });
        queue.enqueueSyncRecord({
          tableName: "messages",
          recordId: "msg_immutable",
          operation: "insert",
          payload: {
            id: "msg_immutable",
            platform: "claude",
            conversation_id: "conv_immutable",
            role: "assistant",
            raw_text: "hello",
            observed_at: 200,
            static_path: "/tmp/msg_immutable.txt",
            created_at: 200
          }
        });
        queue.enqueueSyncRecord({
          tableName: "conversation_links",
          recordId: "link_immutable",
          operation: "insert",
          payload: {
            link_id: "link_immutable",
            local_conversation_id: "lc_immutable",
            platform: "claude",
            remote_conversation_id: "conv_immutable",
            page_title: "Before",
            observed_at: 210,
            created_at: 210,
            updated_at: 220
          }
        });
        queue.enqueueSyncRecord({
          tableName: "renewal_jobs",
          recordId: "job_immutable",
          operation: "insert",
          payload: {
            job_id: "job_immutable",
            local_conversation_id: "lc_immutable",
            message_id: "msg_immutable",
            status: "pending",
            payload: "[renew]",
            payload_kind: "text",
            attempt_count: 0,
            max_attempts: 3,
            next_attempt_at: 300,
            created_at: 300,
            updated_at: 310
          }
        });

        for (const record of queue.dequeuePendingSyncRecords(10)) {
          await worker.syncRecord(record);
        }

        // Second pass: updates for the same primary keys carrying a bogus
        // created_at (999) that must not overwrite the stored value.
        queue.enqueueSyncRecord({
          tableName: "local_conversations",
          recordId: "lc_immutable",
          operation: "update",
          payload: {
            local_conversation_id: "lc_immutable",
            platform: "claude",
            automation_status: "auto",
            title: "After",
            created_at: 999,
            updated_at: 410
          }
        });
        queue.enqueueSyncRecord({
          tableName: "conversation_links",
          recordId: "link_immutable",
          operation: "update",
          payload: {
            link_id: "link_immutable",
            local_conversation_id: "lc_immutable",
            platform: "claude",
            remote_conversation_id: "conv_immutable",
            page_title: "After",
            observed_at: 420,
            created_at: 999,
            updated_at: 430
          }
        });
        queue.enqueueSyncRecord({
          tableName: "renewal_jobs",
          recordId: "job_immutable",
          operation: "update",
          payload: {
            job_id: "job_immutable",
            local_conversation_id: "lc_immutable",
            message_id: "msg_immutable",
            status: "running",
            payload: "[renew] updated",
            payload_kind: "text",
            attempt_count: 1,
            max_attempts: 3,
            next_attempt_at: null,
            last_attempt_at: 430,
            created_at: 999,
            updated_at: 440
          }
        });

        for (const record of queue.dequeuePendingSyncRecords(10)) {
          await worker.syncRecord(record);
        }

        const localConversation = remoteDb.prepare(`
          SELECT created_at, updated_at, automation_status, title
          FROM local_conversations
          WHERE local_conversation_id = ?;
        `).get("lc_immutable");
        const conversationLink = remoteDb.prepare(`
          SELECT created_at, updated_at, page_title
          FROM conversation_links
          WHERE link_id = ?;
        `).get("link_immutable");
        const renewalJob = remoteDb.prepare(`
          SELECT created_at, updated_at, status, payload
          FROM renewal_jobs
          WHERE job_id = ?;
        `).get("job_immutable");

        // created_at keeps the first-pass value; everything else is updated.
        assert.equal(localConversation?.created_at, 100);
        assert.equal(localConversation?.updated_at, 410);
        assert.equal(localConversation?.automation_status, "auto");
        assert.equal(localConversation?.title, "After");

        assert.equal(conversationLink?.created_at, 210);
        assert.equal(conversationLink?.updated_at, 430);
        assert.equal(conversationLink?.page_title, "After");

        assert.equal(renewalJob?.created_at, 300);
        assert.equal(renewalJob?.updated_at, 440);
        assert.equal(renewalJob?.status, "running");
        assert.equal(renewalJob?.payload, "[renew] updated");
      } finally {
        remoteDb.close();
      }
    }, "queue.db");
  });

  it("rejects records for non-whitelisted tables", async () => {
    await withSyncQueue(async ({ queue }) => {
      const { d1, prepared } = createRecordingD1();
      const worker = new D1SyncWorker({ d1, queue, log: () => {} });

      // The table name carries an SQL-injection attempt.
      queue.enqueueSyncRecord({
        tableName: "messages; DROP TABLE messages; --",
        recordId: "msg_005",
        operation: "insert",
        payload: {
          id: "msg_005",
          raw_text: "hello"
        }
      });

      const [record] = queue.dequeuePendingSyncRecords(10);
      await worker.syncRecord(record);

      // Nothing may reach D1, and the record must not stay pending.
      assert.equal(prepared.length, 0);
      assert.equal(queue.dequeuePendingSyncRecords(10).length, 0);
    });
  });

  it("rejects records with non-whitelisted columns", async () => {
    await withSyncQueue(async ({ queue }) => {
      const { d1, prepared } = createRecordingD1();
      const worker = new D1SyncWorker({ d1, queue, log: () => {} });

      queue.enqueueSyncRecord({
        tableName: "messages",
        recordId: "msg_006",
        operation: "update",
        payload: {
          id: "msg_006",
          raw_text: "hello",
          hacked_column: "nope"
        }
      });

      const [record] = queue.dequeuePendingSyncRecords(10);
      await worker.syncRecord(record);

      // Nothing may reach D1, and the record must not stay pending.
      assert.equal(prepared.length, 0);
      assert.equal(queue.dequeuePendingSyncRecords(10).length, 0);
    });
  });
});
693
/**
 * createD1SyncWorker factory tests: `null` without D1 credentials in `env`,
 * a D1SyncWorker instance when all three variables are supplied.
 */
describe("createD1SyncWorker", () => {
  // Hands `body` a scratch directory and always removes it afterwards.
  const inScratchDir = (body) => {
    const scratchDir = mkdtempSync(join(tmpdir(), "d1-test-"));
    try {
      body(scratchDir);
    } finally {
      rmSync(scratchDir, { recursive: true, force: true });
    }
  };

  it("returns null when D1 env vars are missing", () => {
    inScratchDir((scratchDir) => {
      const databasePath = join(scratchDir, "test.db");
      const outcome = createD1SyncWorker({ env: {}, databasePath });

      assert.strictEqual(outcome, null);
    });
  });

  it("returns a worker when D1 env vars are set", () => {
    inScratchDir((scratchDir) => {
      const env = {
        D1_ACCOUNT_ID: "acc",
        D1_DATABASE_ID: "db",
        CLOUDFLARE_API_TOKEN: "tok"
      };
      // NOTE(review): the factory presumably opens a database at
      // databasePath; nothing here closes it before the directory is
      // removed — confirm whether the worker exposes a way to release it.
      const created = createD1SyncWorker({
        env,
        databasePath: join(scratchDir, "test.db")
      });

      assert.ok(created instanceof D1SyncWorker);
    });
  });
});
725
/**
 * Schema smoke test: the d1-setup.sql file next to this test must declare
 * the renewal storage tables and mention their supporting indexes.
 */
describe("d1-setup.sql", () => {
  it("contains renewal storage tables", () => {
    const schemaUrl = new URL("./d1-setup.sql", import.meta.url);
    const setupSql = readFileSync(schemaUrl, "utf8");

    // Table definitions first, then index names, all matched as plain text.
    const requiredFragments = [
      /CREATE TABLE IF NOT EXISTS local_conversations/u,
      /CREATE TABLE IF NOT EXISTS conversation_links/u,
      /CREATE TABLE IF NOT EXISTS renewal_jobs/u,
      /CREATE TABLE IF NOT EXISTS browser_request_policy_state/u,
      /idx_conversation_links_null_route/u,
      /idx_conversation_links_null_page/u,
      /idx_renewal_jobs_status_due/u,
      /idx_browser_request_policy_state_updated/u
    ];

    for (const fragment of requiredFragments) {
      assert.match(setupSql, fragment);
    }
  });
});