baa-conductor

git clone 

commit
9802336
parent
9506c71
author
im_wower
date
2026-03-22 15:27:07 +0800 CST
Fix status-api live truth source
8 files changed,  +366, -20
M apps/status-api/src/cli.ts
+4, -1
 1@@ -3,8 +3,8 @@ import {
 2   getDefaultStatusApiPort,
 3   runStatusApiSmokeCheck,
 4   startStatusApiServer,
 5-  type StatusApiEnvironment
 6 } from "./host.js";
 7+import type { StatusApiEnvironment } from "./contracts.js";
 8 
 9 type StatusApiCliAction = "help" | "serve" | "smoke";
10 
11@@ -37,6 +37,7 @@ export async function runStatusApiCli(options: RunStatusApiCliOptions = {}): Pro
12 
13     case "serve": {
14       const server = await startStatusApiServer({
15+        env: options.env ?? process?.env ?? {},
16         host: command.host,
17         port: command.port
18       });
19@@ -55,6 +56,7 @@ export async function runStatusApiCli(options: RunStatusApiCliOptions = {}): Pro
20 
21     case "smoke": {
22       const result = await runStatusApiSmokeCheck({
23+        env: options.env ?? process?.env ?? {},
24         host: command.host ?? getDefaultStatusApiHost(),
25         port: command.port ?? 0
26       });
27@@ -152,6 +154,7 @@ function renderStatusApiCliHelp(): string {
28     "Usage: node apps/status-api/dist/index.js [serve|smoke|help] [--host <host>] [--port <port>]",
29     "",
30     `Default listen address: http://${getDefaultStatusApiHost()}:${getDefaultStatusApiPort()}`,
31+    "Default truth source: BAA_CONTROL_API_BASE or https://control-api.makefile.so",
32     "Routes:",
33     "- GET /healthz",
34     "- GET /v1/status",
M apps/status-api/src/contracts.ts
+5, -1
 1@@ -1,8 +1,12 @@
 2 import type { AutomationMode } from "../../../packages/db/src/index.js";
 3 
 4-export type StatusSnapshotSource = "empty" | "d1";
 5+export type StatusSnapshotSource = "control-api" | "empty" | "d1";
 6 export type StatusApiRouteMethod = "GET";
 7 
 8+export interface StatusApiEnvironment {
 9+  [key: string]: string | undefined;
10+}
11+
12 export interface StatusSnapshot {
13   source: StatusSnapshotSource;
14   mode: AutomationMode;
M apps/status-api/src/data-source.ts
+308, -1
  1@@ -4,7 +4,12 @@ import {
  2   type D1DatabaseLike,
  3   type LeaderLeaseRecord
  4 } from "../../../packages/db/src/index.js";
  5-import { createEmptyStatusSnapshot, type StatusSnapshot, type StatusSnapshotLoader } from "./contracts.js";
  6+import {
  7+  createEmptyStatusSnapshot,
  8+  type StatusApiEnvironment,
  9+  type StatusSnapshot,
 10+  type StatusSnapshotLoader
 11+} from "./contracts.js";
 12 
 13 const SELECT_QUEUED_TASK_COUNT_SQL = `
 14   SELECT COUNT(*) AS value
 15@@ -19,6 +24,9 @@ const SELECT_ACTIVE_RUN_COUNT_SQL = `
 16     AND finished_at IS NULL
 17 `;
 18 
 19+const DEFAULT_CONTROL_API_BASE = "https://control-api.makefile.so";
 20+const CONTROL_API_SYSTEM_STATE_PATH = "/v1/system/state";
 21+
 22 export interface StatusSnapshotSourceReader {
 23   countActiveRuns(): Promise<number>;
 24   countQueuedTasks(): Promise<number>;
 25@@ -26,6 +34,18 @@ export interface StatusSnapshotSourceReader {
 26   getCurrentLease(): Promise<LeaderLeaseRecord | null>;
 27 }
 28 
 29+export interface ControlApiStatusSnapshotLoaderOptions {
 30+  baseUrl: string;
 31+  fetch?: typeof fetch;
 32+  now?: () => Date;
 33+}
 34+
 35+export interface DefaultStatusSnapshotLoaderOptions {
 36+  env?: StatusApiEnvironment;
 37+  fetch?: typeof fetch;
 38+  now?: () => Date;
 39+}
 40+
 41 export class StaticStatusSnapshotLoader implements StatusSnapshotLoader {
 42   constructor(private readonly snapshot: StatusSnapshot = createEmptyStatusSnapshot()) {}
 43 
 44@@ -34,6 +54,42 @@ export class StaticStatusSnapshotLoader implements StatusSnapshotLoader {
 45   }
 46 }
 47 
 48+export class ControlApiStatusSnapshotLoader implements StatusSnapshotLoader {
 49+  private readonly baseUrl: string;
 50+  private readonly fetchImpl: typeof fetch;
 51+
 52+  constructor(
 53+    private readonly options: ControlApiStatusSnapshotLoaderOptions
 54+  ) {
 55+    this.baseUrl = normalizeControlApiBaseUrl(options.baseUrl);
 56+    this.fetchImpl = options.fetch ?? globalThis.fetch;
 57+
 58+    if (typeof this.fetchImpl !== "function") {
 59+      throw new Error("Status API requires a fetch implementation to read the control plane truth source.");
 60+    }
 61+  }
 62+
 63+  async loadSnapshot(): Promise<StatusSnapshot> {
 64+    const response = await this.fetchImpl(new URL(CONTROL_API_SYSTEM_STATE_PATH, this.baseUrl), {
 65+      method: "GET",
 66+      headers: {
 67+        Accept: "application/json",
 68+        "Cache-Control": "no-store"
 69+      }
 70+    });
 71+
 72+    if (!response.ok) {
 73+      throw new Error(
 74+        `Control API truth source ${CONTROL_API_SYSTEM_STATE_PATH} returned HTTP ${response.status}.`
 75+      );
 76+    }
 77+
 78+    const payload = await response.json();
 79+
 80+    return createStatusSnapshotFromControlApiPayload(payload, this.options.now?.() ?? new Date());
 81+  }
 82+}
 83+
 84 export class D1StatusSnapshotLoader implements StatusSnapshotLoader {
 85   private readonly repository: D1ControlPlaneRepository;
 86 
 87@@ -64,6 +120,165 @@ export async function createStatusSnapshotFromDatabase(
 88   return new D1StatusSnapshotLoader(db, () => observedAt).loadSnapshot();
 89 }
 90 
 91+export function createDefaultStatusSnapshotLoader(
 92+  options: DefaultStatusSnapshotLoaderOptions = {}
 93+): StatusSnapshotLoader {
 94+  return new ControlApiStatusSnapshotLoader({
 95+    baseUrl: resolveStatusApiControlApiBase(options.env),
 96+    fetch: options.fetch,
 97+    now: options.now
 98+  });
 99+}
100+
101+export function resolveStatusApiControlApiBase(env: StatusApiEnvironment = process?.env ?? {}): string {
102+  return normalizeControlApiBaseUrl(env.BAA_CONTROL_API_BASE ?? DEFAULT_CONTROL_API_BASE);
103+}
104+
105+export function createStatusSnapshotFromControlApiPayload(
106+  payload: unknown,
107+  observedAt: Date = new Date()
108+): StatusSnapshot {
109+  const fallback = createEmptyStatusSnapshot(observedAt);
110+  const payloadError = normalizeOptionalString(
111+    getFirstDefinedValue(payload, ["error", "data.error", "message", "data.message"])
112+  );
113+
114+  if (getFirstDefinedValue(payload, ["ok"]) === false) {
115+    throw new Error(`Control API truth source reported an error: ${payloadError ?? "unknown_error"}.`);
116+  }
117+
118+  const mode = normalizeAutomationMode(
119+    getFirstDefinedValue(payload, [
120+      "data.mode",
121+      "mode",
122+      "data.automation.mode",
123+      "automation.mode",
124+      "data.system.mode",
125+      "system.mode",
126+      "data.system_state.mode",
127+      "system_state.mode",
128+      "data.systemState.mode",
129+      "systemState.mode",
130+      "data.state.mode",
131+      "state.mode"
132+    ])
133+  );
134+
135+  if (mode == null) {
136+    throw new TypeError("Control API truth source payload did not include a valid automation mode.");
137+  }
138+
139+  const observedAtMs = observedAt.getTime();
140+  const updatedAtMs = normalizeTimestamp(
141+    getFirstDefinedValue(payload, [
142+      "data.updated_at",
143+      "updated_at",
144+      "data.updatedAt",
145+      "updatedAt",
146+      "data.automation.updated_at",
147+      "automation.updated_at",
148+      "data.automation.updatedAt",
149+      "automation.updatedAt",
150+      "data.state.updated_at",
151+      "state.updated_at",
152+      "data.state.updatedAt",
153+      "state.updatedAt"
154+    ])
155+  );
156+  const leaseExpiresAtMs = normalizeTimestamp(
157+    getFirstDefinedValue(payload, [
158+      "data.lease_expires_at",
159+      "lease_expires_at",
160+      "data.leaseExpiresAt",
161+      "leaseExpiresAt",
162+      "data.leader.lease_expires_at",
163+      "leader.lease_expires_at",
164+      "data.leader.leaseExpiresAt",
165+      "leader.leaseExpiresAt"
166+    ])
167+  );
168+
169+  return {
170+    source: "control-api",
171+    mode,
172+    leaderId: normalizeOptionalString(
173+      getFirstDefinedValue(payload, [
174+        "data.holder_id",
175+        "holder_id",
176+        "data.holderId",
177+        "holderId",
178+        "data.leader.controller_id",
179+        "leader.controller_id",
180+        "data.leader.controllerId",
181+        "leader.controllerId",
182+        "data.lease_holder",
183+        "lease_holder",
184+        "data.leaseHolder",
185+        "leaseHolder"
186+      ])
187+    ),
188+    leaderHost: normalizeOptionalString(
189+      getFirstDefinedValue(payload, [
190+        "data.holder_host",
191+        "holder_host",
192+        "data.holderHost",
193+        "holderHost",
194+        "data.leader.host",
195+        "leader.host",
196+        "data.leader",
197+        "leader"
198+      ])
199+    ),
200+    leaseTerm: normalizeNonNegativeInteger(
201+      getFirstDefinedValue(payload, ["data.term", "term", "data.leader.term", "leader.term"])
202+    ),
203+    leaseExpiresAt: toIsoFromUnixMilliseconds(leaseExpiresAtMs),
204+    leaseActive: leaseExpiresAtMs != null && leaseExpiresAtMs > observedAtMs,
205+    queueDepth:
206+      normalizeNonNegativeInteger(
207+        getFirstDefinedValue(payload, [
208+          "data.queue_depth",
209+          "queue_depth",
210+          "data.queueDepth",
211+          "queueDepth",
212+          "data.queued_tasks",
213+          "queued_tasks",
214+          "data.queuedTasks",
215+          "queuedTasks",
216+          "data.queue.queued_tasks",
217+          "queue.queued_tasks",
218+          "data.queue.queuedTasks",
219+          "queue.queuedTasks",
220+          "data.stats.queue_depth",
221+          "stats.queue_depth",
222+          "data.stats.queueDepth",
223+          "stats.queueDepth"
224+        ])
225+      ) ?? fallback.queueDepth,
226+    activeRuns:
227+      normalizeNonNegativeInteger(
228+        getFirstDefinedValue(payload, [
229+          "data.active_runs",
230+          "active_runs",
231+          "data.activeRuns",
232+          "activeRuns",
233+          "data.queue.active_runs",
234+          "queue.active_runs",
235+          "data.queue.activeRuns",
236+          "queue.activeRuns",
237+          "data.runs.active",
238+          "runs.active",
239+          "data.stats.active_runs",
240+          "stats.active_runs",
241+          "data.stats.activeRuns",
242+          "stats.activeRuns"
243+        ])
244+      ) ?? fallback.activeRuns,
245+    updatedAt: toIsoFromUnixMilliseconds(updatedAtMs) ?? fallback.updatedAt,
246+    observedAt: fallback.observedAt
247+  };
248+}
249+
250 export async function readStatusSnapshot(
251   source: StatusSnapshotSourceReader,
252   observedAt: Date = new Date()
253@@ -120,3 +335,95 @@ function toIsoFromUnixSeconds(value: number | null | undefined): string | null {
254 
255   return new Date(value * 1000).toISOString();
256 }
257+
258+function normalizeControlApiBaseUrl(value: string): string {
259+  const normalized = value.trim();
260+
261+  if (normalized === "") {
262+    return DEFAULT_CONTROL_API_BASE;
263+  }
264+
265+  return new URL(normalized).toString();
266+}
267+
268+function normalizeAutomationMode(value: unknown): StatusSnapshot["mode"] | null {
269+  return value === "running" || value === "draining" || value === "paused" ? value : null;
270+}
271+
272+function normalizeNonNegativeInteger(value: unknown): number | null {
273+  const numeric =
274+    typeof value === "number"
275+      ? value
276+      : typeof value === "string" && value.trim() !== ""
277+        ? Number(value)
278+        : Number.NaN;
279+
280+  if (!Number.isInteger(numeric) || numeric < 0) {
281+    return null;
282+  }
283+
284+  return numeric;
285+}
286+
287+function normalizeOptionalString(value: unknown): string | null {
288+  if (typeof value !== "string") {
289+    return null;
290+  }
291+
292+  const normalized = value.trim();
293+
294+  return normalized === "" ? null : normalized;
295+}
296+
297+function normalizeTimestamp(value: unknown): number | null {
298+  const numeric =
299+    typeof value === "number"
300+      ? value
301+      : typeof value === "string" && value.trim() !== ""
302+        ? Number(value)
303+        : Number.NaN;
304+
305+  if (!Number.isFinite(numeric) || numeric <= 0) {
306+    return null;
307+  }
308+
309+  return numeric >= 1_000_000_000_000 ? Math.trunc(numeric) : Math.trunc(numeric * 1000);
310+}
311+
312+function getFirstDefinedValue(source: unknown, paths: readonly string[]): unknown {
313+  for (const path of paths) {
314+    const value = readPath(source, path);
315+
316+    if (value !== undefined) {
317+      return value;
318+    }
319+  }
320+
321+  return undefined;
322+}
323+
324+function readPath(source: unknown, path: string): unknown {
325+  let current = source;
326+
327+  for (const segment of path.split(".")) {
328+    if (!isRecord(current) || !(segment in current)) {
329+      return undefined;
330+    }
331+
332+    current = current[segment];
333+  }
334+
335+  return current;
336+}
337+
338+function isRecord(value: unknown): value is Record<string, unknown> {
339+  return value != null && typeof value === "object" && !Array.isArray(value);
340+}
341+
342+function toIsoFromUnixMilliseconds(value: number | null | undefined): string | null {
343+  if (value == null) {
344+    return null;
345+  }
346+
347+  return new Date(value).toISOString();
348+}
M apps/status-api/src/host.ts
+15, -9
 1@@ -1,5 +1,5 @@
 2 import { createStatusApiRuntime, describeStatusApiRuntimeSurface, type StatusApiRuntime, type StatusApiRuntimeOptions } from "./runtime.js";
 3-import type { StatusApiResponse } from "./contracts.js";
 4+import type { StatusApiEnvironment, StatusApiResponse } from "./contracts.js";
 5 
 6 const DEFAULT_STATUS_API_HOST = "127.0.0.1";
 7 const DEFAULT_STATUS_API_PORT = 4318;
 8@@ -10,10 +10,6 @@ const ERROR_HEADERS = {
 9   "cache-control": "no-store"
10 } as const;
11 
12-export interface StatusApiEnvironment {
13-  [key: string]: string | undefined;
14-}
15-
16 export interface StatusApiListenOptions {
17   host: string;
18   port: number;
19@@ -97,8 +93,10 @@ export function createStatusApiNodeRequestListener(runtime: StatusApiRuntime) {
20 
21 export async function startStatusApiServer(options: StatusApiServerOptions = {}): Promise<StatusApiServer> {
22   const { createServer } = await importNodeHttp();
23-  const listenOptions = resolveStatusApiListenOptions(options);
24+  const env = options.env ?? process?.env ?? {};
25+  const listenOptions = resolveStatusApiListenOptions(options, env);
26   const runtime = createStatusApiRuntime({
27+    env,
28     snapshotLoader: options.snapshotLoader
29   });
30   const server = createServer(createStatusApiNodeRequestListener(runtime));
31@@ -328,20 +326,28 @@ async function verifyStatus(baseUrl: string): Promise<StatusApiSmokeCheck> {
32   const payload = (await response.json()) as {
33     ok?: boolean;
34     data?: {
35+      leaderId?: string | null;
36+      mode?: string;
37       source?: string;
38     };
39   };
40+  const snapshot = payload.data;
41 
42   assertStatus(response, 200, "/v1/status");
43 
44-  if (payload.ok !== true || payload.data?.source !== "empty") {
45-    throw new Error(`Expected /v1/status to return an empty snapshot envelope, received ${JSON.stringify(payload)}.`);
46+  if (
47+    payload.ok !== true ||
48+    snapshot == null ||
49+    snapshot.source === "empty" ||
50+    !["running", "draining", "paused"].includes(snapshot.mode ?? "")
51+  ) {
52+    throw new Error(`Expected /v1/status to return a non-empty live snapshot, received ${JSON.stringify(payload)}.`);
53   }
54 
55   return {
56     path: "/v1/status",
57     status: response.status,
58-    detail: `source=${payload.data.source}`
59+    detail: `source=${snapshot.source} mode=${snapshot.mode ?? "unknown"} leaderId=${snapshot.leaderId ?? "null"}`
60   };
61 }
62 
M apps/status-api/src/render.ts
+5, -5
 1@@ -2,8 +2,8 @@ import type { StatusSnapshot } from "./contracts.js";
 2 
 3 export function renderStatusPage(snapshot: StatusSnapshot): string {
 4   const modeLabel = formatMode(snapshot.mode);
 5-  const leaderLabel = snapshot.leaderHost ?? "No active leader lease";
 6-  const leaderDetail = snapshot.leaderId == null ? "Waiting for a holder in D1." : snapshot.leaderId;
 7+  const leaderLabel = snapshot.leaderHost ?? snapshot.leaderId ?? "No active leader lease";
 8+  const leaderDetail = snapshot.leaderId == null ? "Truth source did not report an active holder." : `holder_id=${snapshot.leaderId}`;
 9   const leaseLabel = snapshot.leaseExpiresAt == null ? "No lease expiry" : formatTimestamp(snapshot.leaseExpiresAt);
10   const leaseDetail = snapshot.leaseActive ? "Lease is currently valid." : "Lease is missing or stale.";
11 
12@@ -233,10 +233,10 @@ export function renderStatusPage(snapshot: StatusSnapshot): string {
13       <section class="grid" aria-label="status metrics">
14         ${renderMetricCard("Mode", modeLabel, describeMode(snapshot.mode), true)}
15         ${renderMetricCard("Leader", leaderLabel, leaderDetail)}
16-        ${renderMetricCard("Queue Depth", String(snapshot.queueDepth), "Queued tasks waiting to be claimed.")}
17-        ${renderMetricCard("Active Runs", String(snapshot.activeRuns), "Task runs with a started timestamp and no finish timestamp.")}
18+        ${renderMetricCard("Queue Depth", String(snapshot.queueDepth), "Queued tasks reported by the current truth source.")}
19+        ${renderMetricCard("Active Runs", String(snapshot.activeRuns), "Active runs reported by the current truth source.")}
20         ${renderMetricCard("Lease Expiry", leaseLabel, leaseDetail)}
21-        ${renderMetricCard("Snapshot Updated", formatTimestamp(snapshot.updatedAt), "Latest durable timestamp observed from automation state or lease renewal.")}
22+        ${renderMetricCard("Snapshot Updated", formatTimestamp(snapshot.updatedAt), "Latest timestamp reported by the truth source, or the observation time when none is exposed.")}
23       </section>
24 
25       <section class="footer">
M apps/status-api/src/runtime.ts
+11, -3
 1@@ -1,5 +1,12 @@
 2-import { StaticStatusSnapshotLoader } from "./data-source.js";
 3-import type { StatusApiHandler, StatusApiRequest, StatusApiResponse, StatusApiRoute, StatusSnapshotLoader } from "./contracts.js";
 4+import { createDefaultStatusSnapshotLoader } from "./data-source.js";
 5+import type {
 6+  StatusApiEnvironment,
 7+  StatusApiHandler,
 8+  StatusApiRequest,
 9+  StatusApiResponse,
10+  StatusApiRoute,
11+  StatusSnapshotLoader
12+} from "./contracts.js";
13 import { createStatusApiHandler } from "./service.js";
14 
15 export interface StatusApiRuntime extends StatusApiHandler {
16@@ -7,11 +14,12 @@ export interface StatusApiRuntime extends StatusApiHandler {
17 }
18 
19 export interface StatusApiRuntimeOptions {
20+  env?: StatusApiEnvironment;
21   snapshotLoader?: StatusSnapshotLoader;
22 }
23 
24 export function createStatusApiRuntime(options: StatusApiRuntimeOptions = {}): StatusApiRuntime {
25-  const handler = createStatusApiHandler(options.snapshotLoader ?? new StaticStatusSnapshotLoader());
26+  const handler = createStatusApiHandler(options.snapshotLoader ?? createDefaultStatusSnapshotLoader({ env: options.env }));
27 
28   return {
29     routes: handler.routes,
M docs/runtime/environment.md
+3, -0
 1@@ -12,6 +12,8 @@
 2 - `BAA_TMP_DIR`
 3 - `BAA_STATE_DIR`
 4 
 5+`status-api` 默认把 `BAA_CONTROL_API_BASE` 当成 live truth source,并读取 `${BAA_CONTROL_API_BASE}/v1/system/state`。
 6+
 7 ## 节点变量
 8 
 9 ```text
10@@ -21,6 +23,7 @@ BAA_NODE_ID=mini-main
11 BAA_CONDUCTOR_LOCAL_API=http://100.71.210.78:4317
12 BAA_CONDUCTOR_LOCAL_API_ALLOWED_HOSTS=100.71.210.78
13 BAA_STATUS_API_HOST=100.71.210.78
14+BAA_CONTROL_API_BASE=https://control-api.makefile.so
15 ```
16 
17 ## 最小例子
M docs/runtime/node-verification.md
+15, -0
 1@@ -36,6 +36,21 @@ npx --yes pnpm -r build
 2   --check-loaded
 3 ```
 4 
 5+关键状态对齐可以直接再跑一组比对:
 6+
 7+```bash
 8+curl -fsSL https://control-api.makefile.so/v1/system/state
 9+curl -fsSL http://100.71.210.78:4318/v1/status
10+```
11+
12+至少确认这些字段一致:
13+
14+- `mode`
15+- `holder_id` vs `leaderId`
16+- `term`
17+- `lease_expires_at` vs `leaseExpiresAt`
18+- `source` 不再是 `empty`
19+
20 ## 常见失败点
21 
22 - `conductor /rolez` 不是 `leader`