baa-conductor

git clone 

baa-conductor / packages / d1-client / src
im_wower  ·  2026-03-29

sync-queue.ts

  1import { DatabaseSync } from "node:sqlite";
  2
  3import {
  4  D1_SYNC_QUEUE_SCHEMA_SQL,
  5  type EnqueueSyncInput,
  6  type SyncOperation,
  7  type SyncQueueRecord,
  8  type SyncStatus
  9} from "./types.js";
 10
 11// ---------------------------------------------------------------------------
 12// Row shape returned by node:sqlite
 13// ---------------------------------------------------------------------------
 14
 15interface SyncQueueRow {
 16  id: number;
 17  table_name: string;
 18  record_id: string;
 19  operation: string;
 20  payload: string;
 21  created_at: number;
 22  attempts: number;
 23  last_attempt_at: number | null;
 24  status: string;
 25}
 26
 27// ---------------------------------------------------------------------------
 28// SQL statements
 29// ---------------------------------------------------------------------------
 30
 31const INSERT_SQL = `
 32INSERT INTO d1_sync_queue (table_name, record_id, operation, payload, created_at)
 33VALUES (?, ?, ?, ?, ?);
 34`;
 35
 36const SELECT_PENDING_SQL = `
 37SELECT *
 38FROM d1_sync_queue
 39WHERE status = 'pending'
 40ORDER BY created_at ASC
 41LIMIT ?;
 42`;
 43
 44const UPDATE_SYNCED_SQL = `
 45UPDATE d1_sync_queue
 46SET status = 'synced', last_attempt_at = ?
 47WHERE id = ?;
 48`;
 49
 50const UPDATE_ATTEMPT_SQL = `
 51UPDATE d1_sync_queue
 52SET attempts = ?, last_attempt_at = ?, status = ?
 53WHERE id = ?;
 54`;
 55
 56const DELETE_SYNCED_SQL = `
 57DELETE FROM d1_sync_queue
 58WHERE status = 'synced' AND last_attempt_at < ?;
 59`;
 60
 61const DELETE_SYNCED_KEEP_RECENT_SQL = `
 62DELETE FROM d1_sync_queue
 63WHERE status = 'synced'
 64  AND id NOT IN (
 65    SELECT id FROM d1_sync_queue
 66    WHERE status = 'synced'
 67    ORDER BY last_attempt_at DESC
 68    LIMIT ?
 69  );
 70`;
 71
 72// ---------------------------------------------------------------------------
 73// SyncQueue
 74// ---------------------------------------------------------------------------
 75
 76export class SyncQueue {
 77  private readonly db: DatabaseSync;
 78
 79  constructor(db: DatabaseSync) {
 80    this.db = db;
 81    this.db.exec(D1_SYNC_QUEUE_SCHEMA_SQL);
 82  }
 83
 84  /**
 85   * Enqueue a record for D1 synchronisation.
 86   */
 87  enqueueSyncRecord(input: EnqueueSyncInput): void {
 88    const payload =
 89      typeof input.payload === "string"
 90        ? input.payload
 91        : JSON.stringify(input.payload);
 92
 93    this.db.prepare(INSERT_SQL).run(
 94      input.tableName,
 95      input.recordId,
 96      input.operation,
 97      payload,
 98      Date.now()
 99    );
100  }
101
102  /**
103   * Dequeue up to `limit` pending records ordered by creation time.
104   */
105  dequeuePendingSyncRecords(limit: number): SyncQueueRecord[] {
106    const rows = this.db.prepare(SELECT_PENDING_SQL).all(limit) as SyncQueueRow[];
107    return rows.map(mapRow);
108  }
109
110  /**
111   * Mark a record as successfully synced.
112   */
113  markSynced(id: number): void {
114    this.db.prepare(UPDATE_SYNCED_SQL).run(Date.now(), id);
115  }
116
117  /**
118   * Record a failed attempt. If `attempts >= maxAttempts` the status becomes
119   * `'failed'`; otherwise it stays `'pending'` for retry.
120   */
121  markAttemptFailed(id: number, currentAttempts: number, maxAttempts: number): void {
122    const newAttempts = currentAttempts + 1;
123    const newStatus: SyncStatus = newAttempts >= maxAttempts ? "failed" : "pending";
124    this.db.prepare(UPDATE_ATTEMPT_SQL).run(newAttempts, Date.now(), newStatus, id);
125  }
126
127  /**
128   * Purge synced records older than `olderThanMs` milliseconds.
129   */
130  purgeSynced(olderThanMs: number): void {
131    const cutoff = Date.now() - olderThanMs;
132    this.db.prepare(DELETE_SYNCED_SQL).run(cutoff);
133  }
134
135  /**
136   * Purge synced records, keeping only the most recent `keepCount`.
137   */
138  purgeSyncedKeepRecent(keepCount: number): void {
139    this.db.prepare(DELETE_SYNCED_KEEP_RECENT_SQL).run(keepCount);
140  }
141}
142
143// ---------------------------------------------------------------------------
144// Helpers
145// ---------------------------------------------------------------------------
146
/**
 * Translate a snake_case database row into the camelCase `SyncQueueRecord`
 * shape exposed to callers.
 *
 * `operation` and `status` are narrowed by type assertion only; their values
 * are passed through without runtime validation.
 */
function mapRow(row: SyncQueueRow): SyncQueueRecord {
  const {
    id,
    table_name: tableName,
    record_id: recordId,
    operation,
    payload,
    created_at: createdAt,
    attempts,
    last_attempt_at: lastAttemptAt,
    status
  } = row;

  const record: SyncQueueRecord = {
    id,
    tableName,
    recordId,
    operation: operation as SyncOperation,
    payload,
    createdAt,
    attempts,
    lastAttemptAt,
    status: status as SyncStatus
  };
  return record;
}