- commit
- d248eea
- parent
- da004c8
- author
- im_wower
- date
- 2026-03-22 00:37:48 +0800 CST
Merge remote-tracking branch 'origin/feat/T-016-worker-persistence' into integration/third-wave-20260322
7 files changed,
+289,
-59
+114,
-50
1@@ -1,16 +1,26 @@
2 import {
3+ persistCheckpointRecord,
4+ type CheckpointRecord
5+} from "@baa-conductor/checkpointing";
6+import {
7+ appendLifecycleEntry,
8 appendStreamChunk,
9+ appendStreamEntry,
10 createLocalRunLogSession,
11 createLocalRunPaths,
12 createRunMetadata,
13 createRunStateSnapshot,
14+ initializeLocalRunFiles,
15 recordLifecycleEvent,
16 summarizeLocalRunLogSession,
17 updateRunState,
18+ writeRunState,
19 type LogLevel,
20- type StructuredData,
21 type RunStatePatch,
22 type RunStatus,
23+ type StreamLogChannel,
24+ type StructuredData,
25+ type WorkerLifecycleEventInput,
26 type WorkerLifecycleEventType
27 } from "@baa-conductor/logging";
28 import type {
29@@ -38,6 +48,61 @@ function synchronizeRunState(run: PreparedStepRun, patch: RunStatePatch): void {
30 });
31 }
32
33+async function persistRunState(run: PreparedStepRun): Promise<void> {
34+ await writeRunState(run.logPaths, run.state);
35+}
36+
37+async function synchronizeRunStatePersisted(
38+ run: PreparedStepRun,
39+ patch: RunStatePatch
40+): Promise<void> {
41+ synchronizeRunState(run, patch);
42+ await persistRunState(run);
43+}
44+
45+async function persistCheckpointRecordIfPresent(
46+ run: PreparedStepRun,
47+ record?: CheckpointRecord
48+): Promise<void> {
49+ if (record === undefined) {
50+ return;
51+ }
52+
53+ await persistCheckpointRecord(record);
54+ await synchronizeRunStatePersisted(run, {
55+ updatedAt: record.createdAt
56+ });
57+}
58+
59+async function recordLifecycleEventPersisted(
60+ run: PreparedStepRun,
61+ input: WorkerLifecycleEventInput,
62+ statePatch: RunStatePatch = {}
63+): Promise<void> {
64+ const entry = recordLifecycleEvent(run.logSession, input);
65+
66+ await appendLifecycleEntry(run.logSession, entry);
67+ await synchronizeRunStatePersisted(run, {
68+ ...statePatch,
69+ updatedAt: statePatch.updatedAt ?? entry.createdAt
70+ });
71+}
72+
73+async function appendStreamChunkPersisted(
74+ run: PreparedStepRun,
75+ channel: StreamLogChannel,
76+ text: string
77+): Promise<void> {
78+ const entry = appendStreamChunk(run.logSession, channel, {
79+ text
80+ });
81+
82+ await appendStreamEntry(run.logSession, entry);
83+ await synchronizeRunStatePersisted(run, {
84+ updatedAt: entry.createdAt
85+ });
86+}
87+
88 function createDefaultArtifacts(run: PreparedStepRun): StepArtifact[] {
89 return [
90 {
91@@ -145,7 +210,7 @@ function resolveExitCode(execution: WorkerExecutionOutcome): number {
92 return execution.ok ? 0 : 1;
93 }
94
95-export function prepareStepRun(request: StepExecutionRequest): PreparedStepRun {
96+export async function prepareStepRun(request: StepExecutionRequest): Promise<PreparedStepRun> {
97 const startedAt = request.createdAt ?? new Date().toISOString();
98 const checkpointConfig = resolveCheckpointConfig(request.checkpoint);
99 const logPaths = createLocalRunPaths({
100@@ -190,7 +255,10 @@ export function prepareStepRun(request: StepExecutionRequest): PreparedStepRun {
101 checkpointManager
102 };
103
104- recordLifecycleEvent(run.logSession, {
105+ await initializeLocalRunFiles(run.logPaths, run.metadata, run.state);
106+ await persistCheckpointRecordIfPresent(run, preparationCheckpoint);
107+
108+ await recordLifecycleEventPersisted(run, {
109 type: "run_prepared",
110 level: "info",
111 createdAt: startedAt,
112@@ -249,7 +317,7 @@ export function prepareStepRun(request: StepExecutionRequest): PreparedStepRun {
113 };
114 }
115
116- recordLifecycleEvent(run.logSession, {
117+ await recordLifecycleEventPersisted(run, {
118 type: "checkpoint_slot_reserved",
119 level: "info",
120 message: `Initialized checkpoint manager at sequence ${checkpoint.lastCheckpointSeq}.`,
121@@ -257,10 +325,6 @@ export function prepareStepRun(request: StepExecutionRequest): PreparedStepRun {
122 });
123 }
124
125- synchronizeRunState(run, {
126- updatedAt: startedAt
127- });
128-
129 return run;
130 }
131
132@@ -284,27 +348,29 @@ export async function runStep(
133 executor: WorkerExecutor = createPlaceholderWorkerExecutor()
134 ): Promise<StepExecutionResult> {
135 const startedAt = request.createdAt ?? new Date().toISOString();
136- const run = prepareStepRun({
137+ const run = await prepareStepRun({
138 ...request,
139 createdAt: startedAt
140 });
141 const executionStartedAt = new Date().toISOString();
142
143- recordLifecycleEvent(run.logSession, {
144- type: "worker_started",
145- level: "info",
146- createdAt: executionStartedAt,
147- message: `Worker runner opened execution scope for ${request.workerKind} step ${request.stepId}.`,
148- data: {
149- stepKind: request.stepKind,
150- timeoutSec: request.timeoutSec,
151- worktreePath: request.runtime.worktreePath
152+ await recordLifecycleEventPersisted(
153+ run,
154+ {
155+ type: "worker_started",
156+ level: "info",
157+ createdAt: executionStartedAt,
158+ message: `Worker runner opened execution scope for ${request.workerKind} step ${request.stepId}.`,
159+ data: {
160+ stepKind: request.stepKind,
161+ timeoutSec: request.timeoutSec,
162+ worktreePath: request.runtime.worktreePath
163+ }
164+ },
165+ {
166+ status: "running"
167 }
168- });
169- synchronizeRunState(run, {
170- status: "running",
171- updatedAt: executionStartedAt
172- });
173+ );
174
175 const execution = await executor.execute(run);
176 const blocked = execution.blocked ?? execution.outcome === "blocked";
177@@ -312,19 +378,15 @@ export async function runStep(
178 const exitCode = resolveExitCode(execution);
179
180 for (const line of execution.stdout ?? []) {
181- appendStreamChunk(run.logSession, "stdout", {
182- text: line
183- });
184+ await appendStreamChunkPersisted(run, "stdout", line);
185 }
186
187 for (const line of execution.stderr ?? []) {
188- appendStreamChunk(run.logSession, "stderr", {
189- text: line
190- });
191+ await appendStreamChunkPersisted(run, "stderr", line);
192 }
193
194 if (execution.outcome === "prepared") {
195- recordLifecycleEvent(run.logSession, {
196+ await recordLifecycleEventPersisted(run, {
197 type: "worker_execution_deferred",
198 level: "info",
199 message: `Real ${request.workerKind} execution is intentionally deferred while checkpoint capture remains active.`,
200@@ -335,7 +397,7 @@ export async function runStep(
201 });
202 }
203
204- recordLifecycleEvent(run.logSession, {
205+ await recordLifecycleEventPersisted(run, {
206 type: "worker_exited",
207 level: exitCode === 0 ? "info" : "error",
208 message: `Worker runner closed execution scope with outcome ${execution.outcome}.`,
209@@ -347,34 +409,36 @@ export async function runStep(
210
211 const finishedAt = new Date().toISOString();
212
213- recordLifecycleEvent(run.logSession, {
214- type: mapOutcomeToTerminalEvent(execution.outcome),
215- level: mapOutcomeToLevel(execution),
216- createdAt: finishedAt,
217- message: execution.summary,
218- data: {
219- ok: execution.ok,
220- blocked,
221- needsHuman,
222+ await recordLifecycleEventPersisted(
223+ run,
224+ {
225+ type: mapOutcomeToTerminalEvent(execution.outcome),
226+ level: mapOutcomeToLevel(execution),
227+ createdAt: finishedAt,
228+ message: execution.summary,
229+ data: {
230+ ok: execution.ok,
231+ blocked,
232+ needsHuman,
233+ exitCode
234+ }
235+ },
236+ {
237+ status: mapOutcomeToRunStatus(execution.outcome),
238+ finishedAt,
239+ summary: execution.summary,
240 exitCode
241 }
242- });
243+ );
244
245- emitLogTailCheckpoint(
246+ const logTailCheckpoint = emitLogTailCheckpoint(
247 run.checkpointManager,
248 run.checkpoint,
249 run.logSession,
250 `Captured combined log tail after ${execution.outcome} for ${request.stepId}.`,
251 request.checkpoint?.logTailLines
252 );
253-
254- synchronizeRunState(run, {
255- status: mapOutcomeToRunStatus(execution.outcome),
256- updatedAt: finishedAt,
257- finishedAt,
258- summary: execution.summary,
259- exitCode
260- });
261+ await persistCheckpointRecordIfPresent(run, logTailCheckpoint);
262
263 const logSummary = summarizeLocalRunLogSession(run.logSession);
264 const durationMs = Math.max(0, Date.parse(finishedAt) - Date.parse(startedAt));
1@@ -1,10 +1,10 @@
2 ---
3 task_id: T-016
4 title: Worker 本地持久化
5-status: todo
6+status: review
7 branch: feat/T-016-worker-persistence
8 repo: /Users/george/code/baa-conductor
9-base_ref: main
10+base_ref: main@c5e007b
11 depends_on:
12 - T-005
13 - T-006
14@@ -12,7 +12,7 @@ write_scope:
15 - apps/worker-runner/**
16 - packages/checkpointing/**
17 - packages/logging/**
18-updated_at: 2026-03-21
19+updated_at: 2026-03-22
20 ---
21
22 # T-016 Worker 本地持久化
23@@ -60,25 +60,39 @@ updated_at: 2026-03-21
24
25 ## files_changed
26
27-- 待填写
28+- `apps/worker-runner/src/runner.ts`
29+- `packages/checkpointing/src/index.ts`
30+- `packages/checkpointing/src/node-shims.ts`
31+- `packages/logging/src/index.ts`
32+- `packages/logging/src/node-shims.ts`
33+- `packages/logging/src/persistence.ts`
34+- `coordination/tasks/T-016-worker-persistence.md`
35
36 ## commands_run
37
38-- 待填写
39+- `npx --yes pnpm install`
40+- `npx --yes pnpm --filter @baa-conductor/logging typecheck`
41+- `npx --yes pnpm --filter @baa-conductor/checkpointing typecheck`
42+- `npx --yes pnpm --filter @baa-conductor/worker-runner typecheck`
43+- `npx --yes pnpm exec tsc -p apps/worker-runner/tsconfig.json --outDir <tmp> --module commonjs`
44+- `DIST_ROOT=<tmp> RUNTIME_ROOT=<tmp> node <<'EOF' ... EOF`
45
46 ## result
47
48-- 待填写
49+- `worker-runner` 现在会创建本地 run 目录,并在初始化时写入 `meta.json`、`state.json`、空的 `worker.log` / `stdout.log` / `stderr.log`。
50+- 生命周期事件、stdout/stderr chunk、checkpoint 写入后都会同步刷新本地 `state.json`,不再只停留在内存态。
51+- `summary` 与 `log_tail` checkpoint 会真实落到 `checkpoints/`,并已通过临时 CommonJS 编译验证生成 `0001-summary.json`、`0002-log-tail.txt`。
52
53 ## risks
54
55-- 待填写
56+- 真实 Codex 子进程尚未接入,当前只验证了 placeholder/custom executor 路径下的本地持久化。
57+- `git_diff`、`test_output` 等更大 checkpoint 负载还没有实际生成端,当前仅补齐了文件写入基础设施。
58
59 ## next_handoff
60
61-- 待填写
62+- 将真实 worker 执行器接到 `appendStreamChunkPersisted` / `recordLifecycleEventPersisted`,保持 stdout/stderr 与状态文件持续落盘。
63+- 在后续 checkpoint 任务里直接复用 `persistCheckpointRecord` 扩展 `git_diff`、`test_output` 的实际产出。
64
65 ## notes
66
67 - `2026-03-21`: 创建第三波任务卡
68-
+23,
-0
1@@ -1,3 +1,5 @@
2+import { mkdir, writeFile } from "node:fs/promises";
3+
4 export type CheckpointType = "summary" | "git_diff" | "log_tail" | "test_output";
5 export type CheckpointFileFormat = "json" | "text" | "patch";
6 export type CheckpointReplayStrategy = "none" | "git_apply_binary";
7@@ -268,6 +270,16 @@ function renderTextContent(contentText?: string): string {
8 return contentText.endsWith("\n") ? contentText : `${contentText}\n`;
9 }
10
11+function getParentDirectory(filePath: string): string {
12+ const lastSlashIndex = filePath.lastIndexOf("/");
13+
14+ if (lastSlashIndex <= 0) {
15+ return ".";
16+ }
17+
18+ return filePath.slice(0, lastSlashIndex);
19+}
20+
21 export function createCheckpointFilename(
22 seq: number,
23 checkpointTypeOrSuffix: CheckpointType | string
24@@ -382,6 +394,17 @@ export function renderCheckpointFile(record: CheckpointRecord): RenderedCheckpoi
25 };
26 }
27
28+export async function persistCheckpointRecord(
29+ record: CheckpointRecord
30+): Promise<RenderedCheckpointFile> {
31+ const rendered = renderCheckpointFile(record);
32+
33+ await mkdir(getParentDirectory(rendered.storagePath), { recursive: true });
34+ await writeFile(rendered.storagePath, rendered.content, "utf8");
35+
36+ return rendered;
37+}
38+
39 export function createCheckpointManager(input: CreateCheckpointManagerInput): CheckpointManager {
40 const state = createCheckpointManagerState(input);
41
+23,
-0
1@@ -0,0 +1,23 @@
2+declare module "node:fs/promises" {
3+ export interface FileOperationOptions {
4+ encoding?: string;
5+ mode?: number;
6+ flag?: string;
7+ }
8+
9+ export interface MakeDirectoryOptions {
10+ recursive?: boolean;
11+ mode?: number;
12+ }
13+
14+ export function mkdir(
15+ path: string,
16+ options?: MakeDirectoryOptions
17+ ): Promise<string | undefined>;
18+
19+ export function writeFile(
20+ path: string,
21+ data: string,
22+ options?: FileOperationOptions | string
23+ ): Promise<void>;
24+}
+1,
-0
1@@ -1,4 +1,5 @@
2 export * from "./contracts";
3 export * from "./paths";
4+export * from "./persistence";
5 export * from "./session";
6 export * from "./state";
+29,
-0
1@@ -0,0 +1,29 @@
2+declare module "node:fs/promises" {
3+ export interface FileOperationOptions {
4+ encoding?: string;
5+ mode?: number;
6+ flag?: string;
7+ }
8+
9+ export interface MakeDirectoryOptions {
10+ recursive?: boolean;
11+ mode?: number;
12+ }
13+
14+ export function mkdir(
15+ path: string,
16+ options?: MakeDirectoryOptions
17+ ): Promise<string | undefined>;
18+
19+ export function writeFile(
20+ path: string,
21+ data: string,
22+ options?: FileOperationOptions | string
23+ ): Promise<void>;
24+
25+ export function appendFile(
26+ path: string,
27+ data: string,
28+ options?: FileOperationOptions | string
29+ ): Promise<void>;
30+}
+76,
-0
1@@ -0,0 +1,76 @@
2+import { appendFile, mkdir, writeFile } from "node:fs/promises";
3+import type {
4+ LocalRunLogSession,
5+ LocalRunPaths,
6+ RunMetadata,
7+ RunStateSnapshot,
8+ StreamChunkLogEntry,
9+ WorkerLifecycleLogEntry
10+} from "./contracts";
11+
12+function renderJsonFile(value: unknown): string {
13+ return `${JSON.stringify(value, null, 2)}\n`;
14+}
15+
16+function ensureTrailingNewline(value: string): string {
17+ if (value === "" || value.endsWith("\n")) {
18+ return value;
19+ }
20+
21+ return `${value}\n`;
22+}
23+
24+export async function ensureLocalRunLayout(paths: LocalRunPaths): Promise<void> {
25+ await Promise.all([
26+ mkdir(paths.taskRunsDir, { recursive: true }),
27+ mkdir(paths.runDir, { recursive: true }),
28+ mkdir(paths.checkpointsDir, { recursive: true }),
29+ mkdir(paths.artifactsDir, { recursive: true })
30+ ]);
31+}
32+
33+export async function writeRunMetadata(
34+ paths: LocalRunPaths,
35+ metadata: RunMetadata
36+): Promise<void> {
37+ await writeFile(paths.metaPath, renderJsonFile(metadata), "utf8");
38+}
39+
40+export async function writeRunState(
41+ paths: LocalRunPaths,
42+ state: RunStateSnapshot
43+): Promise<void> {
44+ await writeFile(paths.statePath, renderJsonFile(state), "utf8");
45+}
46+
47+export async function initializeLocalRunFiles(
48+ paths: LocalRunPaths,
49+ metadata: RunMetadata,
50+ state: RunStateSnapshot
51+): Promise<void> {
52+ await ensureLocalRunLayout(paths);
53+ await Promise.all([
54+ writeRunMetadata(paths, metadata),
55+ writeRunState(paths, state),
56+ writeFile(paths.workerLogPath, "", "utf8"),
57+ writeFile(paths.stdoutLogPath, "", "utf8"),
58+ writeFile(paths.stderrLogPath, "", "utf8")
59+ ]);
60+}
61+
62+export async function appendLifecycleEntry(
63+ session: LocalRunLogSession,
64+ entry: WorkerLifecycleLogEntry
65+): Promise<void> {
66+ await appendFile(session.worker.filePath, ensureTrailingNewline(entry.renderedLine), "utf8");
67+}
68+
69+export async function appendStreamEntry(
70+ session: LocalRunLogSession,
71+ entry: StreamChunkLogEntry
72+): Promise<void> {
73+ const filePath =
74+ entry.channel === "stdout" ? session.stdout.filePath : session.stderr.filePath;
75+
76+ await appendFile(filePath, ensureTrailingNewline(entry.text), "utf8");
77+}