baa-conductor


baa-conductor / apps / status-api / src
codex@macbookpro  ·  2026-03-31

data-source.ts

  1import {
  2  D1ControlPlaneRepository,
  3  type AutomationStateRecord,
  4  type D1DatabaseLike,
  5  type LeaderLeaseRecord
  6} from "../../../packages/db/src/index.js";
  7import {
  8  createEmptyStatusSnapshot,
  9  type StatusApiEnvironment,
 10  type StatusSnapshot,
 11  type StatusSnapshotLoader
 12} from "./contracts.js";
 13
 14const SELECT_QUEUED_TASK_COUNT_SQL = `
 15  SELECT COUNT(*) AS value
 16  FROM tasks
 17  WHERE status = 'queued'
 18`;
 19
 20const SELECT_ACTIVE_RUN_COUNT_SQL = `
 21  SELECT COUNT(*) AS value
 22  FROM task_runs
 23  WHERE started_at IS NOT NULL
 24    AND finished_at IS NULL
 25`;
 26
 27const DEFAULT_STATUS_API_TRUTH_SOURCE_BASE = "http://100.71.210.78:4317";
 28const STATUS_API_SYSTEM_STATE_PATH = "/v1/system/state";
 29
 30export interface StatusSnapshotSourceReader {
 31  countActiveRuns(): Promise<number>;
 32  countQueuedTasks(): Promise<number>;
 33  getAutomationState(): Promise<AutomationStateRecord | null>;
 34  getCurrentLease(): Promise<LeaderLeaseRecord | null>;
 35}
 36
 37export interface ControlApiStatusSnapshotLoaderOptions {
 38  baseUrl: string;
 39  fetch?: typeof fetch;
 40  now?: () => Date;
 41}
 42
 43export interface DefaultStatusSnapshotLoaderOptions {
 44  env?: StatusApiEnvironment;
 45  fetch?: typeof fetch;
 46  now?: () => Date;
 47}
 48
 49export class StaticStatusSnapshotLoader implements StatusSnapshotLoader {
 50  constructor(private readonly snapshot: StatusSnapshot = createEmptyStatusSnapshot()) {}
 51
 52  async loadSnapshot(): Promise<StatusSnapshot> {
 53    return this.snapshot;
 54  }
 55}
 56
 57export class ControlApiStatusSnapshotLoader implements StatusSnapshotLoader {
 58  private readonly baseUrl: string;
 59  private readonly fetchImpl: typeof fetch;
 60
 61  constructor(
 62    private readonly options: ControlApiStatusSnapshotLoaderOptions
 63  ) {
 64    this.baseUrl = normalizeTruthSourceBaseUrl(options.baseUrl);
 65    this.fetchImpl = options.fetch ?? globalThis.fetch;
 66
 67    if (typeof this.fetchImpl !== "function") {
 68      throw new Error("Status API requires a fetch implementation to read the conductor truth source.");
 69    }
 70  }
 71
 72  async loadSnapshot(): Promise<StatusSnapshot> {
 73    const response = await this.fetchImpl(new URL(STATUS_API_SYSTEM_STATE_PATH, this.baseUrl), {
 74      method: "GET",
 75      headers: {
 76        Accept: "application/json",
 77        "Cache-Control": "no-store"
 78      }
 79    });
 80
 81    if (!response.ok) {
 82      throw new Error(
 83        `Conductor truth source ${STATUS_API_SYSTEM_STATE_PATH} returned HTTP ${response.status}.`
 84      );
 85    }
 86
 87    const payload = await response.json();
 88
 89    return createStatusSnapshotFromControlApiPayload(payload, this.options.now?.() ?? new Date());
 90  }
 91}
 92
 93export class D1StatusSnapshotLoader implements StatusSnapshotLoader {
 94  private readonly repository: D1ControlPlaneRepository;
 95
 96  constructor(
 97    private readonly db: D1DatabaseLike,
 98    private readonly now: () => Date = () => new Date()
 99  ) {
100    this.repository = new D1ControlPlaneRepository(db);
101  }
102
103  async loadSnapshot(): Promise<StatusSnapshot> {
104    return readStatusSnapshot(
105      {
106        countActiveRuns: () => countRows(this.db, SELECT_ACTIVE_RUN_COUNT_SQL),
107        countQueuedTasks: () => countRows(this.db, SELECT_QUEUED_TASK_COUNT_SQL),
108        getAutomationState: () => this.repository.getAutomationState(),
109        getCurrentLease: () => this.repository.getCurrentLease()
110      },
111      this.now()
112    );
113  }
114}
115
/**
 * One-shot helper that reads a snapshot from `db` through a
 * `D1StatusSnapshotLoader` whose clock always returns `observedAt`.
 *
 * @param db Database to read from.
 * @param observedAt Observation time for the snapshot; defaults to now.
 * @returns The snapshot produced by the loader.
 */
export async function createStatusSnapshotFromDatabase(
  db: D1DatabaseLike,
  observedAt: Date = new Date()
): Promise<StatusSnapshot> {
  const fixedClock = (): Date => observedAt;
  const loader = new D1StatusSnapshotLoader(db, fixedClock);

  return loader.loadSnapshot();
}
122
/**
 * Builds the default loader: a `ControlApiStatusSnapshotLoader` whose base URL
 * is resolved from `options.env` by `resolveStatusApiTruthSourceBase`.
 *
 * @param options Optional environment, `fetch` and clock overrides.
 * @returns The configured loader.
 */
export function createDefaultStatusSnapshotLoader(
  options: DefaultStatusSnapshotLoaderOptions = {}
): StatusSnapshotLoader {
  const baseUrl = resolveStatusApiTruthSourceBase(options.env);
  const { fetch: fetchOverride, now } = options;

  return new ControlApiStatusSnapshotLoader({ baseUrl, fetch: fetchOverride, now });
}
132
/**
 * Resolves the base URL of the conductor truth source.
 *
 * The first non-blank value among `env.BAA_CONDUCTOR_LOCAL_API` and
 * `env.BAA_CONTROL_API_BASE` wins; otherwise the built-in default is used. The
 * result is passed through `normalizeTruthSourceBaseUrl`.
 *
 * @param env Environment to read. Defaults to `process.env` when a `process`
 *   global exists, otherwise to an empty object. The `typeof` check is needed
 *   because `process?.env` still throws a `ReferenceError` when `process` is
 *   not declared at all.
 * @returns The normalized base URL.
 * @throws {TypeError} When the selected value is not a valid URL.
 */
export function resolveStatusApiTruthSourceBase(
  env: StatusApiEnvironment = typeof process !== "undefined" && process.env != null ? process.env : {}
): string {
  return normalizeTruthSourceBaseUrl(
    getFirstNonEmptyString(env.BAA_CONDUCTOR_LOCAL_API, env.BAA_CONTROL_API_BASE) ?? DEFAULT_STATUS_API_TRUTH_SOURCE_BASE
  );
}
138
/**
 * Alias of `resolveStatusApiTruthSourceBase`; returns the same value for the
 * same environment.
 *
 * @param env Environment to read. Defaults to `process.env` when a `process`
 *   global exists, otherwise to an empty object. The `typeof` check avoids the
 *   `ReferenceError` that `process?.env` raises when `process` is undeclared.
 * @returns The normalized base URL.
 */
export function resolveStatusApiControlApiBase(
  env: StatusApiEnvironment = typeof process !== "undefined" && process.env != null ? process.env : {}
): string {
  return resolveStatusApiTruthSourceBase(env);
}
142
/**
 * Converts a JSON payload from the conductor API into a `StatusSnapshot`.
 *
 * Each field is looked up under several alternative locations (top level or
 * under `data`, snake_case or camelCase, optionally nested); the first
 * location holding a defined value is used. Queue depth, active runs and
 * `updatedAt` fall back to the values of `createEmptyStatusSnapshot(observedAt)`
 * when the payload has no usable value.
 *
 * @param payload Parsed response body; its shape is not assumed.
 * @param observedAt Observation time for the snapshot; defaults to now.
 * @returns A snapshot whose `source` is `"conductor-api"`.
 * @throws {Error} When the payload's top-level `ok` is exactly `false`.
 * @throws {TypeError} When no valid automation mode is found.
 */
export function createStatusSnapshotFromControlApiPayload(
  payload: unknown,
  observedAt: Date = new Date()
): StatusSnapshot {
  const pick = (paths: readonly string[]): unknown => getFirstDefinedValue(payload, paths);

  const defaults = createEmptyStatusSnapshot(observedAt);
  const reportedError = normalizeOptionalString(
    pick(["error", "data.error", "message", "data.message"])
  );

  if (pick(["ok"]) === false) {
    throw new Error(`Conductor truth source reported an error: ${reportedError ?? "unknown_error"}.`);
  }

  const modePaths = [
    "data.mode",
    "mode",
    "data.automation.mode",
    "automation.mode",
    "data.system.mode",
    "system.mode",
    "data.system_state.mode",
    "system_state.mode",
    "data.systemState.mode",
    "systemState.mode",
    "data.state.mode",
    "state.mode"
  ];
  const mode = normalizeAutomationMode(pick(modePaths));

  if (mode == null) {
    throw new TypeError("Conductor truth source payload did not include a valid automation mode.");
  }

  const observedAtMs = observedAt.getTime();

  const updatedAtPaths = [
    "data.updated_at",
    "updated_at",
    "data.updatedAt",
    "updatedAt",
    "data.automation.updated_at",
    "automation.updated_at",
    "data.automation.updatedAt",
    "automation.updatedAt",
    "data.state.updated_at",
    "state.updated_at",
    "data.state.updatedAt",
    "state.updatedAt"
  ];
  const updatedAtMs = normalizeTimestamp(pick(updatedAtPaths));

  const leaseExpiryPaths = [
    "data.lease_expires_at",
    "lease_expires_at",
    "data.leaseExpiresAt",
    "leaseExpiresAt",
    "data.leader.lease_expires_at",
    "leader.lease_expires_at",
    "data.leader.leaseExpiresAt",
    "leader.leaseExpiresAt"
  ];
  const leaseExpiresAtMs = normalizeTimestamp(pick(leaseExpiryPaths));

  const leaderIdPaths = [
    "data.holder_id",
    "holder_id",
    "data.holderId",
    "holderId",
    "data.leader.controller_id",
    "leader.controller_id",
    "data.leader.controllerId",
    "leader.controllerId",
    "data.lease_holder",
    "lease_holder",
    "data.leaseHolder",
    "leaseHolder"
  ];
  const leaderId = normalizeOptionalString(pick(leaderIdPaths));

  // The last two entries cover payloads where `leader` is itself the host string.
  const leaderHostPaths = [
    "data.holder_host",
    "holder_host",
    "data.holderHost",
    "holderHost",
    "data.leader.host",
    "leader.host",
    "data.leader",
    "leader"
  ];
  const leaderHost = normalizeOptionalString(pick(leaderHostPaths));

  const leaseTerm = normalizeNonNegativeInteger(
    pick(["data.term", "term", "data.leader.term", "leader.term"])
  );

  const queueDepthPaths = [
    "data.queue_depth",
    "queue_depth",
    "data.queueDepth",
    "queueDepth",
    "data.queued_tasks",
    "queued_tasks",
    "data.queuedTasks",
    "queuedTasks",
    "data.queue.queued_tasks",
    "queue.queued_tasks",
    "data.queue.queuedTasks",
    "queue.queuedTasks",
    "data.stats.queue_depth",
    "stats.queue_depth",
    "data.stats.queueDepth",
    "stats.queueDepth"
  ];
  const queueDepth = normalizeNonNegativeInteger(pick(queueDepthPaths));

  const activeRunsPaths = [
    "data.active_runs",
    "active_runs",
    "data.activeRuns",
    "activeRuns",
    "data.queue.active_runs",
    "queue.active_runs",
    "data.queue.activeRuns",
    "queue.activeRuns",
    "data.runs.active",
    "runs.active",
    "data.stats.active_runs",
    "stats.active_runs",
    "data.stats.activeRuns",
    "stats.activeRuns"
  ];
  const activeRuns = normalizeNonNegativeInteger(pick(activeRunsPaths));

  // The lease counts as active only while its expiry lies strictly after the observation time.
  const leaseActive = leaseExpiresAtMs == null ? false : leaseExpiresAtMs > observedAtMs;

  return {
    source: "conductor-api",
    mode,
    leaderId,
    leaderHost,
    leaseTerm,
    leaseExpiresAt: toIsoFromUnixMilliseconds(leaseExpiresAtMs),
    leaseActive,
    queueDepth: queueDepth ?? defaults.queueDepth,
    activeRuns: activeRuns ?? defaults.activeRuns,
    updatedAt: toIsoFromUnixMilliseconds(updatedAtMs) ?? defaults.updatedAt,
    observedAt: defaults.observedAt
  };
}
287
/**
 * Builds a `StatusSnapshot` (with `source` set to `"d1"`) from the four reads
 * exposed by `source`, which are started together and awaited as a group.
 *
 * `updatedAt` is the later of the automation state's `updatedAt` and the
 * lease's `renewedAt`; when neither yields a timestamp, the value from
 * `createEmptyStatusSnapshot(observedAt)` is used instead.
 *
 * @param source Reader supplying automation state, lease and the two counts.
 * @param observedAt Observation time for the snapshot; defaults to now.
 * @returns The assembled snapshot.
 */
export async function readStatusSnapshot(
  source: StatusSnapshotSourceReader,
  observedAt: Date = new Date()
): Promise<StatusSnapshot> {
  const defaults = createEmptyStatusSnapshot(observedAt);
  const [state, currentLease, queuedTasks, runningRuns] = await Promise.all([
    source.getAutomationState(),
    source.getCurrentLease(),
    source.countQueuedTasks(),
    source.countActiveRuns()
  ]);

  const nowSeconds = toUnixSeconds(observedAt);
  const stateUpdatedMs = normalizeTimestamp(state?.updatedAt) ?? 0;
  const leaseRenewedMs = normalizeTimestamp(currentLease?.renewedAt) ?? 0;
  const newestMs = Math.max(stateUpdatedMs, leaseRenewedMs);

  // A lease counts as active only while its expiry is strictly after the observation time.
  const leaseActive = currentLease == null ? false : currentLease.leaseExpiresAt > nowSeconds;
  const updatedAt = newestMs > 0 ? new Date(newestMs).toISOString() : defaults.updatedAt;

  return {
    source: "d1",
    mode: state?.mode ?? defaults.mode,
    leaderId: currentLease?.holderId ?? null,
    leaderHost: currentLease?.holderHost ?? null,
    leaseTerm: currentLease?.term ?? null,
    leaseExpiresAt: toIsoFromUnixSeconds(currentLease?.leaseExpiresAt),
    leaseActive,
    queueDepth: queuedTasks,
    activeRuns: runningRuns,
    updatedAt,
    observedAt: defaults.observedAt
  };
}
323
/**
 * Runs a count query and returns the `value` column of its first row.
 *
 * @param db Database to query.
 * @param sql Statement expected to expose its count in a column named `value`.
 * @returns The count, or `0` when the query yields `null` or `undefined`.
 * @throws {TypeError} When the value is not a finite, non-negative number.
 */
async function countRows(db: D1DatabaseLike, sql: string): Promise<number> {
  const count = await db.prepare(sql).first<number>("value");

  if (count == null) {
    return 0;
  }

  const isUsableCount = Number.isFinite(count) && count >= 0;

  if (!isUsableCount) {
    throw new TypeError(`Expected count query to return a non-negative finite number, received "${String(count)}".`);
  }

  return count;
}
337
/** Converts a `Date` to whole seconds since the Unix epoch, rounding down. */
function toUnixSeconds(date: Date): number {
  const milliseconds = date.getTime();

  return Math.floor(milliseconds / 1000);
}
341
/**
 * Formats a Unix timestamp given in seconds as an ISO-8601 string.
 *
 * @returns The ISO string, or `null` when `value` is `null` or `undefined`.
 */
function toIsoFromUnixSeconds(value: number | null | undefined): string | null {
  return value == null ? null : new Date(value * 1000).toISOString();
}
349
/**
 * Trims a base URL and serializes it through the `URL` parser.
 *
 * @param value Candidate base URL.
 * @returns The serialized URL, or the built-in default when `value` is blank.
 * @throws {TypeError} When the trimmed value is not a valid URL.
 */
function normalizeTruthSourceBaseUrl(value: string): string {
  const trimmed = value.trim();

  return trimmed === "" ? DEFAULT_STATUS_API_TRUTH_SOURCE_BASE : new URL(trimmed).toString();
}
359
/**
 * Returns the first argument that is a string containing something other than
 * whitespace. The value is returned as given, without trimming.
 *
 * @returns The matching string, or `undefined` when no argument qualifies.
 */
function getFirstNonEmptyString(...values: Array<string | undefined>): string | undefined {
  return values.find((candidate) => typeof candidate === "string" && candidate.trim() !== "");
}
369
/**
 * Accepts exactly the strings `"running"`, `"draining"` and `"paused"`.
 *
 * @returns The mode unchanged, or `null` for any other value.
 */
function normalizeAutomationMode(value: unknown): StatusSnapshot["mode"] | null {
  switch (value) {
    case "running":
    case "draining":
    case "paused":
      return value;
    default:
      return null;
  }
}
373
/**
 * Coerces a number, or a non-blank string parsed with `Number()`, into a
 * non-negative integer.
 *
 * @returns The integer, or `null` when the input is of another type, is not
 *   an integer, or is negative.
 */
function normalizeNonNegativeInteger(value: unknown): number | null {
  let parsed = Number.NaN;

  if (typeof value === "number") {
    parsed = value;
  } else if (typeof value === "string" && value.trim() !== "") {
    parsed = Number(value);
  }

  return Number.isInteger(parsed) && parsed >= 0 ? parsed : null;
}
388
/**
 * Trims a string value.
 *
 * @returns The trimmed string, or `null` when the input is not a string or is
 *   empty after trimming.
 */
function normalizeOptionalString(value: unknown): string | null {
  const trimmed = typeof value === "string" ? value.trim() : "";

  return trimmed.length > 0 ? trimmed : null;
}
398
/**
 * Coerces a number, or a non-blank string parsed with `Number()`, into Unix
 * milliseconds.
 *
 * Values of at least 1e12 are taken to be milliseconds already; smaller values
 * are taken to be seconds and multiplied by 1000. Fractions are truncated.
 *
 * @returns The timestamp in milliseconds, or `null` when the input is of
 *   another type, is not finite, is not positive, or lies beyond the largest
 *   time a `Date` can represent.
 */
function normalizeTimestamp(value: unknown): number | null {
  // Largest time value a Date can hold; anything above it makes
  // `new Date(ms).toISOString()` throw a RangeError.
  const maxDateMilliseconds = 8.64e15;

  const numeric =
    typeof value === "number"
      ? value
      : typeof value === "string" && value.trim() !== ""
        ? Number(value)
        : Number.NaN;

  if (!Number.isFinite(numeric) || numeric <= 0) {
    return null;
  }

  const milliseconds =
    numeric >= 1_000_000_000_000 ? Math.trunc(numeric) : Math.trunc(numeric * 1000);

  return milliseconds > maxDateMilliseconds ? null : milliseconds;
}
413
/**
 * Looks up each dotted path in `source`, in order, and returns the first value
 * that is not `undefined`. A `null` value counts as defined and is returned.
 *
 * @returns The first defined value, or `undefined` when every path misses.
 */
function getFirstDefinedValue(source: unknown, paths: readonly string[]): unknown {
  for (const candidatePath of paths) {
    const found = readPath(source, candidatePath);

    if (found === undefined) {
      continue;
    }

    return found;
  }

  return undefined;
}
425
/**
 * Follows a dot-separated path through nested plain objects.
 *
 * @returns The value at the end of the path, or `undefined` as soon as a step
 *   lands on something that is not a record or that lacks the next key.
 */
function readPath(source: unknown, path: string): unknown {
  const segments = path.split(".");
  let cursor: unknown = source;

  for (const key of segments) {
    if (!isRecord(cursor)) {
      return undefined;
    }

    if (!(key in cursor)) {
      return undefined;
    }

    cursor = cursor[key];
  }

  return cursor;
}
439
/** Type guard: true for any non-null object that is not an array. */
function isRecord(value: unknown): value is Record<string, unknown> {
  if (typeof value !== "object" || value === null) {
    return false;
  }

  return !Array.isArray(value);
}
443
/**
 * Formats a Unix timestamp given in milliseconds as an ISO-8601 string.
 *
 * @returns The ISO string, or `null` when `value` is `null` or `undefined`.
 */
function toIsoFromUnixMilliseconds(value: number | null | undefined): string | null {
  return value == null ? null : new Date(value).toISOString();
}