baa-conductor

git clone 

commit
99c995e
parent
ca554cc
author
codex@macbookpro
date
2026-03-31 18:02:47 +0800 CST
fix: unify system_state timestamps to milliseconds
12 files changed,  +299, -11
M apps/conductor-daemon/src/index.test.js
+129, -2
  1@@ -3199,6 +3199,7 @@ test("renewal projector scans settled messages with cursor semantics and skips i
  2 
  3     const cursorState = await repository.getSystemState("renewal.projector.cursor");
  4     assert.ok(cursorState);
  5+    assert.equal(cursorState.updatedAt, nowMs);
  6     assert.deepEqual(JSON.parse(cursorState.valueJson), {
  7       message_id: missingTabIdMessage.id,
  8       observed_at: missingTabIdMessage.observedAt
  9@@ -3271,6 +3272,128 @@ test("renewal projector scans settled messages with cursor semantics and skips i
 10   }
 11 });
 12 
 13+test("renewal projector restores cursor from valueJson even when legacy system_state.updated_at is second-based", async () => {
 14+  const rootDir = mkdtempSync(join(tmpdir(), "baa-renewal-projector-cursor-restore-legacy-"));
 15+  const stateDir = join(rootDir, "state");
 16+  const logsDir = mkdtempSync(join(tmpdir(), "baa-renewal-projector-cursor-restore-legacy-logs-"));
 17+  const localApiFixture = await createLocalApiFixture({
 18+    databasePath: join(rootDir, "control-plane.sqlite")
 19+  });
 20+  const artifactStore = new ArtifactStore({
 21+    artifactDir: join(stateDir, ARTIFACTS_DIRNAME),
 22+    databasePath: join(stateDir, ARTIFACT_DB_FILENAME),
 23+    publicBaseUrl: "https://artifacts.example.test"
 24+  });
 25+  const nowMs = Date.UTC(2026, 2, 30, 12, 0, 0);
 26+
 27+  try {
 28+    await artifactStore.upsertLocalConversation({
 29+      automationStatus: "auto",
 30+      localConversationId: "lc_restore_cursor",
 31+      platform: "claude",
 32+      updatedAt: nowMs - 60_000
 33+    });
 34+    await artifactStore.upsertConversationLink({
 35+      clientId: "firefox-restore-cursor",
 36+      linkId: "link_restore_cursor",
 37+      localConversationId: "lc_restore_cursor",
 38+      observedAt: nowMs - 60_000,
 39+      pageUrl: "https://claude.ai/chat/conv_restore_cursor",
 40+      platform: "claude",
 41+      remoteConversationId: "conv_restore_cursor",
 42+      targetId: "tab:17",
 43+      targetKind: "browser.proxy_delivery",
 44+      targetPayload: {
 45+        clientId: "firefox-restore-cursor",
 46+        tabId: 17
 47+      }
 48+    });
 49+
 50+    const firstMessage = await artifactStore.insertMessage({
 51+      conversationId: "conv_restore_cursor",
 52+      id: "msg_restore_cursor_first",
 53+      observedAt: nowMs - 30_000,
 54+      platform: "claude",
 55+      rawText: "already processed before unit migration",
 56+      role: "assistant"
 57+    });
 58+    const secondMessage = await artifactStore.insertMessage({
 59+      conversationId: "conv_restore_cursor",
 60+      id: "msg_restore_cursor_second",
 61+      observedAt: nowMs - 20_000,
 62+      platform: "claude",
 63+      rawText: "should project after restoring legacy cursor",
 64+      role: "assistant"
 65+    });
 66+
 67+    await localApiFixture.repository.putSystemState({
 68+      stateKey: "renewal.projector.cursor",
 69+      updatedAt: Math.floor(nowMs / 1000),
 70+      valueJson: JSON.stringify({
 71+        message_id: firstMessage.id,
 72+        observed_at: firstMessage.observedAt
 73+      })
 74+    });
 75+
 76+    const timedJobs = new ConductorTimedJobs(
 77+      {
 78+        intervalMs: 5_000,
 79+        maxMessagesPerTick: 10,
 80+        maxTasksPerTick: 10,
 81+        settleDelayMs: 0
 82+      },
 83+      {
 84+        artifactStore,
 85+        autoStart: false,
 86+        logDir: logsDir,
 87+        schedule: async (work) => {
 88+          await work({
 89+            controllerId: "mini-main",
 90+            host: "mini",
 91+            term: 2
 92+          });
 93+          return "scheduled";
 94+        }
 95+      }
 96+    );
 97+    timedJobs.registerRunner(
 98+      createRenewalProjectorRunner({
 99+        now: () => nowMs,
100+        repository: localApiFixture.repository
101+      })
102+    );
103+
104+    await timedJobs.start();
105+    const tick = await timedJobs.runTick("manual");
106+    assert.equal(tick.decision, "scheduled");
107+
108+    const jobs = await artifactStore.listRenewalJobs({});
109+    assert.equal(jobs.length, 1);
110+    assert.equal(jobs[0].messageId, secondMessage.id);
111+
112+    const cursorState = await localApiFixture.repository.getSystemState("renewal.projector.cursor");
113+    assert.ok(cursorState);
114+    assert.equal(cursorState.updatedAt, nowMs);
115+    assert.deepEqual(JSON.parse(cursorState.valueJson), {
116+      message_id: secondMessage.id,
117+      observed_at: secondMessage.observedAt
118+    });
119+
120+    await timedJobs.stop();
121+  } finally {
122+    artifactStore.close();
123+    localApiFixture.controlPlane.close();
124+    rmSync(rootDir, {
125+      force: true,
126+      recursive: true
127+    });
128+    rmSync(logsDir, {
129+      force: true,
130+      recursive: true
131+    });
132+  }
133+});
134+
135 test("shouldRenew keeps route_unavailable while exposing structured route failure details", async () => {
136   const nowMs = Date.UTC(2026, 2, 30, 10, 5, 0);
137   const baseCandidate = {
138@@ -4965,8 +5088,12 @@ test("handleConductorHttpRequest serves the migrated local business endpoints fr
139       localApiContext
140     );
141     assert.equal(pauseResponse.status, 200);
142-    assert.equal(parseJsonBody(pauseResponse).data.mode, "paused");
143-    assert.equal((await repository.getAutomationState())?.mode, "paused");
144+    const pausePayload = parseJsonBody(pauseResponse);
145+    assert.equal(pausePayload.data.mode, "paused");
146+    const automationState = await repository.getAutomationState();
147+    assert.equal(automationState?.mode, "paused");
148+    assert.equal(pausePayload.data.updated_at, automationState?.updatedAt);
149+    assert.equal(pausePayload.data.automation.updated_at, automationState?.updatedAt);
150 
151     const missingTokenExecResponse = await handleConductorHttpRequest(
152       {
M apps/conductor-daemon/src/local-api.ts
+4, -2
 1@@ -837,7 +837,7 @@ function toUnixMilliseconds(value: number | null | undefined): number | null {
 2     return null;
 3   }
 4 
 5-  return value * 1000;
 6+  return value >= 1_000_000_000_000 ? Math.trunc(value) : Math.trunc(value * 1000);
 7 }
 8 
 9 function resolveLeadershipRole(snapshot: ConductorRuntimeApiSnapshot): "leader" | "standby" {
10@@ -6312,9 +6312,11 @@ export async function setAutomationMode(
11   repository: ControlPlaneRepository,
12   input: SetAutomationModeInput
13 ): Promise<JsonObject> {
14+  const updatedAt = toUnixMilliseconds(input.updatedAt) ?? Date.now();
15+
16   await repository.putSystemState({
17     stateKey: AUTOMATION_STATE_KEY,
18-    updatedAt: input.updatedAt,
19+    updatedAt,
20     valueJson: JSON.stringify({
21       mode: input.mode,
22       ...(input.requestedBy ? { requested_by: input.requestedBy } : {}),
M apps/conductor-daemon/src/renewal/projector.ts
+1, -1
1@@ -653,7 +653,7 @@ async function saveCursor(
2 ): Promise<void> {
3   await repository.putSystemState({
4     stateKey: cursorStateKey,
5-    updatedAt: Math.floor(now / 1000),
6+    updatedAt: now,
7     valueJson: JSON.stringify({
8       message_id: cursor.id,
9       observed_at: cursor.observedAt
M apps/status-api/src/data-source.ts
+7, -2
 1@@ -298,7 +298,10 @@ export async function readStatusSnapshot(
 2   ]);
 3 
 4   const observedAtUnixSeconds = toUnixSeconds(observedAt);
 5-  const latestDurableTimestamp = Math.max(automationState?.updatedAt ?? 0, lease?.renewedAt ?? 0);
 6+  const latestDurableTimestampMs = Math.max(
 7+    normalizeTimestamp(automationState?.updatedAt) ?? 0,
 8+    normalizeTimestamp(lease?.renewedAt) ?? 0
 9+  );
10 
11   return {
12     source: "d1",
13@@ -311,7 +314,9 @@ export async function readStatusSnapshot(
14     queueDepth,
15     activeRuns,
16     updatedAt:
17-      latestDurableTimestamp > 0 ? toIsoFromUnixSeconds(latestDurableTimestamp) ?? fallback.updatedAt : fallback.updatedAt,
18+      latestDurableTimestampMs > 0
19+        ? new Date(latestDurableTimestampMs).toISOString()
20+        : fallback.updatedAt,
21     observedAt: fallback.observedAt
22   };
23 }
M apps/status-api/src/index.test.js
+37, -0
 1@@ -3,6 +3,7 @@ import test from "node:test";
 2 
 3 import {
 4   createStatusSnapshotFromControlApiPayload,
 5+  readStatusSnapshot,
 6   resolveStatusApiControlApiBase,
 7   resolveStatusApiTruthSourceBase,
 8   StaticStatusSnapshotLoader
 9@@ -48,6 +49,42 @@ test("status snapshots mark conductor-api as the upstream source", () => {
10   assert.equal(snapshot.source, "conductor-api");
11 });
12 
13+test("status snapshots normalize millisecond automation timestamps against second-based D1 lease timestamps", async () => {
14+  const snapshot = await readStatusSnapshot(
15+    {
16+      countActiveRuns: async () => 1,
17+      countQueuedTasks: async () => 2,
18+      getAutomationState: async () => ({
19+        mode: "paused",
20+        stateKey: "automation",
21+        updatedAt: Date.UTC(2026, 2, 31, 12, 4, 0),
22+        valueJson: JSON.stringify({
23+          mode: "paused"
24+        })
25+      }),
26+      getCurrentLease: async () => ({
27+        holderHost: "mini",
28+        holderId: "mini-main",
29+        leaseExpiresAt: Math.floor(Date.UTC(2026, 2, 31, 12, 10, 0) / 1000),
30+        leaseName: "global",
31+        metadataJson: null,
32+        preferredHolderId: "mini-main",
33+        renewedAt: Math.floor(Date.UTC(2026, 2, 31, 12, 5, 0) / 1000),
34+        term: 7
35+      })
36+    },
37+    new Date("2026-03-31T12:05:30.000Z")
38+  );
39+
40+  assert.equal(snapshot.mode, "paused");
41+  assert.equal(snapshot.source, "d1");
42+  assert.equal(snapshot.queueDepth, 2);
43+  assert.equal(snapshot.activeRuns, 1);
44+  assert.equal(snapshot.leaseExpiresAt, "2026-03-31T12:10:00.000Z");
45+  assert.equal(snapshot.leaseActive, true);
46+  assert.equal(snapshot.updatedAt, "2026-03-31T12:05:00.000Z");
47+});
48+
49 test("status-api describe reports conductor local truth with legacy compatibility note", async () => {
50   const handler = createStatusApiHandler(new StaticStatusSnapshotLoader(), {
51     truthSourceBaseUrl: "http://100.71.210.78:4317"
M bugs/README.md
+1, -0
1@@ -51,6 +51,7 @@ bugs/
2 | BUG-029 | FIXED | 已停用 conversation link 不会再被远端对话查询命中 |
3 | BUG-030 | FIXED | `targetId` 匹配现在绝对优先于弱信号叠加 |
4 | BUG-035 | FIXED | `remote_conversation_id = NULL` 的 link 现在会按 route/page identity 收敛为唯一 canonical row |
5+| BUG-036 | FIXED | projector cursor 和 `system_state.updated_at` 现在统一为毫秒口径,并兼容 legacy 秒值恢复 |
6 | MISSING-001 | FIXED | 执行结果已经接到 AI 对话 delivery 主链 |
7 | MISSING-002 | FIXED | 插件侧 delivery plan 执行器已落地 |
8 | MISSING-003 | FIXED | Phase 1 已补齐 browser.claude target |
A bugs/archive/BUG-036-projector-cursor-updated-at-unit-mismatch.md
+37, -0
 1@@ -0,0 +1,37 @@
 2+# BUG-036: projector cursor updatedAt 秒级与 observedAt 毫秒级不一致
 3+
 4+> 提交者:Claude
 5+> 日期:2026-03-30
 6+
 7+## 现象
 8+
 9+`projector.ts` 的 `saveCursor` 使用 `Math.floor(now / 1000)` 将毫秒转为秒级写入 `system_state.updated_at`:
10+
11+```typescript
12+await repository.putSystemState({
13+  stateKey: cursorStateKey,
14+  updatedAt: Math.floor(now / 1000),  // 秒级
15+  valueJson: JSON.stringify({
16+    message_id: cursor.id,
17+    observed_at: cursor.observedAt     // 毫秒级
18+  })
19+});
20+```
21+
22+而 `system_state` 表的 `updated_at` 列定义为 `INTEGER NOT NULL`,没有明确单位约定。如果后续有其他代码读 `system_state.updated_at` 并假设毫秒级,会出现约 1000 倍的时间偏差。
23+
24+## 当前影响
25+
26+cursor 本身的值存在 `valueJson` 里(`observed_at` 是毫秒级),cursor 恢复时读的是 `valueJson` 而不是 `updated_at`,因此当前功能不受影响。
27+
28+但 `updated_at` 的单位不一致是潜在的混淆源。
29+
30+## 建议修复
31+
32+统一 `system_state.updated_at` 为毫秒级(与其他所有 `*_at` 字段一致),或在 `system_state` schema 注释中明确标注单位。
33+
34+`ops/sql/schema.sql` 中现有的 `system_state` 初始化使用 `CAST(strftime('%s', 'now') AS INTEGER)` 即秒级。如果要统一为毫秒级,需要一起改。
35+
36+## 优先级
37+
38+低。当前不影响功能,但单位不一致是后续维护的隐患。
A bugs/archive/FIX-BUG-036.md
+51, -0
 1@@ -0,0 +1,51 @@
 2+# FIX-BUG-036: projector cursor system_state 时间单位统一到毫秒
 3+
 4+## 执行状态
 5+
 6+- 已完成(2026-03-31,代码 + 自动化验证已落地)
 7+
 8+## 关联 Bug
 9+
10+- `BUG-036-projector-cursor-updated-at-unit-mismatch.md`
11+
12+## 实际修改文件
13+
14+- `apps/conductor-daemon/src/renewal/projector.ts`
15+- `apps/conductor-daemon/src/local-api.ts`
16+- `apps/conductor-daemon/src/index.test.js`
17+- `apps/status-api/src/data-source.ts`
18+- `apps/status-api/src/index.test.js`
19+- `packages/db/src/index.ts`
20+- `packages/db/src/index.test.js`
21+- `ops/sql/schema.sql`
22+- `ops/sql/migrations/0001_init.sql`
23+
24+## 实际修改
25+
26+- 把 renewal projector cursor 保存到 `system_state` 时的 `updated_at` 从秒级改为毫秒级,和 `valueJson.observed_at` 保持同一单位。
27+- 保持 cursor 恢复语义不变:
28+  - 仍然只从 `valueJson.message_id` / `valueJson.observed_at` 恢复 cursor
29+  - 不依赖 `system_state.updated_at` 决定恢复位置
30+- 把 `automation` 这条 `system_state` 的写入链统一到毫秒级:
31+  - `packages/db` 的 `ensureAutomationState()` 和 `setAutomationMode()` 默认写毫秒
32+  - `local-api` 的 `setAutomationMode()` 在落库前把秒值规范化成毫秒,兼容旧调用点
33+- 更新 `system_state` 初始化 SQL 和首个 migration,让默认 `automation.updated_at` 以毫秒单位写入。
34+- 补上兼容读取:
35+  - `local-api` 返回系统状态时接受秒或毫秒输入,统一输出毫秒
36+  - `status-api` 读取 D1 快照时对 `automation.updatedAt` 做单位归一化,避免和仍为秒级的 lease 时间比较时出现混算
37+
38+## 回归测试
39+
40+- `packages/db`:覆盖 schema 初始化和 `ensureAutomationState()` 回填后的 `system_state.updated_at` 为毫秒
41+- `apps/conductor-daemon`:
42+  - 覆盖 projector saveCursor 写入毫秒级 `updated_at`
43+  - 覆盖 legacy 秒级 `updated_at` 的 cursor 仍能按 `valueJson` 正常恢复
44+  - 覆盖 `/v1/system/pause` 返回的 automation `updated_at` 与库内毫秒值一致
45+- `apps/status-api`:覆盖毫秒级 automation 时间和秒级 lease 时间并存时的快照归一化
46+
47+## 验证
48+
49+- `pnpm -C packages/db test`
50+- `pnpm -C apps/status-api test`
51+- `pnpm -C apps/conductor-daemon test`
52+- `pnpm build`
M ops/sql/migrations/0001_init.sql
+1, -1
1@@ -212,7 +212,7 @@ INSERT INTO system_state (
2 VALUES (
3   'automation',
4   '{"mode":"running"}',
5-  CAST(strftime('%s', 'now') AS INTEGER)
6+  CAST(strftime('%s', 'now') AS INTEGER) * 1000
7 )
8 ON CONFLICT(state_key) DO NOTHING;
9 
M ops/sql/schema.sql
+1, -1
1@@ -212,7 +212,7 @@ INSERT INTO system_state (
2 VALUES (
3   'automation',
4   '{"mode":"running"}',
5-  CAST(strftime('%s', 'now') AS INTEGER)
6+  CAST(strftime('%s', 'now') AS INTEGER) * 1000
7 )
8 ON CONFLICT(state_key) DO NOTHING;
9 
M packages/db/src/index.test.js
+24, -0
 1@@ -103,6 +103,30 @@ test("buildLeaderLeaseRecord increments term for takeover and marks standby resp
 2   assert.equal(response.holderId, "mini-main");
 3 });
 4 
 5+test("control-plane automation system_state timestamps are initialized and backfilled in milliseconds", async () => {
 6+  const db = new SqliteD1Database(":memory:", {
 7+    schemaSql: CONTROL_PLANE_SCHEMA_SQL
 8+  });
 9+  const repository = new D1ControlPlaneRepository(db);
10+
11+  try {
12+    const initialState = await repository.getAutomationState();
13+    assert.ok(initialState);
14+    assert.equal(initialState.mode, "running");
15+    assert.ok(initialState.updatedAt >= 1_000_000_000_000);
16+
17+    await db.prepare("DELETE FROM system_state WHERE state_key = 'automation'").run();
18+    await repository.ensureAutomationState("paused");
19+
20+    const recreatedState = await repository.getAutomationState();
21+    assert.ok(recreatedState);
22+    assert.equal(recreatedState.mode, "paused");
23+    assert.ok(recreatedState.updatedAt >= 1_000_000_000_000);
24+  } finally {
25+    db.close();
26+  }
27+});
28+
29 test("SqliteD1Database supports task log queries through D1ControlPlaneRepository", async () => {
30   const db = new SqliteD1Database(":memory:", {
31     schemaSql: CONTROL_PLANE_SCHEMA_SQL
M packages/db/src/index.ts
+6, -2
 1@@ -471,6 +471,10 @@ export function nowUnixSeconds(date: Date = new Date()): number {
 2   return Math.floor(date.getTime() / 1000);
 3 }
 4 
 5+export function nowUnixMilliseconds(date: Date = new Date()): number {
 6+  return date.getTime();
 7+}
 8+
 9 export function stringifyJson(value: JsonValue | null | undefined): string | null {
10   if (value == null) {
11     return null;
12@@ -2286,7 +2290,7 @@ export class D1ControlPlaneRepository implements ControlPlaneRepository {
13     await this.run(ENSURE_AUTOMATION_STATE_SQL, [
14       AUTOMATION_STATE_KEY,
15       buildAutomationStateValue(mode),
16-      nowUnixSeconds()
17+      nowUnixMilliseconds()
18     ]);
19   }
20 
21@@ -2494,7 +2498,7 @@ export class D1ControlPlaneRepository implements ControlPlaneRepository {
22 
23   async setAutomationMode(
24     mode: AutomationMode,
25-    updatedAt: number = nowUnixSeconds()
26+    updatedAt: number = nowUnixMilliseconds()
27   ): Promise<void> {
28     await this.run(UPSERT_SYSTEM_STATE_SQL, [
29       AUTOMATION_STATE_KEY,