im_wower
·
2026-03-21
checkpoints.ts
1import {
2 createCheckpointManager,
3 renderCheckpointFile,
4 type CheckpointManager,
5 type CheckpointRecord
6} from "@baa-conductor/checkpointing";
7import type {
8 LocalRunLogSession,
9 StreamChunkLogEntry,
10 WorkerLifecycleLogEntry
11} from "@baa-conductor/logging";
12import type { StepCheckpointConfig, StepCheckpointState, StepExecutionRequest } from "./contracts";
13
/** Number of combined log lines kept by a log-tail checkpoint when the caller gives no limit. */
const DEFAULT_LOG_TAIL_LINES = 20;

/**
 * A single log line reduced to what is needed for cross-channel ordering.
 *
 * The previous declaration was a union of two structurally identical members,
 * which is equivalent to this single object type.
 */
type OrderedLogEntry = {
  /** Sequence number used to sort entries from different channels together. */
  seq: number;
  /** Display text for the line, including its channel prefix. */
  rendered: string;
};
25
/**
 * Converts a worker lifecycle log entry into an orderable log line.
 *
 * @param entry - Worker lifecycle entry; its `seq` and `renderedLine` are read.
 * @returns The entry's sequence number paired with its line prefixed by `[worker] `.
 */
function mapWorkerEntry(entry: WorkerLifecycleLogEntry): OrderedLogEntry {
  const { seq, renderedLine } = entry;
  const rendered = `[worker] ${renderedLine}`;

  return { seq, rendered };
}
32
/**
 * Converts a stdout/stderr chunk entry into an orderable log line.
 *
 * @param channel - Which stream the chunk came from; used as the line prefix.
 * @param entry - Stream chunk entry; its `seq` and `text` are read.
 * @returns The entry's sequence number paired with its text prefixed by `[<channel>] `.
 */
function mapStreamEntry(channel: "stdout" | "stderr", entry: StreamChunkLogEntry): OrderedLogEntry {
  const { seq, text } = entry;
  const rendered = `[${channel}] ${text}`;

  return { seq, rendered };
}
39
/**
 * Builds a step-level checkpoint snapshot from the manager's current state.
 *
 * Every field of `manager.state` is copied, then the requested mode, the
 * sequence counters (under their step-facing names) and the last record in
 * `state.records` are layered on top.
 *
 * @param manager - Checkpoint manager whose `state` is read.
 * @param mode - Checkpoint mode to stamp onto the snapshot.
 * @returns A new state object; `latest` is `undefined` when `records` is empty.
 */
function createCheckpointState(manager: CheckpointManager, mode: StepCheckpointConfig["mode"]): StepCheckpointState {
  const managerState = manager.state;
  const mostRecent = managerState.records.at(-1);

  const snapshot = {
    ...managerState,
    mode,
    lastCheckpointSeq: managerState.lastSeq,
    nextCheckpointSeq: managerState.nextSeq,
    latest: mostRecent
  };

  return snapshot;
}
52
/**
 * Refreshes an existing step checkpoint state from the manager, in place.
 *
 * The state's current `mode` is preserved; every other field is overwritten
 * with a fresh snapshot of the manager's state.
 *
 * @param runState - State object to mutate.
 * @param manager - Checkpoint manager supplying the new values.
 * @returns The same `runState` object that was passed in.
 */
function updateCheckpointState(runState: StepCheckpointState, manager: CheckpointManager): StepCheckpointState {
  // Mutation is intentional: the caller's reference must observe the new values.
  // Object.assign returns its target, i.e. `runState` itself.
  return Object.assign(runState, createCheckpointState(manager, runState.mode));
}
60
/**
 * Returns the checkpoint configuration to use for a step.
 *
 * @param config - Caller-supplied configuration, if any.
 * @returns `config` itself when provided; otherwise a new `{ mode: "capture" }` object.
 */
export function resolveCheckpointConfig(config?: StepCheckpointConfig): StepCheckpointConfig {
  if (config == null) {
    return { mode: "capture" };
  }

  return config;
}
66
/**
 * Creates the checkpoint manager for a single step run.
 *
 * The request's checkpoint configuration is resolved first (falling back to
 * the default when absent), then forwarded to `createCheckpointManager`
 * together with the run's identifiers and the target directory.
 *
 * @param request - Step execution request supplying ids and optional checkpoint config.
 * @param checkpointsDir - Directory handed to the manager as `directory`.
 * @returns The manager produced by `createCheckpointManager`.
 */
export function createRunCheckpointManager(
  request: StepExecutionRequest,
  checkpointsDir: string
): CheckpointManager {
  const { mode, resumeFromSeq, gitDiffBaseRef, includeGitDiffStat } = resolveCheckpointConfig(request.checkpoint);
  const { taskId, stepId, runId } = request;
  const isDisabled = mode === "disabled";

  return createCheckpointManager({
    taskId,
    stepId,
    runId,
    directory: checkpointsDir,
    resumeFromSeq,
    gitDiffBaseRef,
    includeGitDiffStat,
    // A disabled step advertises no checkpoint types at all.
    supportedTypes: isDisabled ? [] : ["summary", "git_diff", "log_tail"],
    note: isDisabled
      ? "Checkpointing is disabled for this step request."
      : "Checkpoint manager tracks summary/log_tail payloads and reserves git diff snapshot commands."
  });
}
88
/**
 * Produces the initial step checkpoint state for a freshly prepared run.
 *
 * @param manager - Checkpoint manager whose current state is snapshotted.
 * @param config - Checkpoint configuration; only its `mode` is used.
 * @returns A new state object built by `createCheckpointState`.
 */
export function createPreparedCheckpointState(
  manager: CheckpointManager,
  config: StepCheckpointConfig
): StepCheckpointState {
  const { mode } = config;

  return createCheckpointState(manager, mode);
}
95
/**
 * Records the summary checkpoint that marks a run as prepared.
 *
 * The summary text is the request's `checkpoint.summaryHint` when present,
 * otherwise a default sentence naming the step id. After the record is
 * created, `checkpoint` is refreshed in place from the manager.
 *
 * @param manager - Checkpoint manager used to create the summary record.
 * @param checkpoint - Step checkpoint state; read for `mode` and then mutated.
 * @param request - Step execution request supplying the hint and step/worker names.
 * @returns The created record, or `undefined` when `checkpoint.mode` is `"disabled"`.
 */
export function emitPreparationCheckpoint(
  manager: CheckpointManager,
  checkpoint: StepCheckpointState,
  request: StepExecutionRequest
): CheckpointRecord | undefined {
  const isDisabled = checkpoint.mode === "disabled";

  if (!isDisabled) {
    const created = manager.createSummaryCheckpoint({
      summary: request.checkpoint?.summaryHint ?? `Prepared local run layout for ${request.stepId}.`,
      detail: `Worker runner reserved ${request.workerKind} checkpoint capture for ${request.stepName}.`
    });

    // Keep the caller's state object in sync with the manager's new sequence counters.
    updateCheckpointState(checkpoint, manager);

    return created;
  }

  return undefined;
}
115
/**
 * Records a log-tail checkpoint containing the most recent combined log lines.
 *
 * Worker, stdout and stderr entries from `session` are merged and ordered by
 * ascending `seq`; the last `lineLimit` rendered lines are stored with source
 * `"combined"`. After the record is created, `checkpoint` is refreshed in
 * place from the manager.
 *
 * @param manager - Checkpoint manager used to create the log-tail record.
 * @param checkpoint - Step checkpoint state; read for `mode` and then mutated.
 * @param session - Log session whose `worker`, `stdout` and `stderr` entries are read.
 * @param summary - Summary text stored on the record.
 * @param lineLimit - Maximum number of lines to keep. Fractions are truncated,
 *   zero or negative values keep no lines, and `NaN` falls back to the default.
 * @returns The created record, or `undefined` when `checkpoint.mode` is `"disabled"`.
 */
export function emitLogTailCheckpoint(
  manager: CheckpointManager,
  checkpoint: StepCheckpointState,
  session: LocalRunLogSession,
  summary: string,
  lineLimit: number = DEFAULT_LOG_TAIL_LINES
): CheckpointRecord | undefined {
  if (checkpoint.mode === "disabled") {
    return undefined;
  }

  // The sort is stable, so entries sharing a seq keep the worker, stdout, stderr order used here.
  const orderedEntries = [
    ...session.worker.entries.map(mapWorkerEntry),
    ...session.stdout.entries.map((entry) => mapStreamEntry("stdout", entry)),
    ...session.stderr.entries.map((entry) => mapStreamEntry("stderr", entry))
  ].sort((left, right) => left.seq - right.seq);

  // Normalize before slicing: `slice(-0)` is `slice(0)` and would return every
  // entry, and a negative limit would become a positive start index and drop
  // the oldest entries instead of selecting a tail.
  const requestedLimit = Number.isNaN(lineLimit) ? DEFAULT_LOG_TAIL_LINES : Math.trunc(lineLimit);
  const effectiveLimit = Math.max(0, requestedLimit);
  const tailEntries = effectiveLimit === 0 ? [] : orderedEntries.slice(-effectiveLimit);

  const lines = tailEntries.map((entry) => entry.rendered);
  const record = manager.createLogTailCheckpoint({
    summary,
    source: "combined",
    lines,
    truncated: orderedEntries.length > lines.length
  });

  // Keep the caller's state object in sync with the manager's new sequence counters.
  updateCheckpointState(checkpoint, manager);

  return record;
}
145
/**
 * Produces a short `type:storagePath` label for a checkpoint record.
 *
 * @param record - Checkpoint record to render and describe.
 * @returns The record's `type`, a colon, and the `storagePath` reported by `renderCheckpointFile`.
 */
export function describeRenderedCheckpoint(record: CheckpointRecord): string {
  const file = renderCheckpointFile(record);
  const label = `${record.type}:${file.storagePath}`;

  return label;
}