baa-conductor

git clone 

commit
42670d0
parent
aade8e2
author
codex@macbookpro
date
2026-04-01 11:59:50 +0800 CST
feat: make timed-jobs log writes async
3 files changed,  +215, -21
M apps/conductor-daemon/src/index.test.js
+168, -7
  1@@ -1704,6 +1704,31 @@ function readJsonlEntries(dirPath) {
  2     .map((line) => JSON.parse(line));
  3 }
  4 
  5+async function waitForJsonlEntries(dirPath, predicate = null, timeoutMs = 1_000) {
  6+  const deadline = Date.now() + timeoutMs;
  7+  let lastError = null;
  8+
  9+  while (Date.now() < deadline) {
 10+    try {
 11+      const entries = readJsonlEntries(dirPath);
 12+
 13+      if (predicate == null || predicate(entries)) {
 14+        return entries;
 15+      }
 16+    } catch (error) {
 17+      lastError = error;
 18+    }
 19+
 20+    await new Promise((resolve) => setTimeout(resolve, 10));
 21+  }
 22+
 23+  if (lastError != null) {
 24+    throw lastError;
 25+  }
 26+
 27+  return readJsonlEntries(dirPath);
 28+}
 29+
 30 async function flushAsyncWork() {
 31   await Promise.resolve();
 32   await Promise.resolve();
 33@@ -3028,7 +3053,10 @@ test("ConductorTimedJobs runs registered runners on leader ticks and writes JSON
 34 
 35     await timedJobs.stop();
 36 
 37-    const entries = readJsonlEntries(logsDir);
 38+    const entries = await waitForJsonlEntries(
 39+      logsDir,
 40+      (items) => items.some((entry) => entry.runner === "renewal.projector" && entry.stage === "job_projected")
 41+    );
 42     assert.ok(
 43       entries.find(
 44         (entry) =>
 45@@ -3063,6 +3091,126 @@ test("ConductorTimedJobs runs registered runners on leader ticks and writes JSON
 46   }
 47 });
 48 
 49+test("ConductorTimedJobs does not block ticks on pending async log writes and drains them on stop", async () => {
 50+  const logsDir = mkdtempSync(join(tmpdir(), "baa-timed-jobs-async-log-"));
 51+  const pendingWrites = [];
 52+  const timedJobs = new ConductorTimedJobs(
 53+    {
 54+      intervalMs: 5_000,
 55+      maxMessagesPerTick: 10,
 56+      maxTasksPerTick: 8,
 57+      settleDelayMs: 12_000
 58+    },
 59+    {
 60+      appendFileImpl: async (filePath, data) => {
 61+        await new Promise((resolve) => {
 62+          pendingWrites.push(() => {
 63+            writeFileSync(filePath, data, {
 64+              flag: "a"
 65+            });
 66+            resolve();
 67+          });
 68+        });
 69+      },
 70+      autoStart: false,
 71+      logDir: logsDir,
 72+      schedule: async (work) => {
 73+        await work({
 74+          controllerId: "mini-main",
 75+          host: "mini",
 76+          term: 4
 77+        });
 78+        return "scheduled";
 79+      }
 80+    }
 81+  );
 82+  let stopCompleted = false;
 83+
 84+  timedJobs.registerRunner({
 85+    name: "renewal.projector",
 86+    async run(context) {
 87+      context.log({
 88+        stage: "scan_window",
 89+        result: "ok",
 90+        details: {
 91+          cursor: "message:1"
 92+        }
 93+      });
 94+
 95+      return {
 96+        result: "ok"
 97+      };
 98+    }
 99+  });
100+
101+  try {
102+    await timedJobs.start();
103+
104+    const tick = await Promise.race([
105+      timedJobs.runTick("manual"),
106+      new Promise((_, reject) => {
107+        setTimeout(() => reject(new Error("timed out waiting for timed-jobs tick")), 1_000);
108+      })
109+    ]);
110+
111+    assert.equal(tick.decision, "scheduled");
112+    assert.ok(pendingWrites.length > 0);
113+
114+    const stopPromise = timedJobs.stop();
115+    void stopPromise.then(() => {
116+      stopCompleted = true;
117+    });
118+
119+    await flushAsyncWork();
120+    assert.equal(stopCompleted, false);
121+
122+    while (!stopCompleted) {
123+      const releaseWrite = pendingWrites.shift();
124+
125+      if (releaseWrite == null) {
126+        await flushAsyncWork();
127+        continue;
128+      }
129+
130+      releaseWrite();
131+      await flushAsyncWork();
132+    }
133+
134+    await stopPromise;
135+
136+    const entries = await waitForJsonlEntries(
137+      logsDir,
138+      (items) => items.some((entry) => entry.runner === "timed-jobs.framework" && entry.stage === "stopped")
139+    );
140+    assert.ok(
141+      entries.find(
142+        (entry) => entry.runner === "timed-jobs.framework" && entry.stage === "started"
143+      )
144+    );
145+    assert.ok(
146+      entries.find(
147+        (entry) => entry.runner === "renewal.projector" && entry.stage === "scan_window"
148+      )
149+    );
150+    assert.ok(
151+      entries.find(
152+        (entry) => entry.runner === "timed-jobs.framework" && entry.stage === "stopped"
153+      )
154+    );
155+  } finally {
156+    while (pendingWrites.length > 0) {
157+      pendingWrites.shift()();
158+      await flushAsyncWork();
159+    }
160+
161+    await timedJobs.stop();
162+    rmSync(logsDir, {
163+      force: true,
164+      recursive: true
165+    });
166+  }
167+});
168+
169 test("renewal projector scans settled messages with cursor semantics and skips ineligible conversations", async () => {
170   const rootDir = mkdtempSync(join(tmpdir(), "baa-renewal-projector-"));
171   const logsDir = mkdtempSync(join(tmpdir(), "baa-renewal-projector-logs-"));
172@@ -3077,6 +3225,7 @@ test("renewal projector scans settled messages with cursor semantics and skips i
173     publicBaseUrl: "https://conductor.makefile.so"
174   });
175   const nowMs = Date.UTC(2026, 2, 30, 10, 0, 0);
176+  let timedJobs = null;
177 
178   try {
179     await artifactStore.upsertLocalConversation({
180@@ -3286,7 +3435,7 @@ test("renewal projector scans settled messages with cursor semantics and skips i
181       role: "assistant"
182     });
183 
184-    const timedJobs = new ConductorTimedJobs(
185+    timedJobs = new ConductorTimedJobs(
186       {
187         intervalMs: 5_000,
188         maxMessagesPerTick: 10,
189@@ -3345,7 +3494,10 @@ test("renewal projector scans settled messages with cursor semantics and skips i
190       observed_at: missingTabIdMessage.observedAt
191     });
192 
193-    const entries = readJsonlEntries(logsDir);
194+    const entries = await waitForJsonlEntries(
195+      logsDir,
196+      (items) => items.some((entry) => entry.runner === "renewal.projector" && entry.stage === "job_projected")
197+    );
198     assert.ok(entries.find((entry) => entry.runner === "renewal.projector" && entry.stage === "job_projected"));
199     assert.ok(
200       entries.find(
201@@ -3400,6 +3552,7 @@ test("renewal projector scans settled messages with cursor semantics and skips i
202     assert.equal(pausedMessage.id, "msg_paused");
203     assert.equal(cooldownMessage.id, "msg_cooldown");
204   } finally {
205+    await timedJobs?.stop();
206     artifactStore.close();
207     rmSync(rootDir, {
208       force: true,
209@@ -3425,6 +3578,7 @@ test("renewal projector restores cursor from valueJson even when legacy system_s
210     publicBaseUrl: "https://artifacts.example.test"
211   });
212   const nowMs = Date.UTC(2026, 2, 30, 12, 0, 0);
213+  let timedJobs = null;
214 
215   try {
216     await artifactStore.upsertLocalConversation({
217@@ -3475,7 +3629,7 @@ test("renewal projector restores cursor from valueJson even when legacy system_s
218       })
219     });
220 
221-    const timedJobs = new ConductorTimedJobs(
222+    timedJobs = new ConductorTimedJobs(
223       {
224         intervalMs: 5_000,
225         maxMessagesPerTick: 10,
226@@ -3521,6 +3675,7 @@ test("renewal projector restores cursor from valueJson even when legacy system_s
227 
228     await timedJobs.stop();
229   } finally {
230+    await timedJobs?.stop();
231     artifactStore.close();
232     localApiFixture.controlPlane.close();
233     rmSync(rootDir, {
234@@ -4837,7 +4992,7 @@ test("ConductorTimedJobs keeps standby runners idle and clears interval handles
235     await timedJobs.stop();
236     assert.equal(intervalScheduler.getActiveCount(), 0);
237 
238-    const entries = readJsonlEntries(logsDir);
239+    const entries = await waitForJsonlEntries(logsDir);
240     assert.ok(
241       entries.find(
242         (entry) =>
243@@ -7201,7 +7356,10 @@ test("ConductorRuntime initializes timed-jobs logging during startup", async ()
244     const timedJobsLogDir = join(logsDir, "timed-jobs");
245     assert.equal(existsSync(timedJobsLogDir), true);
246 
247-    const entries = readJsonlEntries(timedJobsLogDir);
248+    const entries = await waitForJsonlEntries(
249+      timedJobsLogDir,
250+      (items) => items.some((entry) => entry.runner === "timed-jobs.framework" && entry.stage === "started")
251+    );
252     assert.ok(
253       entries.find(
254         (entry) => entry.runner === "timed-jobs.framework" && entry.stage === "started"
255@@ -8850,7 +9008,10 @@ test("ConductorRuntime registers renewal projector, projects auto messages once,
256     assert.equal(jobsAfterRestart.length, 1);
257     assert.equal(jobsAfterRestart[0].messageId, "msg-runtime-projector-1");
258 
259-    const timedJobsEntries = readJsonlEntries(join(logsDir, "timed-jobs"));
260+    const timedJobsEntries = await waitForJsonlEntries(
261+      join(logsDir, "timed-jobs"),
262+      (items) => items.some((entry) => entry.runner === "renewal.projector" && entry.stage === "job_projected")
263+    );
264     assert.ok(
265       timedJobsEntries.find(
266         (entry) => entry.runner === "renewal.projector" && entry.stage === "job_projected"
M apps/conductor-daemon/src/timed-jobs/runtime.ts
+30, -8
 1@@ -1,4 +1,4 @@
 2-import { appendFileSync } from "node:fs";
 3+import { writeFile } from "node:fs/promises";
 4 import { join } from "node:path";
 5 
 6 import type { ArtifactStore } from "../../../../packages/artifact-db/dist/index.js";
 7@@ -66,7 +66,10 @@ export type TimedJobsSchedule = (
 8   work: (context: TimedJobScheduleContext) => Promise<void>
 9 ) => Promise<"scheduled" | "skipped_not_leader">;
10 
11+type TimedJobsAppendFile = (filePath: string, data: string) => Promise<void>;
12+
13 export interface ConductorTimedJobsOptions {
14+  appendFileImpl?: TimedJobsAppendFile;
15   artifactStore?: ArtifactStore | null;
16   autoStart?: boolean;
17   clearIntervalImpl?: (handle: TimedJobsIntervalHandle) => void;
18@@ -92,6 +95,7 @@ interface TimedJobsLogEntry {
19 const FRAMEWORK_RUNNER_NAME = "timed-jobs.framework";
20 
21 export class ConductorTimedJobs {
22+  private readonly appendFileImpl: TimedJobsAppendFile;
23   private readonly artifactStore: ArtifactStore | null;
24   private readonly autoStart: boolean;
25   private readonly clearIntervalImpl: (handle: TimedJobsIntervalHandle) => void;
26@@ -99,6 +103,7 @@ export class ConductorTimedJobs {
27   private inFlightTick: Promise<TimedJobsTickResult> | null = null;
28   private intervalHandle: TimedJobsIntervalHandle | null = null;
29   private readonly logDir: string | null;
30+  private pendingLogWrite: Promise<void> = Promise.resolve();
31   private readonly runners = new Map<string, TimedJobRunner>();
32   private readonly schedule: TimedJobsSchedule;
33   private readonly setIntervalImpl: (
34@@ -109,6 +114,8 @@ export class ConductorTimedJobs {
35   private batchSequence = 0;
36 
37   constructor(config: TimedJobsConfig, options: ConductorTimedJobsOptions) {
38+    this.appendFileImpl =
39+      options.appendFileImpl ?? ((filePath, data) => writeFile(filePath, data, { flag: "a" }));
40     this.artifactStore = options.artifactStore ?? null;
41     this.autoStart = options.autoStart ?? true;
42     this.clearIntervalImpl =
43@@ -199,6 +206,7 @@ export class ConductorTimedJobs {
44       stage: "stopped",
45       result: "stopped"
46     });
47+    await this.flushLogWrites();
48   }
49 
50   async runTick(trigger: TimedJobsTickTrigger = "manual"): Promise<TimedJobsTickResult> {
51@@ -432,6 +440,24 @@ export class ConductorTimedJobs {
52     this.writeLogEntry(runner, input.batchId, input);
53   }
54 
55+  private async flushLogWrites(): Promise<void> {
56+    try {
57+      await this.pendingLogWrite;
58+    } catch {
59+      // individual write failures are already reported
60+    }
61+  }
62+
63+  private queueLogWrite(filePath: string, data: string): void {
64+    // Keep JSONL entries ordered without blocking tick execution on filesystem IO.
65+    this.pendingLogWrite = this.pendingLogWrite
66+      .catch(() => {})
67+      .then(() => this.appendFileImpl(filePath, data))
68+      .catch((error) => {
69+        console.error(`[timed-jobs-log] write failed: ${String(error)}`);
70+      });
71+  }
72+
73   private writeLogEntry(
74     runner: string,
75     batchId: string,
76@@ -457,13 +483,9 @@ export class ConductorTimedJobs {
77       }
78     }
79 
80-    try {
81-      const date = entry.ts.slice(0, 10);
82-      const filePath = join(this.logDir, `${date}.jsonl`);
83-      appendFileSync(filePath, `${JSON.stringify(entry)}\n`);
84-    } catch (error) {
85-      console.error(`[timed-jobs-log] write failed: ${String(error)}`);
86-    }
87+    const date = entry.ts.slice(0, 10);
88+    const filePath = join(this.logDir, `${date}.jsonl`);
89+    this.queueLogWrite(filePath, `${JSON.stringify(entry)}\n`);
90   }
91 }
92 
M tasks/T-S064.md
+17, -6
 1@@ -2,7 +2,7 @@
 2 
 3 ## 状态
 4 
 5-- 当前状态:`待开始`
 6+- 当前状态:`已完成`
 7 - 规模预估:`S`
 8 - 依赖任务:无
 9 - 建议执行者:`Codex`
10@@ -111,22 +111,33 @@ timed-jobs 已经是正式主链能力。继续使用同步日志写入会放大
11 
12 ### 开始执行
13 
14-- 执行者:
15-- 开始时间:
16+- 执行者:`Codex`
17+- 开始时间:`2026-04-01 11:29:12 CST`
18 - 状态变更:`待开始` → `进行中`
19 
20 ### 完成摘要
21 
22-- 完成时间:
23+- 完成时间:`2026-04-01 11:59:04 CST`
24 - 状态变更:`进行中` → `已完成`
25 - 修改了哪些文件:
26+  - `apps/conductor-daemon/src/timed-jobs/runtime.ts`
27+  - `apps/conductor-daemon/src/index.test.js`
28+  - `tasks/T-S064.md`
29 - 核心实现思路:
30+  - 将 timed-jobs JSONL 写入从同步 `appendFileSync` 改为异步追加队列,保持单行 JSONL 格式不变,同时保证同一进程内日志顺序稳定
31+  - `runTick` / runner 主流程不等待日志 IO,避免在 tick 热路径上做同步磁盘写;`stop()` 阶段显式 drain 队列,降低停机时日志丢失和测试 flake 风险
32+  - 补充测试覆盖“异步写入不阻塞 tick、stop 会等待日志落盘”,并把直接读取 timed-jobs 日志的现有用例改为等待目标日志事件真正落盘后再断言
33 - 跑了哪些测试:
34+  - `cd /Users/george/code/baa-conductor-timed-jobs-async-log-writes && pnpm install`
35+  - `cd /Users/george/code/baa-conductor-timed-jobs-async-log-writes/apps/conductor-daemon && pnpm run build && node --test --test-name-pattern='renewal projector scans settled messages with cursor semantics and skips ineligible conversations' src/index.test.js`
36+  - `cd /Users/george/code/baa-conductor-timed-jobs-async-log-writes && pnpm -C apps/conductor-daemon test`
37 
38 ### 执行过程中遇到的问题
39 
40-- 暂无
41+- 任务卡记录的基线提交是 `64d122e`,但实际开工时 `origin/main` 已前进到 `37bf1d0`;本次按“从最新 main 开工”的要求创建了 worktree 和分支
42+- 新 worktree 初始没有安装依赖,第一次跑测试失败于 `pnpm exec tsc` 不存在;补跑 `pnpm install` 后恢复正常构建与测试流程
43+- 日志改成异步写后,旧测试里“先删临时目录再结束 timed-jobs”与“日志一出现就立刻断言目标事件”这两类隐含同步假设会失效;已调整为等待日志落盘并在清理前先 `stop()` drain
44 
45 ### 剩余风险
46 
47-- 暂无
48+- `stop()` 正常路径现在会等待日志队列写完,但如果进程被外部强杀,最后少量排队中的日志仍可能来不及落盘