codex@macbookpro
·
2026-03-31
data-source.ts
1import {
2 D1ControlPlaneRepository,
3 type AutomationStateRecord,
4 type D1DatabaseLike,
5 type LeaderLeaseRecord
6} from "../../../packages/db/src/index.js";
7import {
8 createEmptyStatusSnapshot,
9 type StatusApiEnvironment,
10 type StatusSnapshot,
11 type StatusSnapshotLoader
12} from "./contracts.js";
13
/** Counts rows in `tasks` whose status is 'queued'; the count is returned in the `value` column. */
const SELECT_QUEUED_TASK_COUNT_SQL = `
 SELECT COUNT(*) AS value
 FROM tasks
 WHERE status = 'queued'
`;

/** Counts rows in `task_runs` that have a start time but no finish time; returned in the `value` column. */
const SELECT_ACTIVE_RUN_COUNT_SQL = `
 SELECT COUNT(*) AS value
 FROM task_runs
 WHERE started_at IS NOT NULL
 AND finished_at IS NULL
`;

/**
 * Base URL used when no environment override is configured or the configured value is blank.
 * NOTE(review): this is a hard-coded host address — confirm it is still the intended default.
 */
const DEFAULT_STATUS_API_TRUTH_SOURCE_BASE = "http://100.71.210.78:4317";
/** Absolute path (resolved against the base URL) that is fetched to obtain the system state. */
const STATUS_API_SYSTEM_STATE_PATH = "/v1/system/state";
29
/**
 * The four read operations `readStatusSnapshot` needs in order to assemble a snapshot.
 * All four are invoked together and awaited as a group.
 */
export interface StatusSnapshotSourceReader {
  /** Resolves with the number of runs currently in progress. */
  countActiveRuns(): Promise<number>;
  /** Resolves with the number of tasks waiting in the queue. */
  countQueuedTasks(): Promise<number>;
  /** Resolves with the stored automation state, or `null` when none exists. */
  getAutomationState(): Promise<AutomationStateRecord | null>;
  /** Resolves with the current leader lease, or `null` when none exists. */
  getCurrentLease(): Promise<LeaderLeaseRecord | null>;
}
36
/** Construction options for `ControlApiStatusSnapshotLoader`. */
export interface ControlApiStatusSnapshotLoaderOptions {
  /** Base URL of the conductor API; a blank value falls back to the built-in default. */
  baseUrl: string;
  /** Fetch implementation to use; defaults to `globalThis.fetch`. */
  fetch?: typeof fetch;
  /** Clock used to stamp each snapshot's observation time; defaults to `new Date()`. */
  now?: () => Date;
}
42
/** Options accepted by `createDefaultStatusSnapshotLoader`. */
export interface DefaultStatusSnapshotLoaderOptions {
  /** Environment used to resolve the truth-source base URL. */
  env?: StatusApiEnvironment;
  /** Fetch implementation forwarded to the underlying loader. */
  fetch?: typeof fetch;
  /** Clock forwarded to the underlying loader. */
  now?: () => Date;
}
48
/**
 * Loader that always resolves with one fixed snapshot.
 *
 * When no snapshot is supplied, `createEmptyStatusSnapshot()` is evaluated once
 * at construction time and that same object is returned on every call.
 */
export class StaticStatusSnapshotLoader implements StatusSnapshotLoader {
  private readonly snapshot: StatusSnapshot;

  constructor(snapshot: StatusSnapshot = createEmptyStatusSnapshot()) {
    this.snapshot = snapshot;
  }

  /** Resolves with the snapshot captured by the constructor. */
  async loadSnapshot(): Promise<StatusSnapshot> {
    const { snapshot } = this;

    return snapshot;
  }
}
56
/**
 * Loads the status snapshot from the conductor HTTP API by issuing
 * `GET <baseUrl>/v1/system/state` and mapping the JSON body to a `StatusSnapshot`.
 */
export class ControlApiStatusSnapshotLoader implements StatusSnapshotLoader {
  private readonly baseUrl: string;
  private readonly fetchImpl: typeof fetch;

  /**
   * @param options - Base URL plus optional fetch implementation and clock.
   * @throws Error when neither `options.fetch` nor `globalThis.fetch` is a function.
   * @throws TypeError when `options.baseUrl` is non-blank but not a parseable URL.
   */
  constructor(
    private readonly options: ControlApiStatusSnapshotLoaderOptions
  ) {
    this.baseUrl = normalizeTruthSourceBaseUrl(options.baseUrl);
    this.fetchImpl = options.fetch ?? globalThis.fetch;

    if (typeof this.fetchImpl !== "function") {
      throw new Error("Status API requires a fetch implementation to read the conductor truth source.");
    }
  }

  /**
   * Fetches the system state and converts it into a snapshot.
   *
   * @returns The snapshot derived from the response payload.
   * @throws Error when the response status is not OK, or when the payload reports `ok: false`.
   * @throws TypeError when the payload carries no valid automation mode.
   */
  async loadSnapshot(): Promise<StatusSnapshot> {
    // Invoke fetch through a detached local instead of `this.fetchImpl(...)`:
    // calling the platform fetch with this loader as its receiver fails with
    // "Illegal invocation" in browsers and Cloudflare Workers.
    const fetchImpl = this.fetchImpl;
    // The path is absolute, so it replaces any path component present in baseUrl.
    const endpoint = new URL(STATUS_API_SYSTEM_STATE_PATH, this.baseUrl);
    const response = await fetchImpl(endpoint, {
      method: "GET",
      headers: {
        Accept: "application/json",
        "Cache-Control": "no-store"
      }
    });

    if (!response.ok) {
      throw new Error(
        `Conductor truth source ${STATUS_API_SYSTEM_STATE_PATH} returned HTTP ${response.status}.`
      );
    }

    const payload: unknown = await response.json();

    return createStatusSnapshotFromControlApiPayload(payload, this.options.now?.() ?? new Date());
  }
}
92
/**
 * Loads the status snapshot directly from a D1 database: automation state and
 * lease come from `D1ControlPlaneRepository`, the two counters from count queries.
 */
export class D1StatusSnapshotLoader implements StatusSnapshotLoader {
  private readonly repository: D1ControlPlaneRepository;

  /**
   * @param db - Database handle used for both the repository and the count queries.
   * @param now - Clock providing the observation time; defaults to the current time.
   */
  constructor(
    private readonly db: D1DatabaseLike,
    private readonly now: () => Date = () => new Date()
  ) {
    this.repository = new D1ControlPlaneRepository(db);
  }

  /** Reads all four sources and combines them into one snapshot. */
  async loadSnapshot(): Promise<StatusSnapshot> {
    const { db, repository } = this;
    const reader: StatusSnapshotSourceReader = {
      countActiveRuns: () => countRows(db, SELECT_ACTIVE_RUN_COUNT_SQL),
      countQueuedTasks: () => countRows(db, SELECT_QUEUED_TASK_COUNT_SQL),
      getAutomationState: () => repository.getAutomationState(),
      getCurrentLease: () => repository.getCurrentLease()
    };
    const observedAt = this.now();

    return readStatusSnapshot(reader, observedAt);
  }
}
115
/**
 * One-shot convenience wrapper: builds a `D1StatusSnapshotLoader` whose clock
 * is pinned to `observedAt` and loads a single snapshot from it.
 *
 * @param db - Database handle to read from.
 * @param observedAt - Observation time recorded in the snapshot; defaults to now.
 */
export async function createStatusSnapshotFromDatabase(
  db: D1DatabaseLike,
  observedAt: Date = new Date()
): Promise<StatusSnapshot> {
  const pinnedClock = (): Date => observedAt;
  const loader = new D1StatusSnapshotLoader(db, pinnedClock);

  return loader.loadSnapshot();
}
122
/**
 * Builds the default loader: a `ControlApiStatusSnapshotLoader` pointed at the
 * base URL resolved from the given environment.
 *
 * @param options - Optional environment, fetch implementation and clock.
 */
export function createDefaultStatusSnapshotLoader(
  options: DefaultStatusSnapshotLoaderOptions = {}
): StatusSnapshotLoader {
  const { env, fetch: fetchImpl, now } = options;
  const loaderOptions: ControlApiStatusSnapshotLoaderOptions = {
    baseUrl: resolveStatusApiTruthSourceBase(env),
    fetch: fetchImpl,
    now
  };

  return new ControlApiStatusSnapshotLoader(loaderOptions);
}
132
/**
 * Resolves the base URL of the conductor truth source.
 *
 * The first non-blank value of `BAA_CONDUCTOR_LOCAL_API` and `BAA_CONTROL_API_BASE`
 * wins; when both are missing or blank the built-in default is used.
 *
 * @param env - Environment to read from. Defaults to `process.env` when a
 *   `process` global exists, otherwise to an empty environment.
 * @returns The normalized base URL.
 * @throws TypeError when the selected value is not a parseable URL.
 */
export function resolveStatusApiTruthSourceBase(
  // `typeof` guard rather than `process?.env`: optional chaining still throws a
  // ReferenceError when the identifier `process` is not declared at all.
  env: StatusApiEnvironment = typeof process !== "undefined" ? (process.env ?? {}) : {}
): string {
  return normalizeTruthSourceBaseUrl(
    getFirstNonEmptyString(env.BAA_CONDUCTOR_LOCAL_API, env.BAA_CONTROL_API_BASE) ?? DEFAULT_STATUS_API_TRUTH_SOURCE_BASE
  );
}
138
/**
 * Alias of `resolveStatusApiTruthSourceBase`; returns exactly what that function returns.
 *
 * @param env - Environment to read from. Defaults to `process.env` when a
 *   `process` global exists, otherwise to an empty environment.
 */
export function resolveStatusApiControlApiBase(
  // `typeof` guard rather than `process?.env`: optional chaining still throws a
  // ReferenceError when the identifier `process` is not declared at all.
  env: StatusApiEnvironment = typeof process !== "undefined" ? (process.env ?? {}) : {}
): string {
  return resolveStatusApiTruthSourceBase(env);
}
142
/**
 * Converts a conductor API response body into a `StatusSnapshot`.
 *
 * Each field is looked up under several alternative key paths (snake_case and
 * camelCase, with and without a `data` envelope); the first path whose value is
 * not `undefined` is used. Values that fail normalization become `null`, or the
 * empty-snapshot default for `queueDepth`, `activeRuns` and `updatedAt`.
 *
 * @param payload - Parsed JSON body of unknown shape.
 * @param observedAt - Observation time; also the reference for `leaseActive`.
 * @returns A snapshot whose `source` is "conductor-api".
 * @throws Error when the payload's `ok` field is exactly `false`.
 * @throws TypeError when no valid automation mode is present.
 */
export function createStatusSnapshotFromControlApiPayload(
  payload: unknown,
  observedAt: Date = new Date()
): StatusSnapshot {
  const lookup = (paths: readonly string[]): unknown => getFirstDefinedValue(payload, paths);
  const defaults = createEmptyStatusSnapshot(observedAt);

  const reportedError = normalizeOptionalString(
    lookup(["error", "data.error", "message", "data.message"])
  );
  const okFlag = lookup(["ok"]);

  if (okFlag === false) {
    throw new Error(`Conductor truth source reported an error: ${reportedError ?? "unknown_error"}.`);
  }

  const modePaths = [
    "data.mode",
    "mode",
    "data.automation.mode",
    "automation.mode",
    "data.system.mode",
    "system.mode",
    "data.system_state.mode",
    "system_state.mode",
    "data.systemState.mode",
    "systemState.mode",
    "data.state.mode",
    "state.mode"
  ];
  const mode = normalizeAutomationMode(lookup(modePaths));

  if (mode == null) {
    throw new TypeError("Conductor truth source payload did not include a valid automation mode.");
  }

  const observedAtMs = observedAt.getTime();

  const updatedAtPaths = [
    "data.updated_at",
    "updated_at",
    "data.updatedAt",
    "updatedAt",
    "data.automation.updated_at",
    "automation.updated_at",
    "data.automation.updatedAt",
    "automation.updatedAt",
    "data.state.updated_at",
    "state.updated_at",
    "data.state.updatedAt",
    "state.updatedAt"
  ];
  const updatedAtMs = normalizeTimestamp(lookup(updatedAtPaths));

  const leaseExpiryPaths = [
    "data.lease_expires_at",
    "lease_expires_at",
    "data.leaseExpiresAt",
    "leaseExpiresAt",
    "data.leader.lease_expires_at",
    "leader.lease_expires_at",
    "data.leader.leaseExpiresAt",
    "leader.leaseExpiresAt"
  ];
  const leaseExpiresAtMs = normalizeTimestamp(lookup(leaseExpiryPaths));

  const leaderIdPaths = [
    "data.holder_id",
    "holder_id",
    "data.holderId",
    "holderId",
    "data.leader.controller_id",
    "leader.controller_id",
    "data.leader.controllerId",
    "leader.controllerId",
    "data.lease_holder",
    "lease_holder",
    "data.leaseHolder",
    "leaseHolder"
  ];
  const leaderId = normalizeOptionalString(lookup(leaderIdPaths));

  // The trailing `leader` entries only yield a host when `leader` itself is a string.
  const leaderHostPaths = [
    "data.holder_host",
    "holder_host",
    "data.holderHost",
    "holderHost",
    "data.leader.host",
    "leader.host",
    "data.leader",
    "leader"
  ];
  const leaderHost = normalizeOptionalString(lookup(leaderHostPaths));

  const leaseTerm = normalizeNonNegativeInteger(
    lookup(["data.term", "term", "data.leader.term", "leader.term"])
  );
  const leaseExpiresAt = toIsoFromUnixMilliseconds(leaseExpiresAtMs);
  const leaseActive = leaseExpiresAtMs != null && leaseExpiresAtMs > observedAtMs;

  const queueDepthPaths = [
    "data.queue_depth",
    "queue_depth",
    "data.queueDepth",
    "queueDepth",
    "data.queued_tasks",
    "queued_tasks",
    "data.queuedTasks",
    "queuedTasks",
    "data.queue.queued_tasks",
    "queue.queued_tasks",
    "data.queue.queuedTasks",
    "queue.queuedTasks",
    "data.stats.queue_depth",
    "stats.queue_depth",
    "data.stats.queueDepth",
    "stats.queueDepth"
  ];
  const queueDepth = normalizeNonNegativeInteger(lookup(queueDepthPaths)) ?? defaults.queueDepth;

  const activeRunsPaths = [
    "data.active_runs",
    "active_runs",
    "data.activeRuns",
    "activeRuns",
    "data.queue.active_runs",
    "queue.active_runs",
    "data.queue.activeRuns",
    "queue.activeRuns",
    "data.runs.active",
    "runs.active",
    "data.stats.active_runs",
    "stats.active_runs",
    "data.stats.activeRuns",
    "stats.activeRuns"
  ];
  const activeRuns = normalizeNonNegativeInteger(lookup(activeRunsPaths)) ?? defaults.activeRuns;

  const updatedAt = toIsoFromUnixMilliseconds(updatedAtMs) ?? defaults.updatedAt;

  return {
    source: "conductor-api",
    mode,
    leaderId,
    leaderHost,
    leaseTerm,
    leaseExpiresAt,
    leaseActive,
    queueDepth,
    activeRuns,
    updatedAt,
    observedAt: defaults.observedAt
  };
}
287
/**
 * Assembles a `StatusSnapshot` from a reader's four data sources.
 *
 * The four reads are started together and awaited as a group. `updatedAt` is
 * the later of the automation state's `updatedAt` and the lease's `renewedAt`;
 * when neither yields a usable timestamp the empty-snapshot default is used.
 *
 * @param source - Provider of automation state, lease and the two counters.
 * @param observedAt - Observation time; the lease counts as active only while
 *   its expiry (Unix seconds) is later than this time floored to whole seconds.
 * @returns A snapshot whose `source` is "d1".
 */
export async function readStatusSnapshot(
  source: StatusSnapshotSourceReader,
  observedAt: Date = new Date()
): Promise<StatusSnapshot> {
  const defaults = createEmptyStatusSnapshot(observedAt);
  const pendingReads = [
    source.getAutomationState(),
    source.getCurrentLease(),
    source.countQueuedTasks(),
    source.countActiveRuns()
  ] as const;
  const [automationState, lease, queueDepth, activeRuns] = await Promise.all(pendingReads);

  const nowSeconds = toUnixSeconds(observedAt);
  const stateUpdatedMs = normalizeTimestamp(automationState?.updatedAt) ?? 0;
  const leaseRenewedMs = normalizeTimestamp(lease?.renewedAt) ?? 0;
  const newestDurableMs = Math.max(stateUpdatedMs, leaseRenewedMs);

  const leaseExpiresAt = toIsoFromUnixSeconds(lease?.leaseExpiresAt);
  const leaseActive = lease != null && lease.leaseExpiresAt > nowSeconds;

  let updatedAt = defaults.updatedAt;
  if (newestDurableMs > 0) {
    updatedAt = new Date(newestDurableMs).toISOString();
  }

  return {
    source: "d1",
    mode: automationState?.mode ?? defaults.mode,
    leaderId: lease?.holderId ?? null,
    leaderHost: lease?.holderHost ?? null,
    leaseTerm: lease?.term ?? null,
    leaseExpiresAt,
    leaseActive,
    queueDepth,
    activeRuns,
    updatedAt,
    observedAt: defaults.observedAt
  };
}
323
/**
 * Runs a count query and returns the number found in its `value` column.
 *
 * @param db - Database handle to prepare the statement on.
 * @param sql - Query that exposes its count as a column named `value`.
 * @returns The count, or 0 when the query yields no value.
 * @throws TypeError when the value is not a finite, non-negative number.
 */
async function countRows(db: D1DatabaseLike, sql: string): Promise<number> {
  const statement = db.prepare(sql);
  const count = await statement.first<number>("value");

  if (count === null || count === undefined) {
    return 0;
  }

  const isUsableCount = Number.isFinite(count) && count >= 0;

  if (!isUsableCount) {
    throw new TypeError(`Expected count query to return a non-negative finite number, received "${String(count)}".`);
  }

  return count;
}
337
/** Converts a `Date` to whole Unix seconds, rounding towards negative infinity. */
function toUnixSeconds(date: Date): number {
  const milliseconds = date.getTime();

  return Math.floor(milliseconds / 1000);
}
341
/**
 * Formats a Unix timestamp given in seconds as an ISO-8601 string.
 *
 * @returns The ISO string, or `null` when `value` is `null` or `undefined`.
 */
function toIsoFromUnixSeconds(value: number | null | undefined): string | null {
  if (value === null || value === undefined) {
    return null;
  }

  const milliseconds = value * 1000;

  return new Date(milliseconds).toISOString();
}
349
/**
 * Trims a configured base URL and returns it in the URL parser's serialized form.
 *
 * @returns The default base URL when the input is blank, otherwise the parsed URL as a string.
 * @throws TypeError when the non-blank input cannot be parsed as a URL.
 */
function normalizeTruthSourceBaseUrl(value: string): string {
  const trimmed = value.trim();

  if (trimmed === "") {
    return DEFAULT_STATUS_API_TRUTH_SOURCE_BASE;
  }

  const parsed = new URL(trimmed);

  return parsed.toString();
}
359
/**
 * Returns the first argument that is a string containing more than whitespace.
 * The match is returned as given (not trimmed); `undefined` when nothing qualifies.
 */
function getFirstNonEmptyString(...values: Array<string | undefined>): string | undefined {
  return values.find((candidate) => typeof candidate === "string" && candidate.trim() !== "");
}
369
/**
 * Accepts only the exact strings "running", "draining" and "paused".
 *
 * @returns The mode unchanged, or `null` for any other value.
 */
function normalizeAutomationMode(value: unknown): StatusSnapshot["mode"] | null {
  switch (value) {
    case "running":
      return "running";
    case "draining":
      return "draining";
    case "paused":
      return "paused";
    default:
      return null;
  }
}
373
/**
 * Coerces a number or a non-blank numeric string to a non-negative integer.
 *
 * @returns The integer, or `null` when the input has another type, is blank,
 *   is not an integer, or is negative.
 */
function normalizeNonNegativeInteger(value: unknown): number | null {
  let candidate = Number.NaN;

  if (typeof value === "number") {
    candidate = value;
  } else if (typeof value === "string" && value.trim() !== "") {
    candidate = Number(value);
  }

  const isAcceptable = Number.isInteger(candidate) && candidate >= 0;

  return isAcceptable ? candidate : null;
}
388
/**
 * Trims a string value.
 *
 * @returns The trimmed string, or `null` when the input is not a string or is blank.
 */
function normalizeOptionalString(value: unknown): string | null {
  const trimmed = typeof value === "string" ? value.trim() : "";

  if (trimmed.length === 0) {
    return null;
  }

  return trimmed;
}
398
/**
 * Coerces a number or a non-blank numeric string to Unix milliseconds.
 *
 * Inputs of 1_000_000_000_000 or more are taken to be milliseconds already;
 * smaller inputs are taken to be seconds and multiplied by 1000. Fractions of a
 * millisecond are truncated.
 *
 * @returns The timestamp in milliseconds, or `null` when the input has another
 *   type, is blank, is not finite, is zero or negative, or lies beyond the
 *   range a `Date` can represent.
 */
function normalizeTimestamp(value: unknown): number | null {
  // Largest time value a Date can hold; beyond it `toISOString()` throws a RangeError.
  const maxDateMilliseconds = 8.64e15;

  const numeric =
    typeof value === "number"
      ? value
      : typeof value === "string" && value.trim() !== ""
        ? Number(value)
        : Number.NaN;

  if (!Number.isFinite(numeric) || numeric <= 0) {
    return null;
  }

  const milliseconds =
    numeric >= 1_000_000_000_000 ? Math.trunc(numeric) : Math.trunc(numeric * 1000);

  // Callers format the result with `new Date(ms).toISOString()`, so an
  // unrepresentable value is reported as missing rather than crashing them.
  return milliseconds <= maxDateMilliseconds ? milliseconds : null;
}
413
/**
 * Tries each dotted path in order and returns the first value that is not `undefined`.
 * A `null` found at a path counts as defined and ends the search.
 *
 * @returns The first defined value, or `undefined` when every path is missing.
 */
function getFirstDefinedValue(source: unknown, paths: readonly string[]): unknown {
  for (const candidatePath of paths) {
    const resolved = readPath(source, candidatePath);

    if (resolved === undefined) {
      continue;
    }

    return resolved;
  }

  return undefined;
}
425
/**
 * Walks a dot-separated key path through nested plain objects.
 *
 * @returns The value at the end of the path, or `undefined` as soon as a step
 *   lands on a non-record value or a key that is absent.
 */
function readPath(source: unknown, path: string): unknown {
  const segments = path.split(".");
  let cursor: unknown = source;

  for (let index = 0; index < segments.length; index += 1) {
    const key = segments[index];

    if (!isRecord(cursor)) {
      return undefined;
    }

    if (!(key in cursor)) {
      return undefined;
    }

    cursor = cursor[key];
  }

  return cursor;
}
439
/** Type guard: true for any non-null object that is not an array. */
function isRecord(value: unknown): value is Record<string, unknown> {
  if (value === null || value === undefined) {
    return false;
  }

  if (typeof value !== "object") {
    return false;
  }

  return !Array.isArray(value);
}
443
/**
 * Formats a Unix timestamp given in milliseconds as an ISO-8601 string.
 *
 * @returns The ISO string, or `null` when `value` is `null` or `undefined`.
 */
function toIsoFromUnixMilliseconds(value: number | null | undefined): string | null {
  if (value === null || value === undefined) {
    return null;
  }

  const instant = new Date(value);

  return instant.toISOString();
}