baa-conductor


baa-conductor / apps / codexd / src
im_wower  ·  2026-03-22

state-store.ts

  1import { randomUUID } from "node:crypto";
  2import { appendFile, mkdir, readFile, writeFile } from "node:fs/promises";
  3
  4import type {
  5  CodexdDaemonIdentity,
  6  CodexdDaemonState,
  7  CodexdEventLevel,
  8  CodexdManagedChildState,
  9  CodexdRecentEvent,
 10  CodexdRecentEventCacheState,
 11  CodexdResolvedConfig,
 12  CodexdRunRecord,
 13  CodexdRunRegistryState,
 14  CodexdSessionRecord,
 15  CodexdSessionRegistryState,
 16  CodexdStatusSnapshot
 17} from "./contracts.js";
 18
 19export interface CodexdStateStoreOptions {
 20  now?: () => string;
 21  onEvent?: (event: CodexdRecentEvent) => void;
 22  processId?: () => number | null;
 23  uuid?: () => string;
 24}
 25
 26export interface CodexdEventInput {
 27  detail?: Record<string, unknown> | null;
 28  level: CodexdEventLevel;
 29  message: string;
 30  type: string;
 31}
 32
 33export class CodexdStateStore {
 34  private daemonState: CodexdDaemonState | null = null;
 35  private identity: CodexdDaemonIdentity | null = null;
 36  private initialized = false;
 37  private mutationQueue: Promise<void> = Promise.resolve();
 38  private nextEventSeq = 1;
 39  private recentEvents: CodexdRecentEventCacheState | null = null;
 40  private runRegistry: CodexdRunRegistryState | null = null;
 41  private sessionRegistry: CodexdSessionRegistryState | null = null;
 42
 43  private readonly now: () => string;
 44  private readonly onEvent: (event: CodexdRecentEvent) => void;
 45  private readonly processId: () => number | null;
 46  private readonly uuid: () => string;
 47
 48  constructor(
 49    private readonly config: CodexdResolvedConfig,
 50    options: CodexdStateStoreOptions = {}
 51  ) {
 52    this.now = options.now ?? defaultNow;
 53    this.onEvent = options.onEvent ?? (() => {});
 54    this.processId = options.processId ?? defaultProcessId;
 55    this.uuid = options.uuid ?? randomUUID;
 56  }
 57
 58  async initialize(): Promise<CodexdStatusSnapshot> {
 59    if (this.initialized) {
 60      return this.getSnapshot();
 61    }
 62
 63    await mkdir(this.config.paths.logsDir, { recursive: true });
 64    await mkdir(this.config.paths.stateDir, { recursive: true });
 65
 66    const identity = await readJsonOrDefault<CodexdDaemonIdentity | null>(
 67      this.config.paths.identityPath,
 68      null
 69    );
 70    const daemonState = await readJsonOrDefault<CodexdDaemonState | null>(
 71      this.config.paths.daemonStatePath,
 72      null
 73    );
 74    const sessionRegistry = await readJsonOrDefault<CodexdSessionRegistryState | null>(
 75      this.config.paths.sessionRegistryPath,
 76      null
 77    );
 78    const runRegistry = await readJsonOrDefault<CodexdRunRegistryState | null>(
 79      this.config.paths.runRegistryPath,
 80      null
 81    );
 82    const recentEvents = await readJsonOrDefault<CodexdRecentEventCacheState | null>(
 83      this.config.paths.recentEventsPath,
 84      null
 85    );
 86
 87    this.identity = identity ?? createDaemonIdentity(this.config, this.uuid(), this.now());
 88    this.daemonState = normalizeDaemonState(daemonState, this.config, this.now());
 89    this.sessionRegistry = normalizeSessionRegistry(sessionRegistry);
 90    this.runRegistry = normalizeRunRegistry(runRegistry);
 91    this.recentEvents = normalizeRecentEvents(recentEvents, this.config.eventCacheSize);
 92    this.nextEventSeq = getNextEventSeq(this.recentEvents.events);
 93    this.initialized = true;
 94
 95    await this.persistIdentity();
 96    await this.persistDaemonState();
 97    await this.persistSessionRegistry();
 98    await this.persistRunRegistry();
 99    await this.persistRecentEvents();
100
101    return this.getSnapshot();
102  }
103
104  getSnapshot(): CodexdStatusSnapshot {
105    this.assertInitialized();
106
107    return {
108      config: cloneJson(this.config),
109      identity: cloneJson(this.identity!),
110      daemon: cloneJson(this.daemonState!),
111      sessionRegistry: cloneJson(this.sessionRegistry!),
112      runRegistry: cloneJson(this.runRegistry!),
113      recentEvents: cloneJson(this.recentEvents!)
114    };
115  }
116
117  getChildState(): CodexdManagedChildState {
118    this.assertInitialized();
119    return cloneJson(this.daemonState!.child);
120  }
121
122  getRun(runId: string): CodexdRunRecord | null {
123    this.assertInitialized();
124    return (
125      cloneJson(
126        this.runRegistry!.runs.find((entry) => entry.runId === runId) ?? null
127      )
128    );
129  }
130
131  getSession(sessionId: string): CodexdSessionRecord | null {
132    this.assertInitialized();
133    return (
134      cloneJson(
135        this.sessionRegistry!.sessions.find((entry) => entry.sessionId === sessionId) ?? null
136      )
137    );
138  }
139
140  listRuns(): CodexdRunRecord[] {
141    this.assertInitialized();
142    return cloneJson(this.runRegistry!.runs);
143  }
144
145  listSessions(): CodexdSessionRecord[] {
146    this.assertInitialized();
147    return cloneJson(this.sessionRegistry!.sessions);
148  }
149
150  async markDaemonStarted(): Promise<CodexdDaemonState> {
151    return await this.enqueueMutation(async () => {
152      this.assertInitialized();
153      const now = this.now();
154
155      this.daemonState = {
156        ...this.daemonState!,
157        started: true,
158        startedAt: now,
159        stoppedAt: null,
160        updatedAt: now,
161        pid: this.processId()
162      };
163
164      await this.persistDaemonState();
165      return cloneJson(this.daemonState!);
166    });
167  }
168
169  async markDaemonStopped(): Promise<CodexdDaemonState> {
170    return await this.enqueueMutation(async () => {
171      this.assertInitialized();
172      const now = this.now();
173
174      this.daemonState = {
175        ...this.daemonState!,
176        started: false,
177        stoppedAt: now,
178        updatedAt: now,
179        pid: null
180      };
181
182      await this.persistDaemonState();
183      return cloneJson(this.daemonState!);
184    });
185  }
186
187  async updateChildState(patch: Partial<CodexdManagedChildState>): Promise<CodexdManagedChildState> {
188    return await this.enqueueMutation(async () => {
189      this.assertInitialized();
190
191      this.daemonState = {
192        ...this.daemonState!,
193        updatedAt: this.now(),
194        child: {
195          ...this.daemonState!.child,
196          ...cloneJson(patch)
197        }
198      };
199
200      await this.persistDaemonState();
201      return cloneJson(this.daemonState!.child);
202    });
203  }
204
205  async upsertSession(record: CodexdSessionRecord): Promise<CodexdSessionRegistryState> {
206    return await this.enqueueMutation(async () => {
207      this.assertInitialized();
208      const sessions = [...this.sessionRegistry!.sessions];
209      const index = sessions.findIndex((entry) => entry.sessionId === record.sessionId);
210
211      if (index >= 0) {
212        sessions[index] = cloneJson(record);
213      } else {
214        sessions.push(cloneJson(record));
215      }
216
217      this.sessionRegistry = {
218        updatedAt: this.now(),
219        sessions
220      };
221
222      await this.persistSessionRegistry();
223      return cloneJson(this.sessionRegistry!);
224    });
225  }
226
227  async closeSession(sessionId: string): Promise<CodexdSessionRecord | null> {
228    return await this.enqueueMutation(async () => {
229      this.assertInitialized();
230      const sessions = [...this.sessionRegistry!.sessions];
231      const index = sessions.findIndex((entry) => entry.sessionId === sessionId);
232
233      if (index < 0) {
234        return null;
235      }
236
237      const existing = sessions[index];
238
239      if (existing == null) {
240        return null;
241      }
242
243      const updated: CodexdSessionRecord = {
244        ...existing,
245        status: "closed",
246        currentTurnId: null,
247        updatedAt: this.now()
248      };
249      sessions[index] = updated;
250      this.sessionRegistry = {
251        updatedAt: updated.updatedAt,
252        sessions
253      };
254
255      await this.persistSessionRegistry();
256      return cloneJson(updated);
257    });
258  }
259
260  async upsertRun(record: CodexdRunRecord): Promise<CodexdRunRegistryState> {
261    return await this.enqueueMutation(async () => {
262      this.assertInitialized();
263      const runs = [...this.runRegistry!.runs];
264      const index = runs.findIndex((entry) => entry.runId === record.runId);
265
266      if (index >= 0) {
267        runs[index] = cloneJson(record);
268      } else {
269        runs.push(cloneJson(record));
270      }
271
272      this.runRegistry = {
273        updatedAt: this.now(),
274        runs
275      };
276
277      await this.persistRunRegistry();
278      return cloneJson(this.runRegistry!);
279    });
280  }
281
282  async recordEvent(input: CodexdEventInput): Promise<CodexdRecentEvent> {
283    const entry = await this.enqueueMutation(async () => {
284      this.assertInitialized();
285      const nextEntry: CodexdRecentEvent = {
286        seq: this.nextEventSeq,
287        createdAt: this.now(),
288        level: input.level,
289        type: input.type,
290        message: input.message,
291        detail: input.detail ?? null
292      };
293
294      this.nextEventSeq += 1;
295      await appendFile(
296        this.config.paths.structuredEventLogPath,
297        `${JSON.stringify(nextEntry)}\n`,
298        "utf8"
299      );
300
301      this.recentEvents = {
302        maxEntries: this.config.eventCacheSize,
303        updatedAt: nextEntry.createdAt,
304        events: [...this.recentEvents!.events, nextEntry].slice(-this.config.eventCacheSize)
305      };
306
307      this.daemonState = {
308        ...this.daemonState!,
309        updatedAt: nextEntry.createdAt
310      };
311
312      await this.persistRecentEvents();
313      await this.persistDaemonState();
314
315      return cloneJson(nextEntry);
316    });
317
318    this.onEvent(entry);
319    return entry;
320  }
321
322  async appendChildOutput(stream: "stderr" | "stdout", text: string): Promise<void> {
323    await this.enqueueMutation(async () => {
324      this.assertInitialized();
325      const path =
326        stream === "stdout" ? this.config.paths.stdoutLogPath : this.config.paths.stderrLogPath;
327
328      await appendFile(path, text, "utf8");
329    });
330  }
331
332  private assertInitialized(): void {
333    if (
334      !this.initialized
335      || this.identity == null
336      || this.daemonState == null
337      || this.sessionRegistry == null
338      || this.runRegistry == null
339      || this.recentEvents == null
340    ) {
341      throw new Error("CodexdStateStore is not initialized.");
342    }
343  }
344
345  private async persistDaemonState(): Promise<void> {
346    this.assertInitialized();
347    await writeJsonFile(this.config.paths.daemonStatePath, this.daemonState!);
348  }
349
350  private async persistIdentity(): Promise<void> {
351    this.assertInitialized();
352    await writeJsonFile(this.config.paths.identityPath, this.identity!);
353  }
354
355  private async persistRecentEvents(): Promise<void> {
356    this.assertInitialized();
357    await writeJsonFile(this.config.paths.recentEventsPath, this.recentEvents!);
358  }
359
360  private async persistRunRegistry(): Promise<void> {
361    this.assertInitialized();
362    await writeJsonFile(this.config.paths.runRegistryPath, this.runRegistry!);
363  }
364
365  private async persistSessionRegistry(): Promise<void> {
366    this.assertInitialized();
367    await writeJsonFile(this.config.paths.sessionRegistryPath, this.sessionRegistry!);
368  }
369
370  private async enqueueMutation<T>(action: () => Promise<T>): Promise<T> {
371    const task = this.mutationQueue.then(action);
372    this.mutationQueue = task.then(
373      () => undefined,
374      () => undefined
375    );
376    return await task;
377  }
378}
379
/**
 * Builds the identity record for a daemon that has none persisted yet.
 *
 * @param config Source of the node id, repository root and version.
 * @param daemonId Identifier to assign to the daemon.
 * @param createdAt Timestamp string stored as the creation time.
 * @returns A new identity object.
 */
function createDaemonIdentity(
  config: CodexdResolvedConfig,
  daemonId: string,
  createdAt: string
): CodexdDaemonIdentity {
  const { nodeId, paths, version } = config;
  const identity: CodexdDaemonIdentity = {
    daemonId,
    nodeId,
    repoRoot: paths.repoRoot,
    createdAt,
    version
  };

  return identity;
}
393
/**
 * Builds the state of a managed child that has never been started: the launch
 * settings come from `config.server`, the status is "idle" and every runtime
 * field is `null`.
 *
 * @param config Resolved configuration whose `server` section describes the child.
 * @returns A fresh child state; `args` is a copy of the configured argument list.
 */
function createInitialChildState(config: CodexdResolvedConfig): CodexdManagedChildState {
  const { server } = config;
  const initial: CodexdManagedChildState = {
    strategy: server.childStrategy,
    mode: server.mode,
    endpoint: server.endpoint,
    status: "idle",
    command: server.childCommand,
    args: Array.from(server.childArgs),
    cwd: server.childCwd,
    pid: null,
    startedAt: null,
    exitedAt: null,
    exitCode: null,
    signal: null,
    lastError: null
  };

  return initial;
}
411
/**
 * Reconciles persisted daemon state with the current configuration.
 *
 * @param value Daemon state read from disk, or `null` when none exists.
 * @param config Resolved configuration; its `server` section always wins over
 *   the persisted child launch settings.
 * @param now Timestamp used as `updatedAt` when a fresh state is created.
 * @returns A fresh "not started" state when `value` is `null`; otherwise the
 *   persisted state with the child's strategy, mode, endpoint, command, args
 *   and cwd replaced by the configured values.
 */
function normalizeDaemonState(
  value: CodexdDaemonState | null,
  config: CodexdResolvedConfig,
  now: string
): CodexdDaemonState {
  if (value != null) {
    const { server } = config;
    // Launch settings are owned by the config, not by whatever was persisted.
    const configuredLaunch = {
      strategy: server.childStrategy,
      mode: server.mode,
      endpoint: server.endpoint,
      command: server.childCommand,
      args: [...server.childArgs],
      cwd: server.childCwd
    };

    return {
      ...value,
      child: { ...value.child, ...configuredLaunch }
    };
  }

  return {
    started: false,
    startedAt: null,
    stoppedAt: null,
    updatedAt: now,
    pid: null,
    child: createInitialChildState(config)
  };
}
441
/**
 * Reconciles the persisted recent-event cache with the configured cache size.
 *
 * @param value Cache state read from disk, or `null` when none exists.
 * @param maxEntries Configured maximum number of cached events.
 * @returns An empty cache when `value` is `null`; otherwise a cache holding at
 *   most the newest `maxEntries` persisted events (none when `maxEntries` is
 *   below 1). `maxEntries` is reported back unchanged.
 */
function normalizeRecentEvents(
  value: CodexdRecentEventCacheState | null,
  maxEntries: number
): CodexdRecentEventCacheState {
  if (value == null) {
    return {
      maxEntries,
      updatedAt: null,
      events: []
    };
  }

  return {
    maxEntries,
    updatedAt: value.updatedAt,
    // slice(-0) is slice(0) and would keep every event, and a negative size
    // would drop events from the front, so sizes below 1 keep nothing.
    events: maxEntries >= 1 ? [...value.events].slice(-maxEntries) : []
  };
}
460
/**
 * Turns the persisted session registry into the in-memory one.
 *
 * @param value Registry read from disk, or `null` when none exists.
 * @returns An empty registry when `value` is `null`; otherwise a registry with
 *   the same `updatedAt` and a shallow copy of the session list.
 */
function normalizeSessionRegistry(
  value: CodexdSessionRegistryState | null
): CodexdSessionRegistryState {
  const hasPersisted = value != null;

  return {
    updatedAt: hasPersisted ? value.updatedAt : null,
    sessions: hasPersisted ? [...value.sessions] : []
  };
}
476
/**
 * Turns the persisted run registry into the in-memory one.
 *
 * @param value Registry read from disk, or `null` when none exists.
 * @returns An empty registry when `value` is `null`; otherwise a registry with
 *   the same `updatedAt` and a shallow copy of the run list.
 */
function normalizeRunRegistry(
  value: CodexdRunRegistryState | null
): CodexdRunRegistryState {
  const hasPersisted = value != null;

  return {
    updatedAt: hasPersisted ? value.updatedAt : null,
    runs: hasPersisted ? [...value.runs] : []
  };
}
492
/**
 * Reads and parses a UTF-8 JSON file.
 *
 * The parsed value is cast to `T` without validation.
 *
 * @param path File to read.
 * @param fallback Value returned when the file does not exist.
 * @returns The parsed content, or `fallback` for a missing file.
 * @throws Any read error other than a missing file, and any JSON parse error.
 */
async function readJsonOrDefault<T>(path: string, fallback: T): Promise<T> {
  let source: string;

  try {
    source = await readFile(path, "utf8");
  } catch (cause) {
    // Only "file not found" means "use the default"; everything else is a real failure.
    if (!isMissingFileError(cause)) {
      throw cause;
    }

    return fallback;
  }

  return JSON.parse(source) as T;
}
505
/**
 * Writes `value` to `path` as UTF-8 JSON, pretty-printed with two-space
 * indentation and terminated by a newline.
 *
 * @param path Destination file.
 * @param value Value to serialize.
 * @returns A promise that settles when the write finishes.
 */
function writeJsonFile(path: string, value: unknown): Promise<void> {
  const serialized = JSON.stringify(value, null, 2);
  const contents = `${serialized}\n`;

  return writeFile(path, contents, "utf8");
}
509
/**
 * Deep-copies a value by round-tripping it through JSON.
 *
 * The copy therefore follows JSON semantics: object properties whose value is
 * `undefined` are dropped and anything JSON cannot represent is converted or
 * rejected by `JSON.stringify`.
 *
 * @param value Value to copy.
 * @returns An independent copy of `value`; `undefined` is returned as-is.
 */
function cloneJson<T>(value: T): T {
  // JSON.stringify(undefined) yields undefined, which JSON.parse would reject
  // with a SyntaxError, so pass undefined through untouched.
  if (value === undefined) {
    return value;
  }

  return JSON.parse(JSON.stringify(value)) as T;
}
513
/**
 * Determines the sequence number for the next event.
 *
 * @param events Cached events, oldest first.
 * @returns One more than the `seq` of the last event, or 1 when there is no
 *   last event.
 */
function getNextEventSeq(events: readonly CodexdRecentEvent[]): number {
  const [newest] = events.slice(-1);

  if (newest == null) {
    return 1;
  }

  return newest.seq + 1;
}
518
/** Default clock: the current time as an ISO-8601 string. */
function defaultNow(): string {
  const current = new Date();
  return current.toISOString();
}
522
/**
 * Default pid provider: the current process id, or `null` when no `process`
 * global exists or it exposes no pid.
 */
function defaultProcessId(): number | null {
  if (typeof process === "undefined") {
    return null;
  }

  return process.pid ?? null;
}
526
/**
 * Type guard for "file not found" errors.
 *
 * @param error Value caught from a failed operation.
 * @returns `true` when `error` is a non-null object whose `code` property is
 *   exactly "ENOENT".
 */
function isMissingFileError(error: unknown): error is Error & { code: string } {
  if (typeof error !== "object" || error === null) {
    return false;
  }

  if (!("code" in error)) {
    return false;
  }

  return (error as { code: unknown }).code === "ENOENT";
}