im_wower
·
2026-03-22
runner.ts
1import {
2 persistCheckpointRecord,
3 type CheckpointRecord
4} from "@baa-conductor/checkpointing";
5import {
6 appendLifecycleEntry,
7 appendStreamChunk,
8 appendStreamEntry,
9 createLocalRunLogSession,
10 createLocalRunPaths,
11 createRunMetadata,
12 createRunStateSnapshot,
13 initializeLocalRunFiles,
14 recordLifecycleEvent,
15 summarizeLocalRunLogSession,
16 updateRunState,
17 writeRunState,
18 type LogLevel,
19 type RunStatePatch,
20 type RunStatus,
21 type StreamLogChannel,
22 type StructuredData,
23 type WorkerLifecycleEventInput,
24 type WorkerLifecycleEventType
25} from "@baa-conductor/logging";
26import type {
27 PreparedStepRun,
28 StepArtifact,
29 StepExecutionOutcome,
30 StepExecutionRequest,
31 StepExecutionResult,
32 WorkerExecutionOutcome,
33 WorkerExecutor
34} from "./contracts";
35import {
36 createPreparedCheckpointState,
37 createRunCheckpointManager,
38 emitLogTailCheckpoint,
39 emitPreparationCheckpoint,
40 resolveCheckpointConfig
41} from "./checkpoints";
42
/**
 * Applies `patch` to the in-memory run state only (nothing is written to disk here).
 *
 * The patched snapshot is additionally stamped with the sequence number of the most recently
 * emitted log event (`nextSeq - 1`) and the checkpoint manager's `lastCheckpointSeq`, so those
 * two counters always reflect the live session regardless of what `patch` contains.
 */
function synchronizeRunState(run: PreparedStepRun, patch: RunStatePatch): void {
  const lastEventSeq = run.logSession.nextSeq - 1;
  const checkpointSeq = run.checkpoint.lastCheckpointSeq;

  run.state = updateRunState(run.state, { ...patch, lastEventSeq, checkpointSeq });
}
50
/**
 * Writes the current in-memory `run.state` through `writeRunState`, using the run's log paths.
 */
async function persistRunState(run: PreparedStepRun): Promise<void> {
  const { logPaths, state } = run;

  await writeRunState(logPaths, state);
}
54
/**
 * Patches the in-memory run state (see `synchronizeRunState`) and then writes it out.
 *
 * The in-memory update happens first so the write captures the patch together with the
 * refreshed event/checkpoint sequence numbers.
 */
async function synchronizeRunStatePersisted(
  run: PreparedStepRun,
  patch: RunStatePatch
): Promise<void> {
  synchronizeRunState(run, patch);

  return persistRunState(run);
}
62
/**
 * Persists a checkpoint record, if one was produced, and then re-syncs the run state.
 *
 * When `record` is undefined this is a no-op. Otherwise the record is written via
 * `persistCheckpointRecord`, and the run state is patched with `updatedAt = record.createdAt`
 * and persisted. That patch also refreshes `checkpointSeq` from `run.checkpoint.lastCheckpointSeq`.
 */
async function persistCheckpointRecordIfPresent(
  run: PreparedStepRun,
  record?: CheckpointRecord
): Promise<void> {
  if (record !== undefined) {
    await persistCheckpointRecord(record);
    await synchronizeRunStatePersisted(run, { updatedAt: record.createdAt });
  }
}
76
/**
 * Records a lifecycle event in the log session, appends it to the lifecycle log, and
 * persists the run state with `statePatch` applied.
 *
 * `updatedAt` defaults to the event's `createdAt` unless the caller supplies one in
 * `statePatch`.
 */
async function recordLifecycleEventPersisted(
  run: PreparedStepRun,
  input: WorkerLifecycleEventInput,
  statePatch: RunStatePatch = {}
): Promise<void> {
  const { logSession } = run;
  const lifecycleEntry = recordLifecycleEvent(logSession, input);

  await appendLifecycleEntry(logSession, lifecycleEntry);

  const updatedAt = statePatch.updatedAt ?? lifecycleEntry.createdAt;
  await synchronizeRunStatePersisted(run, { ...statePatch, updatedAt });
}
90
/**
 * Appends one text chunk to a stream channel (e.g. `"stdout"` / `"stderr"`) and persists the
 * run state with `updatedAt` set to the chunk entry's `createdAt`.
 */
async function appendStreamChunkPersisted(
  run: PreparedStepRun,
  channel: StreamLogChannel,
  text: string
): Promise<void> {
  const { logSession } = run;
  const chunkEntry = appendStreamChunk(logSession, channel, { text });

  await appendStreamEntry(logSession, chunkEntry);
  await synchronizeRunStatePersisted(run, { updatedAt: chunkEntry.createdAt });
}
105
/**
 * Describes, as artifacts, the files and directories that every local run layout contains.
 *
 * Paths are taken directly from `run.logPaths`. Names, kinds and descriptions are fixed.
 * Files are listed before directories.
 */
function createDefaultArtifacts(run: PreparedStepRun): StepArtifact[] {
  const {
    metaPath,
    statePath,
    workerLogPath,
    stdoutLogPath,
    stderrLogPath,
    checkpointsDir,
    artifactsDir
  } = run.logPaths;

  const files: StepArtifact[] = [
    {
      name: "meta.json",
      kind: "metadata",
      path: metaPath,
      description: "Immutable run metadata for this step attempt."
    },
    {
      name: "state.json",
      kind: "state",
      path: statePath,
      description: "Mutable run state snapshot for progress tracking and recovery."
    },
    {
      name: "worker.log",
      kind: "log",
      path: workerLogPath,
      description: "Lifecycle events rendered into the worker log stream."
    },
    {
      name: "stdout.log",
      kind: "log",
      path: stdoutLogPath,
      description: "Raw stdout stream produced by the worker process."
    },
    {
      name: "stderr.log",
      kind: "log",
      path: stderrLogPath,
      description: "Raw stderr stream produced by the worker process."
    }
  ];

  const directories: StepArtifact[] = [
    {
      name: "checkpoints",
      kind: "directory",
      path: checkpointsDir,
      description: "Checkpoint directory layout for summary/log tail records and future git diff snapshots."
    },
    {
      name: "artifacts",
      kind: "directory",
      path: artifactsDir,
      description: "Reserved artifact directory for worker outputs."
    }
  ];

  return [...files, ...directories];
}
152
/**
 * Combines the default run artifacts with caller-supplied ones, de-duplicated by `path`.
 *
 * Because a `Map` keeps a key at its first-insertion position, an extra artifact that reuses
 * a default path replaces that default in place. Extras with new paths are appended in their
 * input order.
 */
function mergeArtifacts(run: PreparedStepRun, extraArtifacts: StepArtifact[] = []): StepArtifact[] {
  const candidates = [...createDefaultArtifacts(run), ...extraArtifacts];
  const byPath = new Map<string, StepArtifact>(
    candidates.map((artifact): [string, StepArtifact] => [artifact.path, artifact])
  );

  return Array.from(byPath.values());
}
166
/**
 * Run status recorded for each executor outcome. Typing the table as a `Record` over every
 * outcome makes the compiler reject a missing entry.
 */
const RUN_STATUS_BY_OUTCOME: Readonly<Record<StepExecutionOutcome, RunStatus>> = {
  prepared: "prepared",
  completed: "completed",
  failed: "failed",
  blocked: "blocked"
};

/**
 * Translates an executor outcome into the run status persisted in the run state.
 * Every outcome maps to the run status of the same name.
 */
function mapOutcomeToRunStatus(outcome: StepExecutionOutcome): RunStatus {
  return RUN_STATUS_BY_OUTCOME[outcome];
}
179
/**
 * Terminal lifecycle event emitted for each executor outcome. The `Record` type makes the
 * compiler reject a missing outcome.
 */
const TERMINAL_EVENT_BY_OUTCOME: Readonly<Record<StepExecutionOutcome, WorkerLifecycleEventType>> = {
  prepared: "step_prepared",
  completed: "step_completed",
  failed: "step_failed",
  blocked: "step_blocked"
};

/**
 * Picks the lifecycle event type that closes a step with the given outcome
 * (`step_<outcome>`).
 */
function mapOutcomeToTerminalEvent(outcome: StepExecutionOutcome): WorkerLifecycleEventType {
  return TERMINAL_EVENT_BY_OUTCOME[outcome];
}
192
/**
 * Chooses the log level for the terminal lifecycle event.
 * `failed` → `error`, `blocked` → `warn`, anything else → `info`.
 */
function mapOutcomeToLevel(execution: WorkerExecutionOutcome): LogLevel {
  switch (execution.outcome) {
    case "failed":
      return "error";
    case "blocked":
      return "warn";
    default:
      return "info";
  }
}
204
/**
 * Returns the executor's explicit exit code when it reported one.
 * Otherwise derives one from `ok`: `0` for success, `1` for failure.
 */
function resolveExitCode(execution: WorkerExecutionOutcome): number {
  const { exitCode, ok } = execution;

  if (exitCode === undefined) {
    return ok ? 0 : 1;
  }

  return exitCode;
}
212
/**
 * Lays out the local run directory for one step attempt and records its preparation.
 *
 * Steps, in order:
 * 1. Resolve checkpoint config and log paths.
 * 2. Build the checkpoint manager/state and emit the preparation checkpoint.
 * 3. Open the log session.
 * 4. Write the initial metadata and state files.
 * 5. Persist the preparation checkpoint, when one was produced.
 * 6. Append a `run_prepared` lifecycle event.
 *
 * When the checkpoint mode is `"capture"`, a `checkpoint_slot_reserved` event is also
 * recorded. It describes the checkpoints directory, the next sequence number, the supported
 * checkpoint types, the latest checkpoint path (if any) and the planned git diff commands
 * (if configured).
 *
 * @param request - Step to prepare. `createdAt` defaults to the current time.
 * @returns The prepared run, whose in-memory state matches what was last persisted.
 */
export async function prepareStepRun(request: StepExecutionRequest): Promise<PreparedStepRun> {
  const { runtime, taskId, stepId, runId } = request;
  const startedAt = request.createdAt ?? new Date().toISOString();
  const preparedMessage = `Prepared local run layout for ${stepId}.`;

  const checkpointConfig = resolveCheckpointConfig(request.checkpoint);
  const logPaths = createLocalRunPaths({
    repoRoot: runtime.repoRoot,
    taskId,
    runId,
    runsRootDir: runtime.runsRootDir
  });

  // The preparation checkpoint is emitted before the state snapshot exists because its
  // summary (when present) seeds that snapshot.
  const checkpointManager = createRunCheckpointManager(request, logPaths.checkpointsDir);
  const checkpoint = createPreparedCheckpointState(checkpointManager, checkpointConfig);
  const preparationCheckpoint = emitPreparationCheckpoint(checkpointManager, checkpoint, request);
  const logSession = createLocalRunLogSession(logPaths, { taskId, stepId, runId });

  const metadata = createRunMetadata({
    taskId,
    stepId,
    stepName: request.stepName,
    runId,
    workerKind: request.workerKind,
    attempt: request.attempt,
    repoRoot: runtime.repoRoot,
    worktreePath: runtime.worktreePath,
    createdAt: startedAt,
    checkpointMode: checkpoint.mode,
    promptSummary: request.promptSummary,
    command: request.command
  });
  const state = createRunStateSnapshot({
    attempt: request.attempt,
    startedAt,
    checkpointSeq: checkpoint.lastCheckpointSeq,
    summary: preparationCheckpoint?.summary ?? preparedMessage
  });

  const run: PreparedStepRun = {
    request,
    logPaths,
    logSession,
    metadata,
    state,
    checkpoint,
    checkpointManager
  };

  await initializeLocalRunFiles(logPaths, metadata, state);
  await persistCheckpointRecordIfPresent(run, preparationCheckpoint);

  await recordLifecycleEventPersisted(run, {
    type: "run_prepared",
    level: "info",
    createdAt: startedAt,
    message: preparedMessage,
    data: {
      stepName: request.stepName,
      stepKind: request.stepKind,
      workerKind: request.workerKind,
      attempt: request.attempt,
      timeoutSec: request.timeoutSec,
      runDir: logPaths.runDir,
      worktreePath: runtime.worktreePath
    }
  });

  if (checkpoint.mode !== "capture") {
    return run;
  }

  const reservationData: StructuredData = {
    checkpointsDir: logPaths.checkpointsDir,
    nextCheckpointSeq: checkpoint.nextCheckpointSeq,
    supportedTypes: checkpoint.supportedTypes
  };

  if (preparationCheckpoint !== undefined) {
    reservationData.latestCheckpointPath = preparationCheckpoint.file.storagePath;
  }

  const diffPlan = checkpoint.gitDiffPlan;

  if (diffPlan !== undefined) {
    const { statusShort, binaryDiff, diffStat } = diffPlan.commands;

    // Copied field by field into a plain structure so it can be logged as structured data;
    // a missing diffStat command is recorded as `null`.
    reservationData.gitDiffCommands = {
      checkpointType: diffPlan.checkpointType,
      cadenceSec: {
        min: diffPlan.cadenceSec.min,
        max: diffPlan.cadenceSec.max
      },
      commands: {
        statusShort: { purpose: statusShort.purpose, args: statusShort.args },
        binaryDiff: { purpose: binaryDiff.purpose, args: binaryDiff.args },
        diffStat: diffStat === undefined ? null : { purpose: diffStat.purpose, args: diffStat.args }
      },
      replay: {
        strategy: diffPlan.replay.strategy,
        args: diffPlan.replay.args,
        baseRef: diffPlan.replay.baseRef ?? null
      }
    };
  }

  await recordLifecycleEventPersisted(run, {
    type: "checkpoint_slot_reserved",
    level: "info",
    message: `Initialized checkpoint manager at sequence ${checkpoint.lastCheckpointSeq}.`,
    data: reservationData
  });

  return run;
}
330
/**
 * Builds a worker executor that runs nothing.
 *
 * It always resolves to a successful `"prepared"` outcome with exit code 0, and its summary
 * states that real execution for the step's worker kind is deferred.
 */
export function createPlaceholderWorkerExecutor(): WorkerExecutor {
  const execute = async (run: PreparedStepRun): Promise<WorkerExecutionOutcome> => {
    const { stepId, workerKind } = run.request;

    return {
      ok: true,
      outcome: "prepared",
      summary: `Prepared local log streams and result envelope for ${stepId}; real ${workerKind} execution remains deferred.`,
      needsHuman: false,
      blocked: false,
      exitCode: 0
    };
  };

  return { execute };
}
345
/**
 * Best-effort terminal bookkeeping for an executor that threw instead of returning an outcome.
 *
 * Appends a failed terminal lifecycle event and moves the persisted run state to the failed
 * status with `finishedAt`/`summary` set, so the run is not left recorded as `running`.
 * Any error raised while recording is deliberately swallowed: the caller rethrows the
 * executor's own error, which is the one worth surfacing.
 */
async function recordExecutorFailure(run: PreparedStepRun, error: unknown): Promise<void> {
  const finishedAt = new Date().toISOString();
  const reason = error instanceof Error ? error.message : String(error);
  const summary = `Worker executor failed before returning an outcome for ${run.request.stepId}: ${reason}`;

  try {
    await recordLifecycleEventPersisted(
      run,
      {
        type: mapOutcomeToTerminalEvent("failed"),
        level: "error",
        createdAt: finishedAt,
        message: summary,
        data: {
          ok: false,
          blocked: false,
          needsHuman: false,
          error: reason
        }
      },
      {
        status: mapOutcomeToRunStatus("failed"),
        finishedAt,
        summary
      }
    );
  } catch {
    // Intentionally ignored: recording is best-effort and must not mask the executor error.
  }
}

/**
 * Prepares the local run layout for `request`, executes the step with `executor`, and records
 * everything the run produced.
 *
 * After the executor returns:
 * - its stdout/stderr lines are appended to the stream logs;
 * - `worker_exited` and the outcome-specific terminal event are recorded;
 * - the run state is finalized with status, `finishedAt`, summary and exit code;
 * - a log-tail checkpoint is persisted when the checkpoint manager produces one.
 *
 * @param request - Step to run. `createdAt` (default: now) is used as the run's start time and
 *   is the reference point for `durationMs`.
 * @param executor - Worker implementation. Defaults to the placeholder, which only prepares.
 * @returns The execution result, including paths, logs, checkpoint state and metrics.
 * @throws Rethrows anything thrown by `prepareStepRun` or `executor.execute`. If the executor
 *   throws, a failed terminal event and state are recorded (best-effort) before rethrowing.
 */
export async function runStep(
  request: StepExecutionRequest,
  executor: WorkerExecutor = createPlaceholderWorkerExecutor()
): Promise<StepExecutionResult> {
  const startedAt = request.createdAt ?? new Date().toISOString();
  const run = await prepareStepRun({
    ...request,
    createdAt: startedAt
  });
  const executionStartedAt = new Date().toISOString();

  await recordLifecycleEventPersisted(
    run,
    {
      type: "worker_started",
      level: "info",
      createdAt: executionStartedAt,
      message: `Worker runner opened execution scope for ${request.workerKind} step ${request.stepId}.`,
      data: {
        stepKind: request.stepKind,
        timeoutSec: request.timeoutSec,
        worktreePath: request.runtime.worktreePath
      }
    },
    {
      status: "running"
    }
  );

  let execution: WorkerExecutionOutcome;

  try {
    execution = await executor.execute(run);
  } catch (error: unknown) {
    // Without this, state.json would stay at "running" with no terminal event.
    await recordExecutorFailure(run, error);
    throw error;
  }

  const blocked = execution.blocked ?? execution.outcome === "blocked";
  const needsHuman = execution.needsHuman ?? false;
  const exitCode = resolveExitCode(execution);

  // Sequential on purpose: each chunk gets the next log sequence number in order.
  for (const line of execution.stdout ?? []) {
    await appendStreamChunkPersisted(run, "stdout", line);
  }

  for (const line of execution.stderr ?? []) {
    await appendStreamChunkPersisted(run, "stderr", line);
  }

  if (execution.outcome === "prepared") {
    await recordLifecycleEventPersisted(run, {
      type: "worker_execution_deferred",
      level: "info",
      message: `Real ${request.workerKind} execution is intentionally deferred while checkpoint capture remains active.`,
      data: {
        supportedTypes: run.checkpoint.supportedTypes,
        nextCheckpointSeq: run.checkpoint.nextCheckpointSeq
      }
    });
  }

  await recordLifecycleEventPersisted(run, {
    type: "worker_exited",
    level: exitCode === 0 ? "info" : "error",
    message: `Worker runner closed execution scope with outcome ${execution.outcome}.`,
    data: {
      exitCode,
      outcome: execution.outcome
    }
  });

  const finishedAt = new Date().toISOString();

  await recordLifecycleEventPersisted(
    run,
    {
      type: mapOutcomeToTerminalEvent(execution.outcome),
      level: mapOutcomeToLevel(execution),
      createdAt: finishedAt,
      message: execution.summary,
      data: {
        ok: execution.ok,
        blocked,
        needsHuman,
        exitCode
      }
    },
    {
      status: mapOutcomeToRunStatus(execution.outcome),
      finishedAt,
      summary: execution.summary,
      exitCode
    }
  );

  const logTailCheckpoint = emitLogTailCheckpoint(
    run.checkpointManager,
    run.checkpoint,
    run.logSession,
    `Captured combined log tail after ${execution.outcome} for ${request.stepId}.`,
    request.checkpoint?.logTailLines
  );
  await persistCheckpointRecordIfPresent(run, logTailCheckpoint);

  const logSummary = summarizeLocalRunLogSession(run.logSession);
  const elapsedMs = Date.parse(finishedAt) - Date.parse(startedAt);
  // `Math.max(0, NaN)` is NaN, so an unparseable caller-supplied `createdAt` needs an explicit
  // fallback instead of leaking NaN into the result and metrics.
  const durationMs = Number.isFinite(elapsedMs) ? Math.max(0, elapsedMs) : 0;

  return {
    ok: execution.ok,
    outcome: execution.outcome,
    summary: execution.summary,
    needsHuman,
    blocked,
    taskId: request.taskId,
    stepId: request.stepId,
    runId: request.runId,
    attempt: request.attempt,
    workerKind: request.workerKind,
    startedAt,
    finishedAt,
    durationMs,
    metadata: run.metadata,
    state: run.state,
    logPaths: run.logPaths,
    logSession: run.logSession,
    logSummary,
    lifecycleEvents: [...run.logSession.worker.entries],
    checkpoint: run.checkpoint,
    artifacts: mergeArtifacts(run, execution.artifacts),
    suggestedFollowup: execution.suggestedFollowup ?? [],
    metrics: {
      durationMs,
      lifecycleEventCount: logSummary.lifecycleEventCount,
      stdoutChunkCount: logSummary.stdoutChunkCount,
      stderrChunkCount: logSummary.stderrChunkCount,
      checkpointCount: run.checkpoint.records.length
    }
  };
}