baa-conductor


baa-conductor / apps / worker-runner / src
im_wower  ·  2026-03-22

runner.ts

  1import {
  2  persistCheckpointRecord,
  3  type CheckpointRecord
  4} from "@baa-conductor/checkpointing";
  5import {
  6  appendLifecycleEntry,
  7  appendStreamChunk,
  8  appendStreamEntry,
  9  createLocalRunLogSession,
 10  createLocalRunPaths,
 11  createRunMetadata,
 12  createRunStateSnapshot,
 13  initializeLocalRunFiles,
 14  recordLifecycleEvent,
 15  summarizeLocalRunLogSession,
 16  updateRunState,
 17  writeRunState,
 18  type LogLevel,
 19  type RunStatePatch,
 20  type RunStatus,
 21  type StreamLogChannel,
 22  type StructuredData,
 23  type WorkerLifecycleEventInput,
 24  type WorkerLifecycleEventType
 25} from "@baa-conductor/logging";
 26import type {
 27  PreparedStepRun,
 28  StepArtifact,
 29  StepExecutionOutcome,
 30  StepExecutionRequest,
 31  StepExecutionResult,
 32  WorkerExecutionOutcome,
 33  WorkerExecutor
 34} from "./contracts";
 35import {
 36  createPreparedCheckpointState,
 37  createRunCheckpointManager,
 38  emitLogTailCheckpoint,
 39  emitPreparationCheckpoint,
 40  resolveCheckpointConfig
 41} from "./checkpoints";
 42
 43function synchronizeRunState(run: PreparedStepRun, patch: RunStatePatch): void {
 44  run.state = updateRunState(run.state, {
 45    ...patch,
 46    lastEventSeq: run.logSession.nextSeq - 1,
 47    checkpointSeq: run.checkpoint.lastCheckpointSeq
 48  });
 49}
 50
 51async function persistRunState(run: PreparedStepRun): Promise<void> {
 52  await writeRunState(run.logPaths, run.state);
 53}
 54
 55async function synchronizeRunStatePersisted(
 56  run: PreparedStepRun,
 57  patch: RunStatePatch
 58): Promise<void> {
 59  synchronizeRunState(run, patch);
 60  await persistRunState(run);
 61}
 62
 63async function persistCheckpointRecordIfPresent(
 64  run: PreparedStepRun,
 65  record?: CheckpointRecord
 66): Promise<void> {
 67  if (record === undefined) {
 68    return;
 69  }
 70
 71  await persistCheckpointRecord(record);
 72  await synchronizeRunStatePersisted(run, {
 73    updatedAt: record.createdAt
 74  });
 75}
 76
 77async function recordLifecycleEventPersisted(
 78  run: PreparedStepRun,
 79  input: WorkerLifecycleEventInput,
 80  statePatch: RunStatePatch = {}
 81): Promise<void> {
 82  const entry = recordLifecycleEvent(run.logSession, input);
 83
 84  await appendLifecycleEntry(run.logSession, entry);
 85  await synchronizeRunStatePersisted(run, {
 86    ...statePatch,
 87    updatedAt: statePatch.updatedAt ?? entry.createdAt
 88  });
 89}
 90
 91async function appendStreamChunkPersisted(
 92  run: PreparedStepRun,
 93  channel: StreamLogChannel,
 94  text: string
 95): Promise<void> {
 96  const entry = appendStreamChunk(run.logSession, channel, {
 97    text
 98  });
 99
100  await appendStreamEntry(run.logSession, entry);
101  await synchronizeRunStatePersisted(run, {
102    updatedAt: entry.createdAt
103  });
104}
105
/**
 * Lists the artifacts every run exposes by default: the metadata and state
 * files, the three log files, and the checkpoint and artifact directories.
 *
 * All paths are read from `run.logPaths`. The returned order is
 * meta, state, worker log, stdout log, stderr log, checkpoints, artifacts.
 *
 * @param run - Prepared run whose log paths locate the artifacts.
 * @returns A new array of artifact descriptors.
 */
function createDefaultArtifacts(run: PreparedStepRun): StepArtifact[] {
  const {
    metaPath,
    statePath,
    workerLogPath,
    stdoutLogPath,
    stderrLogPath,
    checkpointsDir,
    artifactsDir
  } = run.logPaths;

  const snapshotFiles: StepArtifact[] = [
    {
      name: "meta.json",
      kind: "metadata",
      path: metaPath,
      description: "Immutable run metadata for this step attempt."
    },
    {
      name: "state.json",
      kind: "state",
      path: statePath,
      description: "Mutable run state snapshot for progress tracking and recovery."
    }
  ];

  const logFiles: StepArtifact[] = [
    {
      name: "worker.log",
      kind: "log",
      path: workerLogPath,
      description: "Lifecycle events rendered into the worker log stream."
    },
    {
      name: "stdout.log",
      kind: "log",
      path: stdoutLogPath,
      description: "Raw stdout stream produced by the worker process."
    },
    {
      name: "stderr.log",
      kind: "log",
      path: stderrLogPath,
      description: "Raw stderr stream produced by the worker process."
    }
  ];

  const directories: StepArtifact[] = [
    {
      name: "checkpoints",
      kind: "directory",
      path: checkpointsDir,
      description: "Checkpoint directory layout for summary/log tail records and future git diff snapshots."
    },
    {
      name: "artifacts",
      kind: "directory",
      path: artifactsDir,
      description: "Reserved artifact directory for worker outputs."
    }
  ];

  return [...snapshotFiles, ...logFiles, ...directories];
}
152
/**
 * Combines the run's default artifacts with any extras reported by the
 * executor, keyed by `path`.
 *
 * When two artifacts share a path the later one replaces the earlier one but
 * keeps the earlier one's position, so an extra can override a default
 * descriptor without changing the ordering.
 *
 * @param run - Prepared run supplying the default artifacts.
 * @param extraArtifacts - Additional artifacts; defaults to none.
 * @returns The de-duplicated artifact list in first-seen path order.
 */
function mergeArtifacts(run: PreparedStepRun, extraArtifacts: StepArtifact[] = []): StepArtifact[] {
  const keyedByPath = [...createDefaultArtifacts(run), ...extraArtifacts].map(
    (artifact): [string, StepArtifact] => [artifact.path, artifact]
  );

  return Array.from(new Map<string, StepArtifact>(keyedByPath).values());
}
166
/** Run status recorded for each step execution outcome. */
const RUN_STATUS_BY_OUTCOME: Record<StepExecutionOutcome, RunStatus> = {
  prepared: "prepared",
  completed: "completed",
  failed: "failed",
  blocked: "blocked"
};

/**
 * Translates a step execution outcome into the run status stored in the
 * run state.
 *
 * @param outcome - Outcome reported for the step.
 * @returns The matching run status.
 */
function mapOutcomeToRunStatus(outcome: StepExecutionOutcome): RunStatus {
  return RUN_STATUS_BY_OUTCOME[outcome];
}
179
/** Terminal lifecycle event type emitted for each step execution outcome. */
const TERMINAL_EVENT_BY_OUTCOME: Record<StepExecutionOutcome, WorkerLifecycleEventType> = {
  prepared: "step_prepared",
  completed: "step_completed",
  failed: "step_failed",
  blocked: "step_blocked"
};

/**
 * Picks the lifecycle event type that closes out a step with the given
 * outcome.
 *
 * @param outcome - Outcome reported for the step.
 * @returns The matching `step_*` lifecycle event type.
 */
function mapOutcomeToTerminalEvent(outcome: StepExecutionOutcome): WorkerLifecycleEventType {
  return TERMINAL_EVENT_BY_OUTCOME[outcome];
}
192
/**
 * Chooses the log level for the terminal lifecycle event of an execution.
 *
 * @param execution - Outcome returned by the worker executor.
 * @returns `"error"` for a failed outcome, `"warn"` for a blocked one, and
 *   `"info"` for everything else.
 */
function mapOutcomeToLevel(execution: WorkerExecutionOutcome): LogLevel {
  switch (execution.outcome) {
    case "failed":
      return "error";
    case "blocked":
      return "warn";
    default:
      return "info";
  }
}
204
/**
 * Determines the numeric exit code to report for an execution.
 *
 * An explicit `exitCode` (including `0`) is returned as-is. When the executor
 * did not supply one, the code is derived from `ok`: `0` on success, `1`
 * otherwise. A `null` exit code is treated the same as a missing one, so a
 * non-numeric value never leaks into the run state; this also matches the
 * `??` defaulting used for the other optional outcome fields in this file.
 *
 * @param execution - Outcome returned by the worker executor.
 * @returns The exit code to record for the run.
 */
function resolveExitCode(execution: WorkerExecutionOutcome): number {
  return execution.exitCode ?? (execution.ok ? 0 : 1);
}
212
/**
 * Builds the local run layout for a step attempt and records its first
 * lifecycle events.
 *
 * In order, this:
 * 1. resolves the checkpoint configuration and the run's local paths,
 * 2. creates the checkpoint manager/state and emits the preparation checkpoint,
 * 3. creates the log session, run metadata and initial state snapshot,
 * 4. initializes the run files and persists the preparation checkpoint (if any),
 * 5. records a `run_prepared` event, and
 * 6. when the checkpoint mode is `"capture"`, records a
 *    `checkpoint_slot_reserved` event describing the checkpoint setup
 *    (including the git diff plan when one exists).
 *
 * @param request - Step execution request. `createdAt` is used as the run's
 *   start time when present; otherwise the current time is used.
 * @returns The prepared run, ready to be handed to a worker executor.
 */
export async function prepareStepRun(request: StepExecutionRequest): Promise<PreparedStepRun> {
  const startedAt = request.createdAt ?? new Date().toISOString();
  const { taskId, stepId, runId, runtime } = request;
  const preparedMessage = `Prepared local run layout for ${stepId}.`;

  const checkpointConfig = resolveCheckpointConfig(request.checkpoint);
  const logPaths = createLocalRunPaths({
    repoRoot: runtime.repoRoot,
    taskId,
    runId,
    runsRootDir: runtime.runsRootDir
  });
  const checkpointManager = createRunCheckpointManager(request, logPaths.checkpointsDir);
  const checkpoint = createPreparedCheckpointState(checkpointManager, checkpointConfig);
  const preparationCheckpoint = emitPreparationCheckpoint(checkpointManager, checkpoint, request);
  const logSession = createLocalRunLogSession(logPaths, { taskId, stepId, runId });

  const metadata = createRunMetadata({
    taskId,
    stepId,
    stepName: request.stepName,
    runId,
    workerKind: request.workerKind,
    attempt: request.attempt,
    repoRoot: runtime.repoRoot,
    worktreePath: runtime.worktreePath,
    createdAt: startedAt,
    checkpointMode: checkpoint.mode,
    promptSummary: request.promptSummary,
    command: request.command
  });
  // The initial summary prefers the preparation checkpoint's own summary.
  const state = createRunStateSnapshot({
    attempt: request.attempt,
    startedAt,
    checkpointSeq: checkpoint.lastCheckpointSeq,
    summary: preparationCheckpoint?.summary ?? preparedMessage
  });
  const run: PreparedStepRun = {
    request,
    logPaths,
    logSession,
    metadata,
    state,
    checkpoint,
    checkpointManager
  };

  await initializeLocalRunFiles(logPaths, metadata, state);
  await persistCheckpointRecordIfPresent(run, preparationCheckpoint);

  await recordLifecycleEventPersisted(run, {
    type: "run_prepared",
    level: "info",
    createdAt: startedAt,
    message: preparedMessage,
    data: {
      stepName: request.stepName,
      stepKind: request.stepKind,
      workerKind: request.workerKind,
      attempt: request.attempt,
      timeoutSec: request.timeoutSec,
      runDir: logPaths.runDir,
      worktreePath: runtime.worktreePath
    }
  });

  // Only capture-mode runs announce their checkpoint setup.
  if (checkpoint.mode !== "capture") {
    return run;
  }

  const checkpointEventData: StructuredData = {
    checkpointsDir: logPaths.checkpointsDir,
    nextCheckpointSeq: checkpoint.nextCheckpointSeq,
    supportedTypes: checkpoint.supportedTypes
  };

  if (preparationCheckpoint !== undefined) {
    checkpointEventData.latestCheckpointPath = preparationCheckpoint.file.storagePath;
  }

  const gitDiffPlan = checkpoint.gitDiffPlan;

  if (gitDiffPlan !== undefined) {
    const { cadenceSec, replay } = gitDiffPlan;
    const { statusShort, binaryDiff, diffStat } = gitDiffPlan.commands;

    // Copy the plan field by field; absent optional parts are written as explicit nulls.
    checkpointEventData.gitDiffCommands = {
      checkpointType: gitDiffPlan.checkpointType,
      cadenceSec: {
        min: cadenceSec.min,
        max: cadenceSec.max
      },
      commands: {
        statusShort: {
          purpose: statusShort.purpose,
          args: statusShort.args
        },
        binaryDiff: {
          purpose: binaryDiff.purpose,
          args: binaryDiff.args
        },
        diffStat:
          diffStat === undefined
            ? null
            : {
                purpose: diffStat.purpose,
                args: diffStat.args
              }
      },
      replay: {
        strategy: replay.strategy,
        args: replay.args,
        baseRef: replay.baseRef ?? null
      }
    };
  }

  await recordLifecycleEventPersisted(run, {
    type: "checkpoint_slot_reserved",
    level: "info",
    message: `Initialized checkpoint manager at sequence ${checkpoint.lastCheckpointSeq}.`,
    data: checkpointEventData
  });

  return run;
}
330
/**
 * Creates a stand-in executor that performs no real work.
 *
 * Its `execute` always resolves with a successful `"prepared"` outcome
 * (exit code 0, not blocked, no human needed) whose summary names the step
 * and the worker kind whose execution is deferred.
 *
 * @returns A `WorkerExecutor` suitable as a default when no real executor is supplied.
 */
export function createPlaceholderWorkerExecutor(): WorkerExecutor {
  const execute = async (run: PreparedStepRun): Promise<WorkerExecutionOutcome> => {
    const { stepId, workerKind } = run.request;
    const summary = `Prepared local log streams and result envelope for ${stepId}; real ${workerKind} execution remains deferred.`;

    return {
      ok: true,
      outcome: "prepared",
      summary,
      needsHuman: false,
      blocked: false,
      exitCode: 0
    };
  };

  return { execute };
}
345
/** Renders a thrown value as text for inclusion in a failure summary. */
function describeExecutorFailure(error: unknown): string {
  return error instanceof Error ? error.message : String(error);
}

/**
 * Runs one step attempt end to end: prepares the local run, invokes the
 * executor, mirrors its output into the run logs, records the terminal
 * lifecycle events, captures a log-tail checkpoint, and returns the result
 * envelope.
 *
 * If the executor throws, a `step_failed` lifecycle event is recorded and the
 * run state is moved to `"failed"` (best effort) before the original error is
 * rethrown, so the persisted state is not left at `"running"`.
 *
 * @param request - Step execution request. `createdAt` is used as the start
 *   time when present; otherwise the current time is used.
 * @param executor - Executor that performs the step; defaults to the
 *   placeholder executor, which only reports a `"prepared"` outcome.
 * @returns The step execution result, including state, logs summary,
 *   checkpoint info, merged artifacts and metrics.
 * @throws Whatever the executor, run preparation, or log persistence throws.
 */
export async function runStep(
  request: StepExecutionRequest,
  executor: WorkerExecutor = createPlaceholderWorkerExecutor()
): Promise<StepExecutionResult> {
  const startedAt = request.createdAt ?? new Date().toISOString();
  // Pin createdAt so preparation and the final result agree on the start time.
  const run = await prepareStepRun({
    ...request,
    createdAt: startedAt
  });
  const executionStartedAt = new Date().toISOString();

  await recordLifecycleEventPersisted(
    run,
    {
      type: "worker_started",
      level: "info",
      createdAt: executionStartedAt,
      message: `Worker runner opened execution scope for ${request.workerKind} step ${request.stepId}.`,
      data: {
        stepKind: request.stepKind,
        timeoutSec: request.timeoutSec,
        worktreePath: request.runtime.worktreePath
      }
    },
    {
      status: "running"
    }
  );

  let execution: WorkerExecutionOutcome;

  try {
    execution = await executor.execute(run);
  } catch (error) {
    const failedAt = new Date().toISOString();
    const failureSummary = `Worker executor threw before reporting an outcome: ${describeExecutorFailure(error)}`;

    try {
      await recordLifecycleEventPersisted(
        run,
        {
          type: "step_failed",
          level: "error",
          createdAt: failedAt,
          message: failureSummary,
          data: {
            ok: false,
            blocked: false,
            needsHuman: false,
            exitCode: 1
          }
        },
        {
          status: "failed",
          finishedAt: failedAt,
          summary: failureSummary,
          exitCode: 1
        }
      );
    } catch {
      // Best-effort bookkeeping only: the executor's error is the one the
      // caller needs to see, so a secondary logging failure must not mask it.
    }

    throw error;
  }

  // `??` binds looser than `===`: an explicit `blocked` flag wins, otherwise
  // it is inferred from the outcome.
  const blocked = execution.blocked ?? execution.outcome === "blocked";
  const needsHuman = execution.needsHuman ?? false;
  const exitCode = resolveExitCode(execution);

  // Mirror captured output into the run logs: all stdout lines first, then stderr.
  for (const line of execution.stdout ?? []) {
    await appendStreamChunkPersisted(run, "stdout", line);
  }

  for (const line of execution.stderr ?? []) {
    await appendStreamChunkPersisted(run, "stderr", line);
  }

  if (execution.outcome === "prepared") {
    await recordLifecycleEventPersisted(run, {
      type: "worker_execution_deferred",
      level: "info",
      message: `Real ${request.workerKind} execution is intentionally deferred while checkpoint capture remains active.`,
      data: {
        supportedTypes: run.checkpoint.supportedTypes,
        nextCheckpointSeq: run.checkpoint.nextCheckpointSeq
      }
    });
  }

  await recordLifecycleEventPersisted(run, {
    type: "worker_exited",
    level: exitCode === 0 ? "info" : "error",
    message: `Worker runner closed execution scope with outcome ${execution.outcome}.`,
    data: {
      exitCode,
      outcome: execution.outcome
    }
  });

  const finishedAt = new Date().toISOString();

  // The terminal event also moves the run state to its final status.
  await recordLifecycleEventPersisted(
    run,
    {
      type: mapOutcomeToTerminalEvent(execution.outcome),
      level: mapOutcomeToLevel(execution),
      createdAt: finishedAt,
      message: execution.summary,
      data: {
        ok: execution.ok,
        blocked,
        needsHuman,
        exitCode
      }
    },
    {
      status: mapOutcomeToRunStatus(execution.outcome),
      finishedAt,
      summary: execution.summary,
      exitCode
    }
  );

  const logTailCheckpoint = emitLogTailCheckpoint(
    run.checkpointManager,
    run.checkpoint,
    run.logSession,
    `Captured combined log tail after ${execution.outcome} for ${request.stepId}.`,
    request.checkpoint?.logTailLines
  );
  await persistCheckpointRecordIfPresent(run, logTailCheckpoint);

  const logSummary = summarizeLocalRunLogSession(run.logSession);
  // Date.parse yields NaN for an unparseable createdAt, and Math.max(0, NaN)
  // is NaN; report 0 rather than a non-finite duration.
  const elapsedMs = Date.parse(finishedAt) - Date.parse(startedAt);
  const durationMs = Number.isFinite(elapsedMs) ? Math.max(0, elapsedMs) : 0;

  return {
    ok: execution.ok,
    outcome: execution.outcome,
    summary: execution.summary,
    needsHuman,
    blocked,
    taskId: request.taskId,
    stepId: request.stepId,
    runId: request.runId,
    attempt: request.attempt,
    workerKind: request.workerKind,
    startedAt,
    finishedAt,
    durationMs,
    metadata: run.metadata,
    state: run.state,
    logPaths: run.logPaths,
    logSession: run.logSession,
    logSummary,
    // Copy so callers cannot mutate the session's own entry list.
    lifecycleEvents: [...run.logSession.worker.entries],
    checkpoint: run.checkpoint,
    artifacts: mergeArtifacts(run, execution.artifacts),
    suggestedFollowup: execution.suggestedFollowup ?? [],
    metrics: {
      durationMs,
      lifecycleEventCount: logSummary.lifecycleEventCount,
      stdoutChunkCount: logSummary.stdoutChunkCount,
      stderrChunkCount: logSummary.stderrChunkCount,
      checkpointCount: run.checkpoint.records.length
    }
  };
}