codex@macbookpro
·
2026-04-01
sync-worker.ts
1import type { D1Client } from "./client.js";
2import type { SyncQueue } from "./sync-queue.js";
3import type { SyncQueueRecord, SyncWorkerConfig } from "./types.js";
4import {
5 DEFAULT_BASE_DELAY_MS,
6 DEFAULT_BATCH_SIZE,
7 DEFAULT_MAX_ATTEMPTS,
8 DEFAULT_MAX_DELAY_MS,
9 DEFAULT_POLL_INTERVAL_MS
10} from "./types.js";
11
12const DEFAULT_PURGE_KEEP_COUNT = 1000;
13
14const SYNC_COLUMN_WHITELIST = {
15 messages: new Set([
16 "id",
17 "platform",
18 "conversation_id",
19 "role",
20 "raw_text",
21 "summary",
22 "observed_at",
23 "static_path",
24 "page_url",
25 "page_title",
26 "organization_id",
27 "created_at"
28 ]),
29 executions: new Set([
30 "instruction_id",
31 "message_id",
32 "target",
33 "tool",
34 "params",
35 "params_kind",
36 "result_ok",
37 "result_data",
38 "result_summary",
39 "result_error",
40 "http_status",
41 "executed_at",
42 "static_path",
43 "created_at"
44 ]),
45 sessions: new Set([
46 "id",
47 "platform",
48 "conversation_id",
49 "started_at",
50 "last_activity_at",
51 "message_count",
52 "execution_count",
53 "summary",
54 "static_path",
55 "created_at"
56 ]),
57 local_conversations: new Set([
58 "local_conversation_id",
59 "platform",
60 "automation_status",
61 "last_non_paused_automation_status",
62 "pause_reason",
63 "last_error",
64 "execution_state",
65 "consecutive_failure_count",
66 "repeated_message_count",
67 "repeated_renewal_count",
68 "last_message_fingerprint",
69 "last_renewal_fingerprint",
70 "title",
71 "summary",
72 "last_message_id",
73 "last_message_at",
74 "cooldown_until",
75 "paused_at",
76 "created_at",
77 "updated_at"
78 ]),
79 conversation_links: new Set([
80 "link_id",
81 "local_conversation_id",
82 "platform",
83 "remote_conversation_id",
84 "client_id",
85 "page_url",
86 "page_title",
87 "route_path",
88 "route_pattern",
89 "route_params",
90 "target_kind",
91 "target_id",
92 "target_payload",
93 "is_active",
94 "observed_at",
95 "created_at",
96 "updated_at"
97 ]),
98 renewal_jobs: new Set([
99 "job_id",
100 "local_conversation_id",
101 "message_id",
102 "status",
103 "payload",
104 "payload_kind",
105 "target_snapshot",
106 "attempt_count",
107 "max_attempts",
108 "next_attempt_at",
109 "last_attempt_at",
110 "last_error",
111 "log_path",
112 "started_at",
113 "finished_at",
114 "created_at",
115 "updated_at"
116 ]),
117 browser_request_policy_state: new Set([
118 "state_key",
119 "value_json",
120 "updated_at"
121 ])
122} as const;
123
124type SyncTableName = keyof typeof SYNC_COLUMN_WHITELIST;
125
126export interface SyncWorkerDeps {
127 d1: D1Client;
128 queue: SyncQueue;
129 config?: SyncWorkerConfig;
130 log?: (message: string) => void;
131}
132
133/**
134 * Background sync worker that periodically drains the local d1_sync_queue
135 * and pushes records to Cloudflare D1.
136 *
137 * - Runs on a configurable poll interval.
138 * - Uses exponential backoff per-record on failure (1s -> 2s -> 4s -> ... max 5 min).
139 * - Marks records as `'failed'` after 10 consecutive failures.
140 * - Worker errors never propagate to the caller — they are logged and swallowed.
141 */
142export class D1SyncWorker {
143 private readonly d1: D1Client;
144 private readonly queue: SyncQueue;
145 private readonly pollIntervalMs: number;
146 private readonly batchSize: number;
147 private readonly maxAttempts: number;
148 private readonly baseDelayMs: number;
149 private readonly maxDelayMs: number;
150 private readonly purgeKeepCount: number;
151 private readonly log: (message: string) => void;
152
153 private timer: ReturnType<typeof globalThis.setTimeout> | null = null;
154 private running = false;
155 private processing = false;
156
157 constructor(deps: SyncWorkerDeps) {
158 this.d1 = deps.d1;
159 this.queue = deps.queue;
160 this.pollIntervalMs = deps.config?.pollIntervalMs ?? DEFAULT_POLL_INTERVAL_MS;
161 this.batchSize = deps.config?.batchSize ?? DEFAULT_BATCH_SIZE;
162 this.maxAttempts = deps.config?.maxAttempts ?? DEFAULT_MAX_ATTEMPTS;
163 this.baseDelayMs = deps.config?.baseDelayMs ?? DEFAULT_BASE_DELAY_MS;
164 this.maxDelayMs = deps.config?.maxDelayMs ?? DEFAULT_MAX_DELAY_MS;
165 this.purgeKeepCount = deps.config?.purgeKeepCount ?? DEFAULT_PURGE_KEEP_COUNT;
166 this.log = deps.log ?? defaultLog;
167 }
168
169 getQueue(): SyncQueue {
170 return this.queue;
171 }
172
173 /**
174 * Start the background polling loop. Safe to call multiple times — subsequent
175 * calls are no-ops while the worker is already running.
176 */
177 start(): void {
178 if (this.running) {
179 return;
180 }
181
182 this.running = true;
183 this.log("[d1-sync] worker started");
184 this.scheduleNext();
185 }
186
187 /**
188 * Stop the worker. In-flight processing will finish but no new cycles start.
189 */
190 stop(): void {
191 this.running = false;
192
193 if (this.timer != null) {
194 clearTimeout(this.timer);
195 this.timer = null;
196 }
197
198 this.log("[d1-sync] worker stopped");
199 }
200
201 isRunning(): boolean {
202 return this.running;
203 }
204
205 // -------------------------------------------------------------------------
206 // Internal
207 // -------------------------------------------------------------------------
208
209 private scheduleNext(): void {
210 if (!this.running) {
211 return;
212 }
213
214 this.timer = setTimeout(() => {
215 void this.tick();
216 }, this.pollIntervalMs);
217 }
218
219 private async tick(): Promise<void> {
220 if (!this.running || this.processing) {
221 this.scheduleNext();
222 return;
223 }
224
225 this.processing = true;
226
227 try {
228 await this.processBatch();
229 this.purgeCompleted();
230 } catch (error) {
231 this.log(
232 `[d1-sync] unexpected error: ${error instanceof Error ? error.message : String(error)}`
233 );
234 } finally {
235 this.processing = false;
236 this.scheduleNext();
237 }
238 }
239
240 private async processBatch(): Promise<void> {
241 const records = this.queue.dequeuePendingSyncRecords(this.batchSize);
242
243 if (records.length === 0) {
244 return;
245 }
246
247 this.log(`[d1-sync] processing ${records.length} pending record(s)`);
248
249 for (const record of records) {
250 if (!this.running) {
251 break;
252 }
253
254 // Exponential backoff: skip records whose next-retry time hasn't arrived.
255 if (record.attempts > 0 && record.lastAttemptAt != null) {
256 const delay = computeBackoff(
257 record.attempts,
258 this.baseDelayMs,
259 this.maxDelayMs
260 );
261 const nextRetryAt = record.lastAttemptAt + delay;
262
263 if (Date.now() < nextRetryAt) {
264 continue;
265 }
266 }
267
268 await this.syncRecord(record);
269 }
270 }
271
272 private purgeCompleted(): void {
273 try {
274 this.queue.purgeSyncedKeepRecent(this.purgeKeepCount);
275 } catch (error) {
276 this.log(
277 `[d1-sync] purge failed: ${error instanceof Error ? error.message : String(error)}`
278 );
279 }
280 }
281
282 private async syncRecord(record: SyncQueueRecord): Promise<void> {
283 try {
284 const payload: Record<string, unknown> = JSON.parse(record.payload);
285 const sql = buildSyncSql(record.tableName, record.operation, payload);
286
287 if (sql == null) {
288 // Invalid or unsupported sync payloads are marked synced to avoid infinite retries.
289 this.queue.markSynced(record.id);
290 return;
291 }
292
293 await this.d1.prepare(sql.statement).run(...sql.params);
294 this.queue.markSynced(record.id);
295 } catch (error) {
296 this.queue.markAttemptFailed(record.id, record.attempts, this.maxAttempts);
297 this.log(
298 `[d1-sync] record ${record.id} (${record.tableName}/${record.recordId}) ` +
299 `attempt ${record.attempts + 1} failed: ` +
300 `${error instanceof Error ? error.message : String(error)}`
301 );
302 }
303 }
304}
305
306// ---------------------------------------------------------------------------
307// SQL generation for sync
308// ---------------------------------------------------------------------------
309
310interface GeneratedSql {
311 statement: string;
312 params: unknown[];
313}
314
315function buildSyncSql(
316 tableName: string,
317 operation: string,
318 payload: Record<string, unknown>
319): GeneratedSql | null {
320 const keys = getWhitelistedPayloadKeys(tableName, payload);
321
322 if (keys == null) {
323 return null;
324 }
325
326 switch (operation) {
327 case "insert":
328 case "update":
329 return buildUpsertSql(tableName, payload, keys);
330 case "delete":
331 return buildDeleteSql(tableName, payload, keys);
332 default:
333 return null;
334 }
335}
336
337function buildUpsertSql(
338 tableName: string,
339 payload: Record<string, unknown>,
340 keys: readonly string[]
341): GeneratedSql {
342 const placeholders = keys.map(() => "?").join(", ");
343 const columns = keys.join(", ");
344 const updates = keys.map((k) => buildUpsertAssignment(tableName, k)).join(", ");
345 const params = keys.map((k) => payload[k]);
346
347 // Use INSERT OR REPLACE which covers both insert and update.
348 const statement =
349 `INSERT INTO ${tableName} (${columns}) VALUES (${placeholders}) ` +
350 `ON CONFLICT DO UPDATE SET ${updates};`;
351
352 return { statement, params };
353}
354
355function buildUpsertAssignment(tableName: string, columnName: string): string {
356 if (columnName === "created_at") {
357 return `${columnName} = COALESCE(${tableName}.created_at, excluded.created_at)`;
358 }
359
360 return `${columnName} = excluded.${columnName}`;
361}
362
363function buildDeleteSql(
364 tableName: string,
365 payload: Record<string, unknown>,
366 keys: readonly string[]
367): GeneratedSql {
368 // The payload for deletes is expected to contain the primary key column(s).
369 const conditions = keys.map((k) => `${k} = ?`).join(" AND ");
370 const params = keys.map((k) => payload[k]);
371
372 return {
373 statement: `DELETE FROM ${tableName} WHERE ${conditions};`,
374 params
375 };
376}
377
378// ---------------------------------------------------------------------------
379// Helpers
380// ---------------------------------------------------------------------------
381
382function getWhitelistedPayloadKeys(
383 tableName: string,
384 payload: Record<string, unknown>
385): string[] | null {
386 const allowedColumns = SYNC_COLUMN_WHITELIST[tableName as SyncTableName];
387
388 if (allowedColumns == null) {
389 return null;
390 }
391
392 const keys = Object.keys(payload);
393
394 if (keys.length === 0) {
395 return null;
396 }
397
398 for (const key of keys) {
399 if (!allowedColumns.has(key)) {
400 return null;
401 }
402 }
403
404 return keys;
405}
406
407function computeBackoff(
408 attempts: number,
409 baseMs: number,
410 maxMs: number
411): number {
412 const delay = baseMs * Math.pow(2, attempts - 1);
413 return Math.min(delay, maxMs);
414}
415
416function defaultLog(message: string): void {
417 console.log(message);
418}