baa-conductor

git clone 

commit
d909d38
parent
c5e007b
author
im_wower
date
2026-03-22 00:13:40 +0800 CST
feat(worker-runner): persist local run files
7 files changed,  +287, -57
M apps/worker-runner/src/runner.ts
+112, -48
  1@@ -1,16 +1,26 @@
  2 import {
  3+  persistCheckpointRecord,
  4+  type CheckpointRecord
  5+} from "@baa-conductor/checkpointing";
  6+import {
  7+  appendLifecycleEntry,
  8   appendStreamChunk,
  9+  appendStreamEntry,
 10   createLocalRunLogSession,
 11   createLocalRunPaths,
 12   createRunMetadata,
 13   createRunStateSnapshot,
 14+  initializeLocalRunFiles,
 15   recordLifecycleEvent,
 16   summarizeLocalRunLogSession,
 17   updateRunState,
 18+  writeRunState,
 19   type LogLevel,
 20-  type StructuredData,
 21   type RunStatePatch,
 22   type RunStatus,
 23+  type StreamLogChannel,
 24+  type StructuredData,
 25+  type WorkerLifecycleEventInput,
 26   type WorkerLifecycleEventType
 27 } from "@baa-conductor/logging";
 28 import type {
 29@@ -38,6 +48,61 @@ function synchronizeRunState(run: PreparedStepRun, patch: RunStatePatch): void {
 30   });
 31 }
 32 
 33+async function persistRunState(run: PreparedStepRun): Promise<void> {
 34+  await writeRunState(run.logPaths, run.state);
 35+}
 36+
 37+async function synchronizeRunStatePersisted(
 38+  run: PreparedStepRun,
 39+  patch: RunStatePatch
 40+): Promise<void> {
 41+  synchronizeRunState(run, patch);
 42+  await persistRunState(run);
 43+}
 44+
 45+async function persistCheckpointRecordIfPresent(
 46+  run: PreparedStepRun,
 47+  record?: CheckpointRecord
 48+): Promise<void> {
 49+  if (record === undefined) {
 50+    return;
 51+  }
 52+
 53+  await persistCheckpointRecord(record);
 54+  await synchronizeRunStatePersisted(run, {
 55+    updatedAt: record.createdAt
 56+  });
 57+}
 58+
 59+async function recordLifecycleEventPersisted(
 60+  run: PreparedStepRun,
 61+  input: WorkerLifecycleEventInput,
 62+  statePatch: RunStatePatch = {}
 63+): Promise<void> {
 64+  const entry = recordLifecycleEvent(run.logSession, input);
 65+
 66+  await appendLifecycleEntry(run.logSession, entry);
 67+  await synchronizeRunStatePersisted(run, {
 68+    ...statePatch,
 69+    updatedAt: statePatch.updatedAt ?? entry.createdAt
 70+  });
 71+}
 72+
 73+async function appendStreamChunkPersisted(
 74+  run: PreparedStepRun,
 75+  channel: StreamLogChannel,
 76+  text: string
 77+): Promise<void> {
 78+  const entry = appendStreamChunk(run.logSession, channel, {
 79+    text
 80+  });
 81+
 82+  await appendStreamEntry(run.logSession, entry);
 83+  await synchronizeRunStatePersisted(run, {
 84+    updatedAt: entry.createdAt
 85+  });
 86+}
 87+
 88 function createDefaultArtifacts(run: PreparedStepRun): StepArtifact[] {
 89   return [
 90     {
 91@@ -145,7 +210,7 @@ function resolveExitCode(execution: WorkerExecutionOutcome): number {
 92   return execution.ok ? 0 : 1;
 93 }
 94 
 95-export function prepareStepRun(request: StepExecutionRequest): PreparedStepRun {
 96+export async function prepareStepRun(request: StepExecutionRequest): Promise<PreparedStepRun> {
 97   const startedAt = request.createdAt ?? new Date().toISOString();
 98   const checkpointConfig = resolveCheckpointConfig(request.checkpoint);
 99   const logPaths = createLocalRunPaths({
100@@ -190,7 +255,10 @@ export function prepareStepRun(request: StepExecutionRequest): PreparedStepRun {
101     checkpointManager
102   };
103 
104-  recordLifecycleEvent(run.logSession, {
105+  await initializeLocalRunFiles(run.logPaths, run.metadata, run.state);
106+  await persistCheckpointRecordIfPresent(run, preparationCheckpoint);
107+
108+  await recordLifecycleEventPersisted(run, {
109     type: "run_prepared",
110     level: "info",
111     createdAt: startedAt,
112@@ -249,7 +317,7 @@ export function prepareStepRun(request: StepExecutionRequest): PreparedStepRun {
113       };
114     }
115 
116-    recordLifecycleEvent(run.logSession, {
117+    await recordLifecycleEventPersisted(run, {
118       type: "checkpoint_slot_reserved",
119       level: "info",
120       message: `Initialized checkpoint manager at sequence ${checkpoint.lastCheckpointSeq}.`,
121@@ -257,10 +325,6 @@ export function prepareStepRun(request: StepExecutionRequest): PreparedStepRun {
122     });
123   }
124 
125-  synchronizeRunState(run, {
126-    updatedAt: startedAt
127-  });
128-
129   return run;
130 }
131 
132@@ -284,27 +348,29 @@ export async function runStep(
133   executor: WorkerExecutor = createPlaceholderWorkerExecutor()
134 ): Promise<StepExecutionResult> {
135   const startedAt = request.createdAt ?? new Date().toISOString();
136-  const run = prepareStepRun({
137+  const run = await prepareStepRun({
138     ...request,
139     createdAt: startedAt
140   });
141   const executionStartedAt = new Date().toISOString();
142 
143-  recordLifecycleEvent(run.logSession, {
144-    type: "worker_started",
145-    level: "info",
146-    createdAt: executionStartedAt,
147+  await recordLifecycleEventPersisted(
148+    run,
149+    {
150+      type: "worker_started",
151+      level: "info",
152+      createdAt: executionStartedAt,
153     message: `Worker runner opened execution scope for ${request.workerKind} step ${request.stepId}.`,
154     data: {
155-      stepKind: request.stepKind,
156-      timeoutSec: request.timeoutSec,
157-      worktreePath: request.runtime.worktreePath
158+        stepKind: request.stepKind,
159+        timeoutSec: request.timeoutSec,
160+        worktreePath: request.runtime.worktreePath
161+      }
162+    },
163+    {
164+      status: "running"
165     }
166-  });
167-  synchronizeRunState(run, {
168-    status: "running",
169-    updatedAt: executionStartedAt
170-  });
171+  );
172 
173   const execution = await executor.execute(run);
174   const blocked = execution.blocked ?? execution.outcome === "blocked";
175@@ -312,19 +378,15 @@ export async function runStep(
176   const exitCode = resolveExitCode(execution);
177 
178   for (const line of execution.stdout ?? []) {
179-    appendStreamChunk(run.logSession, "stdout", {
180-      text: line
181-    });
182+    await appendStreamChunkPersisted(run, "stdout", line);
183   }
184 
185   for (const line of execution.stderr ?? []) {
186-    appendStreamChunk(run.logSession, "stderr", {
187-      text: line
188-    });
189+    await appendStreamChunkPersisted(run, "stderr", line);
190   }
191 
192   if (execution.outcome === "prepared") {
193-    recordLifecycleEvent(run.logSession, {
194+    await recordLifecycleEventPersisted(run, {
195       type: "worker_execution_deferred",
196       level: "info",
197       message: `Real ${request.workerKind} execution is intentionally deferred while checkpoint capture remains active.`,
198@@ -335,7 +397,7 @@ export async function runStep(
199     });
200   }
201 
202-  recordLifecycleEvent(run.logSession, {
203+  await recordLifecycleEventPersisted(run, {
204     type: "worker_exited",
205     level: exitCode === 0 ? "info" : "error",
206     message: `Worker runner closed execution scope with outcome ${execution.outcome}.`,
207@@ -347,34 +409,36 @@ export async function runStep(
208 
209   const finishedAt = new Date().toISOString();
210 
211-  recordLifecycleEvent(run.logSession, {
212-    type: mapOutcomeToTerminalEvent(execution.outcome),
213-    level: mapOutcomeToLevel(execution),
214-    createdAt: finishedAt,
215-    message: execution.summary,
216-    data: {
217-      ok: execution.ok,
218-      blocked,
219-      needsHuman,
220+  await recordLifecycleEventPersisted(
221+    run,
222+    {
223+      type: mapOutcomeToTerminalEvent(execution.outcome),
224+      level: mapOutcomeToLevel(execution),
225+      createdAt: finishedAt,
226+      message: execution.summary,
227+      data: {
228+        ok: execution.ok,
229+        blocked,
230+        needsHuman,
231+        exitCode
232+      }
233+    },
234+    {
235+      status: mapOutcomeToRunStatus(execution.outcome),
236+      finishedAt,
237+      summary: execution.summary,
238       exitCode
239     }
240-  });
241+  );
242 
243-  emitLogTailCheckpoint(
244+  const logTailCheckpoint = emitLogTailCheckpoint(
245     run.checkpointManager,
246     run.checkpoint,
247     run.logSession,
248     `Captured combined log tail after ${execution.outcome} for ${request.stepId}.`,
249     request.checkpoint?.logTailLines
250   );
251-
252-  synchronizeRunState(run, {
253-    status: mapOutcomeToRunStatus(execution.outcome),
254-    updatedAt: finishedAt,
255-    finishedAt,
256-    summary: execution.summary,
257-    exitCode
258-  });
259+  await persistCheckpointRecordIfPresent(run, logTailCheckpoint);
260 
261   const logSummary = summarizeLocalRunLogSession(run.logSession);
262   const durationMs = Math.max(0, Date.parse(finishedAt) - Date.parse(startedAt));
M coordination/tasks/T-016-worker-persistence.md
+23, -9
 1@@ -1,10 +1,10 @@
 2 ---
 3 task_id: T-016
 4 title: Worker 本地持久化
 5-status: todo
 6+status: review
 7 branch: feat/T-016-worker-persistence
 8 repo: /Users/george/code/baa-conductor
 9-base_ref: main
10+base_ref: main@c5e007b
11 depends_on:
12   - T-005
13   - T-006
14@@ -12,7 +12,7 @@ write_scope:
15   - apps/worker-runner/**
16   - packages/checkpointing/**
17   - packages/logging/**
18-updated_at: 2026-03-21
19+updated_at: 2026-03-22
20 ---
21 
22 # T-016 Worker 本地持久化
23@@ -60,25 +60,39 @@ updated_at: 2026-03-21
24 
25 ## files_changed
26 
27-- 待填写
28+- `apps/worker-runner/src/runner.ts`
29+- `packages/checkpointing/src/index.ts`
30+- `packages/checkpointing/src/node-shims.ts`
31+- `packages/logging/src/index.ts`
32+- `packages/logging/src/node-shims.ts`
33+- `packages/logging/src/persistence.ts`
34+- `coordination/tasks/T-016-worker-persistence.md`
35 
36 ## commands_run
37 
38-- 待填写
39+- `npx --yes pnpm install`
40+- `npx --yes pnpm --filter @baa-conductor/logging typecheck`
41+- `npx --yes pnpm --filter @baa-conductor/checkpointing typecheck`
42+- `npx --yes pnpm --filter @baa-conductor/worker-runner typecheck`
43+- `npx --yes pnpm exec tsc -p apps/worker-runner/tsconfig.json --outDir <tmp> --module commonjs`
44+- `DIST_ROOT=<tmp> RUNTIME_ROOT=<tmp> node <<'EOF' ... EOF`
45 
46 ## result
47 
48-- 待填写
49+- `worker-runner` 现在会创建本地 run 目录,并在初始化时写入 `meta.json`、`state.json`、空的 `worker.log` / `stdout.log` / `stderr.log`。
50+- 生命周期事件、stdout/stderr chunk、checkpoint 写入后都会同步刷新本地 `state.json`,不再只停留在内存态。
51+- `summary` 与 `log_tail` checkpoint 会真实落到 `checkpoints/`,并已通过临时 CommonJS 编译验证生成 `0001-summary.json`、`0002-log-tail.txt`。
52 
53 ## risks
54 
55-- 待填写
56+- 真实 Codex 子进程尚未接入,当前只验证了 placeholder/custom executor 路径下的本地持久化。
57+- `git_diff`、`test_output` 等更大 checkpoint 负载还没有实际生成端,当前仅补齐了文件写入基础设施。
58 
59 ## next_handoff
60 
61-- 待填写
62+- 将真实 worker 执行器接到 `appendStreamChunkPersisted` / `recordLifecycleEventPersisted`,保持 stdout/stderr 与状态文件持续落盘。
63+- 在后续 checkpoint 任务里直接复用 `persistCheckpointRecord` 扩展 `git_diff`、`test_output` 的实际产出。
64 
65 ## notes
66 
67 - `2026-03-21`: 创建第三波任务卡
68-
M packages/checkpointing/src/index.ts
+23, -0
 1@@ -1,3 +1,5 @@
 2+import { mkdir, writeFile } from "node:fs/promises";
 3+
 4 export type CheckpointType = "summary" | "git_diff" | "log_tail" | "test_output";
 5 export type CheckpointFileFormat = "json" | "text" | "patch";
 6 export type CheckpointReplayStrategy = "none" | "git_apply_binary";
 7@@ -268,6 +270,16 @@ function renderTextContent(contentText?: string): string {
 8   return contentText.endsWith("\n") ? contentText : `${contentText}\n`;
 9 }
10 
11+function getParentDirectory(filePath: string): string {
12+  const lastSlashIndex = filePath.lastIndexOf("/");
13+
14+  if (lastSlashIndex <= 0) {
15+    return ".";
16+  }
17+
18+  return filePath.slice(0, lastSlashIndex);
19+}
20+
21 export function createCheckpointFilename(
22   seq: number,
23   checkpointTypeOrSuffix: CheckpointType | string
24@@ -382,6 +394,17 @@ export function renderCheckpointFile(record: CheckpointRecord): RenderedCheckpoi
25   };
26 }
27 
28+export async function persistCheckpointRecord(
29+  record: CheckpointRecord
30+): Promise<RenderedCheckpointFile> {
31+  const rendered = renderCheckpointFile(record);
32+
33+  await mkdir(getParentDirectory(rendered.storagePath), { recursive: true });
34+  await writeFile(rendered.storagePath, rendered.content, "utf8");
35+
36+  return rendered;
37+}
38+
39 export function createCheckpointManager(input: CreateCheckpointManagerInput): CheckpointManager {
40   const state = createCheckpointManagerState(input);
41 
A packages/checkpointing/src/node-shims.ts
+23, -0
 1@@ -0,0 +1,23 @@
 2+declare module "node:fs/promises" {
 3+  export interface FileOperationOptions {
 4+    encoding?: string;
 5+    mode?: number;
 6+    flag?: string;
 7+  }
 8+
 9+  export interface MakeDirectoryOptions {
10+    recursive?: boolean;
11+    mode?: number;
12+  }
13+
14+  export function mkdir(
15+    path: string,
16+    options?: MakeDirectoryOptions
17+  ): Promise<string | undefined>;
18+
19+  export function writeFile(
20+    path: string,
21+    data: string,
22+    options?: FileOperationOptions | string
23+  ): Promise<void>;
24+}
M packages/logging/src/index.ts
+1, -0
1@@ -1,4 +1,5 @@
2 export * from "./contracts";
3 export * from "./paths";
4+export * from "./persistence";
5 export * from "./session";
6 export * from "./state";
A packages/logging/src/node-shims.ts
+29, -0
 1@@ -0,0 +1,29 @@
 2+declare module "node:fs/promises" {
 3+  export interface FileOperationOptions {
 4+    encoding?: string;
 5+    mode?: number;
 6+    flag?: string;
 7+  }
 8+
 9+  export interface MakeDirectoryOptions {
10+    recursive?: boolean;
11+    mode?: number;
12+  }
13+
14+  export function mkdir(
15+    path: string,
16+    options?: MakeDirectoryOptions
17+  ): Promise<string | undefined>;
18+
19+  export function writeFile(
20+    path: string,
21+    data: string,
22+    options?: FileOperationOptions | string
23+  ): Promise<void>;
24+
25+  export function appendFile(
26+    path: string,
27+    data: string,
28+    options?: FileOperationOptions | string
29+  ): Promise<void>;
30+}
A packages/logging/src/persistence.ts
+76, -0
 1@@ -0,0 +1,76 @@
 2+import { appendFile, mkdir, writeFile } from "node:fs/promises";
 3+import type {
 4+  LocalRunLogSession,
 5+  LocalRunPaths,
 6+  RunMetadata,
 7+  RunStateSnapshot,
 8+  StreamChunkLogEntry,
 9+  WorkerLifecycleLogEntry
10+} from "./contracts";
11+
12+function renderJsonFile(value: unknown): string {
13+  return `${JSON.stringify(value, null, 2)}\n`;
14+}
15+
16+function ensureTrailingNewline(value: string): string {
17+  if (value === "" || value.endsWith("\n")) {
18+    return value;
19+  }
20+
21+  return `${value}\n`;
22+}
23+
24+export async function ensureLocalRunLayout(paths: LocalRunPaths): Promise<void> {
25+  await Promise.all([
26+    mkdir(paths.taskRunsDir, { recursive: true }),
27+    mkdir(paths.runDir, { recursive: true }),
28+    mkdir(paths.checkpointsDir, { recursive: true }),
29+    mkdir(paths.artifactsDir, { recursive: true })
30+  ]);
31+}
32+
33+export async function writeRunMetadata(
34+  paths: LocalRunPaths,
35+  metadata: RunMetadata
36+): Promise<void> {
37+  await writeFile(paths.metaPath, renderJsonFile(metadata), "utf8");
38+}
39+
40+export async function writeRunState(
41+  paths: LocalRunPaths,
42+  state: RunStateSnapshot
43+): Promise<void> {
44+  await writeFile(paths.statePath, renderJsonFile(state), "utf8");
45+}
46+
47+export async function initializeLocalRunFiles(
48+  paths: LocalRunPaths,
49+  metadata: RunMetadata,
50+  state: RunStateSnapshot
51+): Promise<void> {
52+  await ensureLocalRunLayout(paths);
53+  await Promise.all([
54+    writeRunMetadata(paths, metadata),
55+    writeRunState(paths, state),
56+    writeFile(paths.workerLogPath, "", "utf8"),
57+    writeFile(paths.stdoutLogPath, "", "utf8"),
58+    writeFile(paths.stderrLogPath, "", "utf8")
59+  ]);
60+}
61+
62+export async function appendLifecycleEntry(
63+  session: LocalRunLogSession,
64+  entry: WorkerLifecycleLogEntry
65+): Promise<void> {
66+  await appendFile(session.worker.filePath, ensureTrailingNewline(entry.renderedLine), "utf8");
67+}
68+
69+export async function appendStreamEntry(
70+  session: LocalRunLogSession,
71+  entry: StreamChunkLogEntry
72+): Promise<void> {
73+  const filePath =
74+    entry.channel === "stdout" ? session.stdout.filePath : session.stderr.filePath;
75+
76+  await appendFile(filePath, ensureTrailingNewline(entry.text), "utf8");
77+}