baa-conductor


baa-conductor / apps / worker-runner / src
im_wower  ·  2026-03-21

checkpoints.ts

  1import {
  2  createCheckpointManager,
  3  renderCheckpointFile,
  4  type CheckpointManager,
  5  type CheckpointRecord
  6} from "@baa-conductor/checkpointing";
  7import type {
  8  LocalRunLogSession,
  9  StreamChunkLogEntry,
 10  WorkerLifecycleLogEntry
 11} from "@baa-conductor/logging";
 12import type { StepCheckpointConfig, StepCheckpointState, StepExecutionRequest } from "./contracts";
 13
 14const DEFAULT_LOG_TAIL_LINES = 20;
 15
 16type OrderedLogEntry =
 17  | {
 18      seq: number;
 19      rendered: string;
 20    }
 21  | {
 22      seq: number;
 23      rendered: string;
 24    };
 25
 26function mapWorkerEntry(entry: WorkerLifecycleLogEntry): OrderedLogEntry {
 27  return {
 28    seq: entry.seq,
 29    rendered: `[worker] ${entry.renderedLine}`
 30  };
 31}
 32
 33function mapStreamEntry(channel: "stdout" | "stderr", entry: StreamChunkLogEntry): OrderedLogEntry {
 34  return {
 35    seq: entry.seq,
 36    rendered: `[${channel}] ${entry.text}`
 37  };
 38}
 39
 40function createCheckpointState(manager: CheckpointManager, mode: StepCheckpointConfig["mode"]): StepCheckpointState {
 41  const { state } = manager;
 42  const latest = state.records.at(-1);
 43
 44  return {
 45    ...state,
 46    mode,
 47    lastCheckpointSeq: state.lastSeq,
 48    nextCheckpointSeq: state.nextSeq,
 49    latest
 50  };
 51}
 52
 53function updateCheckpointState(runState: StepCheckpointState, manager: CheckpointManager): StepCheckpointState {
 54  const nextState = createCheckpointState(manager, runState.mode);
 55
 56  Object.assign(runState, nextState);
 57
 58  return runState;
 59}
 60
 61export function resolveCheckpointConfig(config?: StepCheckpointConfig): StepCheckpointConfig {
 62  return config ?? {
 63    mode: "capture"
 64  };
 65}
 66
 67export function createRunCheckpointManager(
 68  request: StepExecutionRequest,
 69  checkpointsDir: string
 70): CheckpointManager {
 71  const checkpointConfig = resolveCheckpointConfig(request.checkpoint);
 72
 73  return createCheckpointManager({
 74    taskId: request.taskId,
 75    stepId: request.stepId,
 76    runId: request.runId,
 77    directory: checkpointsDir,
 78    resumeFromSeq: checkpointConfig.resumeFromSeq,
 79    gitDiffBaseRef: checkpointConfig.gitDiffBaseRef,
 80    includeGitDiffStat: checkpointConfig.includeGitDiffStat,
 81    supportedTypes: checkpointConfig.mode === "disabled" ? [] : ["summary", "git_diff", "log_tail"],
 82    note:
 83      checkpointConfig.mode === "disabled"
 84        ? "Checkpointing is disabled for this step request."
 85        : "Checkpoint manager tracks summary/log_tail payloads and reserves git diff snapshot commands."
 86  });
 87}
 88
 89export function createPreparedCheckpointState(
 90  manager: CheckpointManager,
 91  config: StepCheckpointConfig
 92): StepCheckpointState {
 93  return createCheckpointState(manager, config.mode);
 94}
 95
 96export function emitPreparationCheckpoint(
 97  manager: CheckpointManager,
 98  checkpoint: StepCheckpointState,
 99  request: StepExecutionRequest
100): CheckpointRecord | undefined {
101  if (checkpoint.mode === "disabled") {
102    return undefined;
103  }
104
105  const summary = request.checkpoint?.summaryHint ?? `Prepared local run layout for ${request.stepId}.`;
106  const record = manager.createSummaryCheckpoint({
107    summary,
108    detail: `Worker runner reserved ${request.workerKind} checkpoint capture for ${request.stepName}.`
109  });
110
111  updateCheckpointState(checkpoint, manager);
112
113  return record;
114}
115
/**
 * Records a log-tail checkpoint built from the most recent lines of a run's
 * worker, stdout and stderr logs.
 *
 * Entries from the three streams are tagged, merged and sorted by ascending
 * `seq`; the last `lineLimit` of them become the checkpoint's `lines`.
 * Nothing is recorded when `checkpoint.mode` is `"disabled"`. Otherwise
 * `checkpoint` is refreshed in place from the manager after the record is
 * created.
 *
 * @param manager - Checkpoint manager used to create the log-tail record.
 * @param checkpoint - Run-level checkpoint state; updated in place.
 * @param session - Log session whose `worker`, `stdout` and `stderr` entries
 *   are merged.
 * @param summary - Summary text passed through to the record.
 * @param lineLimit - Maximum number of trailing lines to keep. Fractional
 *   values are floored; a limit below 1 (or `NaN`) keeps no lines.
 * @returns The created record, or `undefined` when checkpointing is disabled.
 */
export function emitLogTailCheckpoint(
  manager: CheckpointManager,
  checkpoint: StepCheckpointState,
  session: LocalRunLogSession,
  summary: string,
  lineLimit: number = DEFAULT_LOG_TAIL_LINES
): CheckpointRecord | undefined {
  if (checkpoint.mode === "disabled") {
    return undefined;
  }

  const orderedEntries = [
    ...session.worker.entries.map(mapWorkerEntry),
    ...session.stdout.entries.map((entry) => mapStreamEntry("stdout", entry)),
    ...session.stderr.entries.map((entry) => mapStreamEntry("stderr", entry))
  ].sort((left, right) => left.seq - right.seq);

  // `slice(-0)` equals `slice(0)` and would return every entry, and a negative
  // limit would drop entries from the head instead of selecting a tail, so the
  // limit is normalised before slicing.
  const tailSize = Math.floor(lineLimit);
  const tail = tailSize >= 1 ? orderedEntries.slice(-tailSize) : [];
  const lines = tail.map((entry) => entry.rendered);

  const record = manager.createLogTailCheckpoint({
    summary,
    source: "combined",
    lines,
    // True whenever at least one merged entry did not make it into `lines`.
    truncated: orderedEntries.length > lines.length
  });

  updateCheckpointState(checkpoint, manager);

  return record;
}
145
/**
 * Produces a short `type:storagePath` label for a checkpoint record.
 *
 * @param record - Checkpoint record to describe.
 * @returns The record's `type` and the `storagePath` reported by
 *   `renderCheckpointFile`, joined by a colon.
 */
export function describeRenderedCheckpoint(record: CheckpointRecord): string {
  const { storagePath } = renderCheckpointFile(record);

  return `${record.type}:${storagePath}`;
}