baa-conductor

git clone 

baa-conductor / packages / d1-client / src
codex@macbookpro  ·  2026-04-01

sync-worker.ts

  1import type { D1Client } from "./client.js";
  2import type { SyncQueue } from "./sync-queue.js";
  3import type { SyncQueueRecord, SyncWorkerConfig } from "./types.js";
  4import {
  5  DEFAULT_BASE_DELAY_MS,
  6  DEFAULT_BATCH_SIZE,
  7  DEFAULT_MAX_ATTEMPTS,
  8  DEFAULT_MAX_DELAY_MS,
  9  DEFAULT_POLL_INTERVAL_MS
 10} from "./types.js";
 11
 12const DEFAULT_PURGE_KEEP_COUNT = 1000;
 13
 14const SYNC_COLUMN_WHITELIST = {
 15  messages: new Set([
 16    "id",
 17    "platform",
 18    "conversation_id",
 19    "role",
 20    "raw_text",
 21    "summary",
 22    "observed_at",
 23    "static_path",
 24    "page_url",
 25    "page_title",
 26    "organization_id",
 27    "created_at"
 28  ]),
 29  executions: new Set([
 30    "instruction_id",
 31    "message_id",
 32    "target",
 33    "tool",
 34    "params",
 35    "params_kind",
 36    "result_ok",
 37    "result_data",
 38    "result_summary",
 39    "result_error",
 40    "http_status",
 41    "executed_at",
 42    "static_path",
 43    "created_at"
 44  ]),
 45  sessions: new Set([
 46    "id",
 47    "platform",
 48    "conversation_id",
 49    "started_at",
 50    "last_activity_at",
 51    "message_count",
 52    "execution_count",
 53    "summary",
 54    "static_path",
 55    "created_at"
 56  ]),
 57  local_conversations: new Set([
 58    "local_conversation_id",
 59    "platform",
 60    "automation_status",
 61    "last_non_paused_automation_status",
 62    "pause_reason",
 63    "last_error",
 64    "execution_state",
 65    "consecutive_failure_count",
 66    "repeated_message_count",
 67    "repeated_renewal_count",
 68    "last_message_fingerprint",
 69    "last_renewal_fingerprint",
 70    "title",
 71    "summary",
 72    "last_message_id",
 73    "last_message_at",
 74    "cooldown_until",
 75    "paused_at",
 76    "created_at",
 77    "updated_at"
 78  ]),
 79  conversation_links: new Set([
 80    "link_id",
 81    "local_conversation_id",
 82    "platform",
 83    "remote_conversation_id",
 84    "client_id",
 85    "page_url",
 86    "page_title",
 87    "route_path",
 88    "route_pattern",
 89    "route_params",
 90    "target_kind",
 91    "target_id",
 92    "target_payload",
 93    "is_active",
 94    "observed_at",
 95    "created_at",
 96    "updated_at"
 97  ]),
 98  renewal_jobs: new Set([
 99    "job_id",
100    "local_conversation_id",
101    "message_id",
102    "status",
103    "payload",
104    "payload_kind",
105    "target_snapshot",
106    "attempt_count",
107    "max_attempts",
108    "next_attempt_at",
109    "last_attempt_at",
110    "last_error",
111    "log_path",
112    "started_at",
113    "finished_at",
114    "created_at",
115    "updated_at"
116  ]),
117  browser_request_policy_state: new Set([
118    "state_key",
119    "value_json",
120    "updated_at"
121  ])
122} as const;
123
124type SyncTableName = keyof typeof SYNC_COLUMN_WHITELIST;
125
/**
 * Collaborators and optional settings injected into `D1SyncWorker`.
 */
export interface SyncWorkerDeps {
  /** Local queue the worker drains on every poll. */
  queue: SyncQueue;
  /** D1 client that executes the generated statements. */
  d1: D1Client;
  /** Optional log sink; when omitted the worker writes to `console.log`. */
  log?: (message: string) => void;
  /** Optional tuning; every omitted field falls back to its default. */
  config?: SyncWorkerConfig;
}
132
/**
 * Background sync worker that periodically drains the local d1_sync_queue
 * and pushes records to Cloudflare D1.
 *
 * - Runs on a configurable poll interval (`config.pollIntervalMs`). At most one
 *   poll timer is pending at a time, and cycles never overlap.
 * - Applies per-record exponential backoff on failure: after the n-th failed
 *   attempt the record waits `baseDelayMs * 2^(n-1)`, capped at `maxDelayMs`.
 * - Reports each failed attempt to `SyncQueue.markAttemptFailed` along with
 *   `maxAttempts`. NOTE(review): the queue presumably marks the record
 *   `'failed'` once that limit is reached — confirm in sync-queue.ts.
 * - Payloads that can never become valid SQL (non-object JSON, unknown table or
 *   operation, non-whitelisted columns) are marked synced and logged, so they
 *   are not retried forever.
 * - Worker errors never propagate to the caller — they are logged and swallowed.
 *
 * Any config value that is omitted falls back to its `DEFAULT_*` constant.
 */
export class D1SyncWorker {
  private readonly d1: D1Client;
  private readonly queue: SyncQueue;
  private readonly pollIntervalMs: number;
  private readonly batchSize: number;
  private readonly maxAttempts: number;
  private readonly baseDelayMs: number;
  private readonly maxDelayMs: number;
  private readonly purgeKeepCount: number;
  private readonly log: (message: string) => void;

  /** Handle of the single pending poll timer, or null when none is armed. */
  private timer: ReturnType<typeof globalThis.setTimeout> | null = null;
  /** True between start() and stop(). */
  private running = false;
  /** True while a tick is draining the queue; prevents overlapping cycles. */
  private processing = false;

  constructor(deps: SyncWorkerDeps) {
    this.d1 = deps.d1;
    this.queue = deps.queue;
    this.pollIntervalMs = deps.config?.pollIntervalMs ?? DEFAULT_POLL_INTERVAL_MS;
    this.batchSize = deps.config?.batchSize ?? DEFAULT_BATCH_SIZE;
    this.maxAttempts = deps.config?.maxAttempts ?? DEFAULT_MAX_ATTEMPTS;
    this.baseDelayMs = deps.config?.baseDelayMs ?? DEFAULT_BASE_DELAY_MS;
    this.maxDelayMs = deps.config?.maxDelayMs ?? DEFAULT_MAX_DELAY_MS;
    this.purgeKeepCount = deps.config?.purgeKeepCount ?? DEFAULT_PURGE_KEEP_COUNT;
    this.log = deps.log ?? defaultLog;
  }

  /** Returns the queue this worker drains. */
  getQueue(): SyncQueue {
    return this.queue;
  }

  /**
   * Start the background polling loop. Safe to call multiple times — subsequent
   * calls are no-ops while the worker is already running.
   */
  start(): void {
    if (this.running) {
      return;
    }

    this.running = true;
    this.log("[d1-sync] worker started");
    this.scheduleNext();
  }

  /**
   * Stop the worker. An in-flight cycle stops after the record it is currently
   * syncing (and still runs its purge step); no new cycles are scheduled.
   */
  stop(): void {
    this.running = false;

    if (this.timer != null) {
      clearTimeout(this.timer);
      this.timer = null;
    }

    this.log("[d1-sync] worker stopped");
  }

  isRunning(): boolean {
    return this.running;
  }

  // -------------------------------------------------------------------------
  // Internal
  // -------------------------------------------------------------------------

  /** Arm the next poll, replacing (never stacking) any pending timer. */
  private scheduleNext(): void {
    if (!this.running) {
      return;
    }

    // Without this, stop() + start() during an in-flight tick would arm two
    // timers (one from start(), one from the tick's `finally`). That leaves
    // two perpetual poll chains, and stop() could only cancel one of them.
    if (this.timer != null) {
      clearTimeout(this.timer);
    }

    this.timer = setTimeout(() => {
      this.timer = null;
      void this.tick();
    }, this.pollIntervalMs);
  }

  /** One poll cycle: sync a batch, purge old synced rows, re-arm the timer. */
  private async tick(): Promise<void> {
    if (!this.running || this.processing) {
      this.scheduleNext();
      return;
    }

    this.processing = true;

    try {
      await this.processBatch();
      this.purgeCompleted();
    } catch (error) {
      this.log(
        `[d1-sync] unexpected error: ${error instanceof Error ? error.message : String(error)}`
      );
    } finally {
      this.processing = false;
      this.scheduleNext();
    }
  }

  /** Sync up to `batchSize` pending records, skipping those still backing off. */
  private async processBatch(): Promise<void> {
    const records = this.queue.dequeuePendingSyncRecords(this.batchSize);

    if (records.length === 0) {
      return;
    }

    this.log(`[d1-sync] processing ${records.length} pending record(s)`);

    for (const record of records) {
      if (!this.running) {
        break;
      }

      // Exponential backoff: skip records whose next-retry time hasn't arrived.
      // NOTE(review): skipped records still occupy batch slots; if the queue
      // always returns the same oldest rows, records in backoff could crowd out
      // newer ones — confirm dequeue ordering.
      if (record.attempts > 0 && record.lastAttemptAt != null) {
        const delay = computeBackoff(
          record.attempts,
          this.baseDelayMs,
          this.maxDelayMs
        );
        const nextRetryAt = record.lastAttemptAt + delay;

        if (Date.now() < nextRetryAt) {
          continue;
        }
      }

      await this.syncRecord(record);
    }
  }

  /** Best-effort purge of synced rows; failures are logged, never thrown. */
  private purgeCompleted(): void {
    try {
      this.queue.purgeSyncedKeepRecent(this.purgeKeepCount);
    } catch (error) {
      this.log(
        `[d1-sync] purge failed: ${error instanceof Error ? error.message : String(error)}`
      );
    }
  }

  /**
   * Push one queued record to D1 and record the outcome in the queue.
   * Any thrown error counts as a failed attempt.
   */
  private async syncRecord(record: SyncQueueRecord): Promise<void> {
    try {
      // JSON.parse yields `any`; narrow it before treating it as a column map so
      // that a payload such as `null` is rejected instead of throwing later.
      const parsed: unknown = JSON.parse(record.payload);
      const sql = D1SyncWorker.isColumnMap(parsed)
        ? buildSyncSql(record.tableName, record.operation, parsed)
        : null;

      if (sql == null) {
        // Invalid or unsupported sync payloads are marked synced to avoid
        // infinite retries; log it so dropped records stay visible.
        this.queue.markSynced(record.id);
        this.log(
          `[d1-sync] record ${record.id} (${record.tableName}/${record.recordId}) ` +
          `skipped: unsupported ${record.operation} payload`
        );
        return;
      }

      await this.d1.prepare(sql.statement).run(...sql.params);
      this.queue.markSynced(record.id);
    } catch (error) {
      this.queue.markAttemptFailed(record.id, record.attempts, this.maxAttempts);
      this.log(
        `[d1-sync] record ${record.id} (${record.tableName}/${record.recordId}) ` +
        `attempt ${record.attempts + 1} failed: ` +
        `${error instanceof Error ? error.message : String(error)}`
      );
    }
  }

  /** True for a non-null, non-array object usable as column -> value pairs. */
  private static isColumnMap(value: unknown): value is Record<string, unknown> {
    return typeof value === "object" && value !== null && !Array.isArray(value);
  }
}
305
306// ---------------------------------------------------------------------------
307// SQL generation for sync
308// ---------------------------------------------------------------------------
309
/** A parameterized statement ready for `d1.prepare(statement).run(...params)`. */
interface GeneratedSql {
  /** SQL text using `?` placeholders. */
  statement: string;
  /** Positional values bound to the placeholders, in order. */
  params: unknown[];
}
314
/**
 * Translate one queued change into a parameterized D1 statement.
 *
 * The payload's columns are validated against the whitelist first. Then
 * `insert` and `update` both become an upsert, and `delete` becomes a
 * key-matched DELETE.
 *
 * @returns The statement, or `null` when the table, columns, or operation are
 *   not supported.
 */
function buildSyncSql(
  tableName: string,
  operation: string,
  payload: Record<string, unknown>
): GeneratedSql | null {
  const columns = getWhitelistedPayloadKeys(tableName, payload);
  if (columns == null) {
    return null;
  }

  if (operation === "delete") {
    return buildDeleteSql(tableName, payload, columns);
  }

  const isUpsert = operation === "insert" || operation === "update";
  return isUpsert ? buildUpsertSql(tableName, payload, columns) : null;
}
336
/**
 * Build an upsert statement for one whitelisted row.
 *
 * Emits `INSERT ... ON CONFLICT DO UPDATE` (no conflict target), so one statement
 * serves both queued inserts and updates. On conflict, each provided column is
 * overwritten with the incoming value, except `created_at`, which keeps an
 * existing non-null value.
 *
 * @param tableName - Whitelisted table name; interpolated into the SQL text.
 * @param payload - Column -> value map from the queued record.
 * @param keys - Whitelisted column names to write, in placeholder order.
 * @returns The statement plus positional params aligned with `keys`.
 */
function buildUpsertSql(
  tableName: string,
  payload: Record<string, unknown>,
  keys: readonly string[]
): GeneratedSql {
  const placeholders = keys.map(() => "?").join(", ");
  const columns = keys.join(", ");
  const updates = keys.map((k) => buildUpsertAssignment(tableName, k)).join(", ");
  const params = keys.map((k) => payload[k]);

  // Deliberately not INSERT OR REPLACE: REPLACE deletes the conflicting row and
  // inserts a fresh one. That would reset columns missing from the payload and
  // discard the existing created_at that the COALESCE assignment preserves.
  const statement =
    `INSERT INTO ${tableName} (${columns}) VALUES (${placeholders}) ` +
    `ON CONFLICT DO UPDATE SET ${updates};`;

  return { statement, params };
}
354
/**
 * SET clause fragment for one column of an upsert.
 *
 * `created_at` keeps the stored value when one exists (falling back to the
 * incoming value); every other column takes the incoming `excluded.` value.
 */
function buildUpsertAssignment(tableName: string, columnName: string): string {
  const incoming = `excluded.${columnName}`;
  const value =
    columnName === "created_at"
      ? `COALESCE(${tableName}.${columnName}, ${incoming})`
      : incoming;
  return `${columnName} = ${value}`;
}
362
/**
 * Build a DELETE that matches every column present in the payload.
 *
 * The payload for deletes is expected to contain the primary key column(s).
 * `keys` must be non-empty; the caller's whitelist check guarantees this.
 * Nullish values are matched with `IS NULL`, because `col = NULL` is never true
 * in SQL and would turn the delete into a silent no-op.
 *
 * @param tableName - Whitelisted table name; interpolated into the SQL text.
 * @param payload - Column -> value map from the queued record.
 * @param keys - Whitelisted column names to match on.
 * @returns The statement plus params for the non-null conditions, in order.
 */
function buildDeleteSql(
  tableName: string,
  payload: Record<string, unknown>,
  keys: readonly string[]
): GeneratedSql {
  const conditions: string[] = [];
  const params: unknown[] = [];

  for (const key of keys) {
    const value = payload[key];

    if (value == null) {
      conditions.push(`${key} IS NULL`);
    } else {
      conditions.push(`${key} = ?`);
      params.push(value);
    }
  }

  return {
    statement: `DELETE FROM ${tableName} WHERE ${conditions.join(" AND ")};`,
    params
  };
}
377
378// ---------------------------------------------------------------------------
379// Helpers
380// ---------------------------------------------------------------------------
381
/**
 * Validate a payload's columns against the sync whitelist.
 *
 * @param tableName - Table the record targets.
 * @param payload - Column -> value map from the queued record.
 * @returns The payload's own keys when the table is whitelisted and every key is
 *   an allowed column; `null` for an unknown table, an empty payload, or any
 *   non-whitelisted column.
 */
function getWhitelistedPayloadKeys(
  tableName: string,
  payload: Record<string, unknown>
): string[] | null {
  // Own-property lookup only. Plain indexing would resolve inherited names such
  // as "constructor" or "__proto__" to non-Set values, and `.has` would throw
  // instead of rejecting the record.
  if (!Object.prototype.hasOwnProperty.call(SYNC_COLUMN_WHITELIST, tableName)) {
    return null;
  }

  const allowedColumns: ReadonlySet<string> =
    SYNC_COLUMN_WHITELIST[tableName as SyncTableName];
  const keys = Object.keys(payload);

  if (keys.length === 0 || keys.some((key) => !allowedColumns.has(key))) {
    return null;
  }

  return keys;
}
406
/**
 * Delay to wait before retrying a record that has already failed `attempts`
 * times: `baseMs` for the first failure, doubling per further failure, and
 * never more than `maxMs`.
 */
function computeBackoff(
  attempts: number,
  baseMs: number,
  maxMs: number
): number {
  const exponent = attempts - 1;
  return Math.min(baseMs * 2 ** exponent, maxMs);
}
415
/**
 * Log sink used when `SyncWorkerDeps.log` is not supplied.
 * Forwards each message unchanged to standard output.
 */
function defaultLog(message: string): void {
  console.log(message);
}