im_wower
·
2026-03-29
sync-queue.ts
1import { DatabaseSync } from "node:sqlite";
2
3import {
4 D1_SYNC_QUEUE_SCHEMA_SQL,
5 type EnqueueSyncInput,
6 type SyncOperation,
7 type SyncQueueRecord,
8 type SyncStatus
9} from "./types.js";
10
11// ---------------------------------------------------------------------------
12// Row shape returned by node:sqlite
13// ---------------------------------------------------------------------------
14
/**
 * One `d1_sync_queue` row exactly as `node:sqlite` hands it back, with the
 * snake_case column names preserved. `mapRow` converts it into the camelCase
 * `SyncQueueRecord` shape. Rows are only ever read, hence `readonly`.
 */
interface SyncQueueRow {
  /** Row identifier; the insert statement does not supply it. */
  readonly id: number;
  /** Name of the table the queued change belongs to. */
  readonly table_name: string;
  /** Identifier of the affected record within `table_name`. */
  readonly record_id: string;
  /** Operation name; asserted to `SyncOperation` when the row is mapped. */
  readonly operation: string;
  /** Payload text: either the caller's string or its JSON serialisation. */
  readonly payload: string;
  /** Enqueue time, written from `Date.now()` (epoch milliseconds). */
  readonly created_at: number;
  /** Attempt counter as last written by the queue. */
  readonly attempts: number;
  /** `Date.now()` of the latest attempt; presumably `null` until one happens (schema not visible here). */
  readonly last_attempt_at: number | null;
  /** Status text; asserted to `SyncStatus` when the row is mapped. */
  readonly status: string;
}
26
27// ---------------------------------------------------------------------------
28// SQL statements
29// ---------------------------------------------------------------------------
30
/**
 * Inserts one queue entry. Only the five listed columns are supplied; every
 * other column takes whatever the table schema defaults to.
 */
const INSERT_SQL = `
INSERT INTO d1_sync_queue (table_name, record_id, operation, payload, created_at)
VALUES (?, ?, ?, ?, ?);
`;

/**
 * Selects at most `?` pending entries, oldest first.
 *
 * `created_at` is a millisecond timestamp, so entries enqueued within the same
 * millisecond tie on it. `id` is the tie-breaker so the result order is fully
 * deterministic (assumes ids grow with insertion order - confirm against the
 * table schema).
 */
const SELECT_PENDING_SQL = `
SELECT *
FROM d1_sync_queue
WHERE status = 'pending'
ORDER BY created_at ASC, id ASC
LIMIT ?;
`;

/** Marks one entry as synced and stamps the time of that attempt. */
const UPDATE_SYNCED_SQL = `
UPDATE d1_sync_queue
SET status = 'synced', last_attempt_at = ?
WHERE id = ?;
`;

/** Overwrites the attempt counter, attempt timestamp and status of one entry. */
const UPDATE_ATTEMPT_SQL = `
UPDATE d1_sync_queue
SET attempts = ?, last_attempt_at = ?, status = ?
WHERE id = ?;
`;

/** Deletes synced entries whose last attempt is strictly older than the cutoff. */
const DELETE_SYNCED_SQL = `
DELETE FROM d1_sync_queue
WHERE status = 'synced' AND last_attempt_at < ?;
`;

/**
 * Deletes every synced entry except the `?` most recently attempted ones.
 *
 * `id DESC` breaks ties between entries that share a `last_attempt_at`
 * millisecond, so the set of survivors is deterministic.
 */
const DELETE_SYNCED_KEEP_RECENT_SQL = `
DELETE FROM d1_sync_queue
WHERE status = 'synced'
  AND id NOT IN (
    SELECT id FROM d1_sync_queue
    WHERE status = 'synced'
    ORDER BY last_attempt_at DESC, id DESC
    LIMIT ?
  );
`;
71
72// ---------------------------------------------------------------------------
73// SyncQueue
74// ---------------------------------------------------------------------------
75
/**
 * Queue of local changes waiting to be synchronised, stored in the
 * `d1_sync_queue` table of the SQLite database handed to the constructor.
 * Every method prepares its statement on the spot and runs it synchronously.
 */
export class SyncQueue {
  private readonly db: DatabaseSync;

  /**
   * Keeps the database handle and executes `D1_SYNC_QUEUE_SCHEMA_SQL` on it
   * (presumably the table definition; it lives in `./types.js`).
   */
  constructor(db: DatabaseSync) {
    this.db = db;
    this.db.exec(D1_SYNC_QUEUE_SCHEMA_SQL);
  }

  /**
   * Adds one entry to the queue, stamped with the current time.
   *
   * A string payload is stored verbatim; anything else is stored as its
   * `JSON.stringify` output.
   */
  enqueueSyncRecord(input: EnqueueSyncInput): void {
    const { payload } = input;
    const serialised = typeof payload === "string" ? payload : JSON.stringify(payload);

    const insert = this.db.prepare(INSERT_SQL);
    insert.run(input.tableName, input.recordId, input.operation, serialised, Date.now());
  }

  /**
   * Reads (without removing) at most `limit` entries whose status is
   * `'pending'`, ordered by creation time, and returns them as records.
   */
  dequeuePendingSyncRecords(limit: number): SyncQueueRecord[] {
    const select = this.db.prepare(SELECT_PENDING_SQL);
    const pendingRows = select.all(limit) as SyncQueueRow[];

    const records: SyncQueueRecord[] = [];
    for (const row of pendingRows) {
      records.push(mapRow(row));
    }
    return records;
  }

  /**
   * Flags entry `id` as `'synced'` and records now as its last attempt time.
   */
  markSynced(id: number): void {
    const update = this.db.prepare(UPDATE_SYNCED_SQL);
    update.run(Date.now(), id);
  }

  /**
   * Registers one more failed attempt for entry `id`.
   *
   * The stored counter becomes `currentAttempts + 1`. Once that value reaches
   * `maxAttempts` the entry is marked `'failed'`; below it the entry remains
   * `'pending'` so it is picked up again.
   */
  markAttemptFailed(id: number, currentAttempts: number, maxAttempts: number): void {
    const attemptsSoFar = currentAttempts + 1;

    let nextStatus: SyncStatus = "pending";
    if (attemptsSoFar >= maxAttempts) {
      nextStatus = "failed";
    }

    const update = this.db.prepare(UPDATE_ATTEMPT_SQL);
    update.run(attemptsSoFar, Date.now(), nextStatus, id);
  }

  /**
   * Removes synced entries whose last attempt lies more than `olderThanMs`
   * milliseconds in the past.
   */
  purgeSynced(olderThanMs: number): void {
    const threshold = Date.now() - olderThanMs;
    const remove = this.db.prepare(DELETE_SYNCED_SQL);
    remove.run(threshold);
  }

  /**
   * Removes synced entries, sparing the `keepCount` with the latest
   * last-attempt time.
   */
  purgeSyncedKeepRecent(keepCount: number): void {
    const remove = this.db.prepare(DELETE_SYNCED_KEEP_RECENT_SQL);
    remove.run(keepCount);
  }
}
142
143// ---------------------------------------------------------------------------
144// Helpers
145// ---------------------------------------------------------------------------
146
/**
 * Converts a snake_case database row into the camelCase `SyncQueueRecord`.
 *
 * Values are copied unchanged. `operation` and `status` are only asserted to
 * their narrower types, not validated at runtime.
 */
function mapRow(row: SyncQueueRow): SyncQueueRecord {
  const {
    id,
    table_name: tableName,
    record_id: recordId,
    operation,
    payload,
    created_at: createdAt,
    attempts,
    last_attempt_at: lastAttemptAt,
    status
  } = row;

  return {
    id,
    tableName,
    recordId,
    operation: operation as SyncOperation,
    payload,
    createdAt,
    attempts,
    lastAttemptAt,
    status: status as SyncStatus
  };
}