baa-conductor


baa-conductor / tests / codexd
im_wower  ·  2026-03-23

codexd-e2e-smoke.test.mjs

  1import assert from "node:assert/strict";
  2import { mkdirSync, mkdtempSync, readFileSync, rmSync } from "node:fs";
  3import { tmpdir } from "node:os";
  4import { join } from "node:path";
  5import test from "node:test";
  6
  7import {
  8  CodexdLocalService,
  9  resolveCodexdConfig,
 10  runCodexdCli
 11} from "../../apps/codexd/dist/index.js";
 12import { ConductorRuntime } from "../../apps/conductor-daemon/dist/index.js";
 13
 14class FakeEventStream {
 15  constructor() {
 16    this.listeners = new Set();
 17  }
 18
 19  emit(event) {
 20    for (const listener of this.listeners) {
 21      listener(event);
 22    }
 23  }
 24
 25  subscribe(listener) {
 26    this.listeners.add(listener);
 27
 28    return {
 29      unsubscribe: () => {
 30        this.listeners.delete(listener);
 31      }
 32    };
 33  }
 34}
 35
 36class FakeStream {
 37  constructor() {
 38    this.listeners = [];
 39  }
 40
 41  on(event, listener) {
 42    if (event === "data") {
 43      this.listeners.push(listener);
 44    }
 45
 46    return this;
 47  }
 48
 49  emit(chunk) {
 50    for (const listener of this.listeners) {
 51      listener(chunk);
 52    }
 53  }
 54}
 55
 56class FakeChild {
 57  constructor() {
 58    this.pid = 43210;
 59    this.stdin = {
 60      end() {},
 61      write() {
 62        return true;
 63      }
 64    };
 65    this.stdout = new FakeStream();
 66    this.stderr = new FakeStream();
 67    this.listeners = new Map();
 68    this.onceListeners = new Map();
 69  }
 70
 71  on(event, listener) {
 72    this.listeners.set(event, [...(this.listeners.get(event) ?? []), listener]);
 73    return this;
 74  }
 75
 76  once(event, listener) {
 77    this.onceListeners.set(event, [...(this.onceListeners.get(event) ?? []), listener]);
 78    return this;
 79  }
 80
 81  kill(signal = "SIGTERM") {
 82    this.emit("exit", 0, signal);
 83    return true;
 84  }
 85
 86  emit(event, ...args) {
 87    for (const listener of this.listeners.get(event) ?? []) {
 88      listener(...args);
 89    }
 90
 91    for (const listener of this.onceListeners.get(event) ?? []) {
 92      listener(...args);
 93    }
 94
 95    this.onceListeners.delete(event);
 96  }
 97}
 98
 99class FakeAppServerAdapter {
100  constructor(defaultCwd) {
101    this.defaultCwd = defaultCwd;
102    this.events = new FakeEventStream();
103    this.nextThreadId = 1;
104    this.nextTurnId = 1;
105    this.sessions = new Map();
106  }
107
108  async close() {}
109
110  async initialize() {
111    return {
112      platformFamily: "unix",
113      platformOs: "macos",
114      userAgent: "codex-cli fake-app-server"
115    };
116  }
117
118  async threadResume(params) {
119    const session = this.sessions.get(params.threadId);
120
121    if (session == null) {
122      throw new Error(`unknown thread ${params.threadId}`);
123    }
124
125    return session;
126  }
127
128  async threadStart(params = {}) {
129    const threadId = `thread-${this.nextThreadId}`;
130    this.nextThreadId += 1;
131    const session = {
132      thread: {
133        cliVersion: "test",
134        createdAt: Date.now(),
135        cwd: params.cwd ?? this.defaultCwd,
136        ephemeral: params.ephemeral ?? true,
137        id: threadId,
138        modelProvider: params.modelProvider ?? "openai",
139        name: null,
140        preview: "fake codexd smoke session",
141        source: {
142          custom: "codexd-e2e-smoke"
143        },
144        status: {
145          type: "idle"
146        },
147        turns: [],
148        updatedAt: Date.now()
149      },
150      approvalPolicy: params.approvalPolicy ?? "never",
151      cwd: params.cwd ?? this.defaultCwd,
152      model: params.model ?? "gpt-5.4",
153      modelProvider: params.modelProvider ?? "openai",
154      reasoningEffort: "medium",
155      sandbox: {
156        type: "dangerFullAccess"
157      },
158      serviceTier: params.serviceTier ?? null
159    };
160
161    this.sessions.set(threadId, session);
162    this.events.emit({
163      notificationMethod: "thread/started",
164      thread: session.thread,
165      type: "thread.started"
166    });
167    return session;
168  }
169
170  async turnInterrupt() {}
171
172  async turnStart(params) {
173    const session = this.sessions.get(params.threadId);
174
175    if (session == null) {
176      throw new Error(`unknown thread ${params.threadId}`);
177    }
178
179    const turnId = `turn-${this.nextTurnId}`;
180    this.nextTurnId += 1;
181    const turn = {
182      error: null,
183      id: turnId,
184      status: "inProgress"
185    };
186
187    session.thread.turns = [...(session.thread.turns ?? []), turn];
188
189    queueMicrotask(() => {
190      this.events.emit({
191        notificationMethod: "turn/started",
192        threadId: params.threadId,
193        turn,
194        type: "turn.started"
195      });
196      this.events.emit({
197        delta: "hello from fake adapter",
198        itemId: "item-1",
199        notificationMethod: "item/agentMessage/delta",
200        threadId: params.threadId,
201        turnId,
202        type: "turn.message.delta"
203      });
204
205      const completedTurn = {
206        ...turn,
207        status: "completed"
208      };
209
210      session.thread.turns = session.thread.turns.map((entry) =>
211        entry.id === completedTurn.id ? completedTurn : entry
212      );
213      this.events.emit({
214        notificationMethod: "turn/completed",
215        threadId: params.threadId,
216        turn: completedTurn,
217        type: "turn.completed"
218      });
219    });
220
221    return {
222      turn
223    };
224  }
225
226  async turnSteer(params) {
227    return {
228      turnId: params.expectedTurnId
229    };
230  }
231}
232
/**
 * Builds an in-memory sink for CLI output.
 * @returns {{ read: () => string, writer: { write: (chunk: unknown) => void } }}
 *   `writer.write` appends the stringified chunk; `read` returns everything
 *   written so far.
 */
function createTextCollector() {
  const pieces = [];

  const read = () => pieces.join("");
  const writer = {
    write(chunk) {
      pieces.push(String(chunk));
    }
  };

  return { read, writer };
}
247
/**
 * Performs a fetch and decodes the body as JSON.
 * @param {string | URL} url
 * @param {RequestInit} [init]
 * @returns {Promise<{ payload: unknown, response: Response, text: string }>}
 *   `payload` is null when the body is empty, otherwise the parsed JSON.
 */
async function fetchJson(url, init) {
  const response = await fetch(url, init);
  const text = await response.text();

  let payload = null;

  if (text !== "") {
    payload = JSON.parse(text);
  }

  return { payload, response, text };
}
258
/**
 * Reads the file at `path` as UTF-8 text and returns the parsed JSON value.
 */
function readJsonFile(path) {
  const contents = readFileSync(path, "utf8");

  return JSON.parse(contents);
}
262
/**
 * Resolves (with no value) once a timer of `ms` milliseconds has fired.
 */
async function sleep(ms) {
  const timerFired = new Promise((wake) => {
    setTimeout(wake, ms);
  });

  await timerFired;
}
268
/**
 * Polls `readValue` every 25 ms until it yields a non-nullish value.
 *
 * The probe always runs at least once, even when `timeoutMs` is zero or
 * negative. It also runs one final time after the last sleep, so a value
 * that became available during that sleep is still observed.
 *
 * @param {() => unknown | Promise<unknown>} readValue - probe; returning
 *   null/undefined or throwing means "not ready yet".
 * @param {string} description - what is being awaited, used in the timeout message.
 * @param {number} [timeoutMs=5000] - polling budget in milliseconds.
 * @returns {Promise<unknown>} the first non-nullish value produced by `readValue`.
 * @throws the most recent Error thrown by `readValue`, if any; otherwise an
 *   Error "Timed out waiting for <description>." whose `cause` carries the
 *   most recent non-Error thrown value when there was one.
 */
async function waitForValue(readValue, description, timeoutMs = 5_000) {
  const deadline = Date.now() + timeoutMs;
  let lastError = null;
  let sawError = false;

  for (;;) {
    try {
      const value = await readValue();

      if (value != null) {
        return value;
      }
    } catch (error) {
      lastError = error;
      sawError = true;
    }

    // Check the deadline after the attempt so every sleep is followed by a probe.
    if (Date.now() >= deadline) {
      break;
    }

    await sleep(25);
  }

  if (lastError instanceof Error) {
    throw lastError;
  }

  // Keep a thrown non-Error value reachable instead of dropping it silently.
  throw new Error(
    `Timed out waiting for ${description}.`,
    sawError ? { cause: lastError } : undefined
  );
}
293
/**
 * Runs `codexd status --json` in-process through runCodexdCli, with flags
 * derived from `config`, and returns the parsed JSON printed to stdout.
 *
 * @param {object} config - resolved codexd config; reads `paths`, `service` and `server`.
 * @param {object} env - environment handed to the CLI.
 * @returns {Promise<unknown>} the status document parsed from captured stdout.
 * @throws {AssertionError} when the CLI exit code is not 0 (message is the captured stderr).
 */
async function readCodexdCliStatus(config, env) {
  const { paths, server, service } = config;
  const out = createTextCollector();
  const err = createTextCollector();

  // Flag/value pairs, in the order they appear on the command line.
  const flagTable = [
    ["--repo-root", paths.repoRoot],
    ["--logs-dir", paths.logsRootDir],
    ["--state-dir", paths.stateRootDir],
    ["--local-api-base", service.localApiBase],
    ["--event-stream-path", service.eventStreamPath],
    ["--server-endpoint", server.endpoint],
    ["--server-strategy", server.childStrategy],
    ["--server-command", server.childCommand],
    ["--server-cwd", server.childCwd]
  ];

  const argv = ["node", "codexd", "status", "--json"];

  for (const [flag, value] of flagTable) {
    argv.push(flag, value);
  }

  for (const extra of server.childArgs) {
    argv.push("--server-arg", extra);
  }

  // The same argv array is passed both directly and through processLike.
  const processLike = { argv, env, pid: 424242 };
  const exitCode = await runCodexdCli({
    argv,
    env,
    processLike,
    stderr: err.writer,
    stdout: out.writer
  });

  assert.equal(exitCode, 0, err.read());

  return JSON.parse(out.read());
}
341
// End-to-end smoke test. It starts a CodexdLocalService backed by the fake
// child process and fake app-server adapter, starts a ConductorRuntime pointed
// at it, and drives status, session and turn requests through the conductor's
// HTTP API. It then checks the CLI status output and the state/log files that
// codexd wrote under the temporary workspace.
test("codexd e2e smoke covers status, conductor proxy, session and turn flow, and persistence", async () => {
  const workspace = mkdtempSync(join(tmpdir(), "baa-codexd-e2e-smoke-"));
  let codexd = null;
  let conductor = null;
  let codexdStarted = false;
  let conductorStarted = false;

  // All setup lives inside the try so the workspace is removed even when
  // configuration or construction fails before anything was started.
  try {
    const repoRoot = join(workspace, "repo");
    const logsRoot = join(workspace, "logs");
    const stateRoot = join(workspace, "state");
    const conductorStateDir = join(workspace, "conductor-state");
    const runsDir = join(workspace, "runs");

    for (const directory of [repoRoot, logsRoot, stateRoot, conductorStateDir, runsDir]) {
      mkdirSync(directory, {
        recursive: true
      });
    }

    const fakeChild = new FakeChild();
    const fakeAdapter = new FakeAppServerAdapter(repoRoot);
    const codexdConfig = resolveCodexdConfig({
      localApiBase: "http://127.0.0.1:0",
      logsDir: logsRoot,
      repoRoot,
      stateDir: stateRoot,
      version: "e2e-smoke"
    });
    codexd = new CodexdLocalService(codexdConfig, {
      appServerClientFactory: {
        async create(context) {
          assert.equal(context.config.server.mode, "app-server");
          return fakeAdapter;
        }
      },
      env: {
        HOME: workspace
      },
      spawner: {
        spawn(command, args, options) {
          assert.equal(command, "codex");
          assert.deepEqual(args, ["app-server"]);
          assert.equal(options.cwd, repoRoot);

          // Announce the spawn and emit one line on each stream after the
          // caller has received the child, so the log assertions have content.
          queueMicrotask(() => {
            fakeChild.emit("spawn");
            fakeChild.stdout.emit("codexd fake child ready\n");
            fakeChild.stderr.emit("codexd fake child stderr\n");
          });

          return fakeChild;
        }
      }
    });

    const codexdStatus = await codexd.start();
    codexdStarted = true;
    const codexdBaseUrl = codexdStatus.service.resolvedBaseUrl;

    assert.ok(codexdBaseUrl);
    assert.equal(codexdStatus.snapshot.daemon.started, true);
    assert.equal(codexdStatus.snapshot.daemon.child.status, "running");

    conductor = new ConductorRuntime(
      {
        codexdLocalApiBase: codexdBaseUrl,
        controlApiBase: "https://conductor.makefile.so",
        host: "mini",
        localApiBase: "http://127.0.0.1:0",
        nodeId: "mini-main",
        paths: {
          runsDir,
          stateDir: conductorStateDir
        },
        role: "primary",
        sharedToken: "replace-me"
      },
      {
        autoStartLoops: false,
        now: () => 100
      }
    );

    const conductorSnapshot = await conductor.start();
    conductorStarted = true;
    const conductorBaseUrl = conductorSnapshot.controlApi.localApiBase;

    assert.ok(conductorBaseUrl);

    // --- Status: direct codexd endpoint, then through the conductor proxy.
    const directCodexdStatus = await fetchJson(`${codexdBaseUrl}/v1/codexd/status`);
    assert.equal(directCodexdStatus.response.status, 200);
    assert.equal(directCodexdStatus.payload.ok, true);
    assert.equal(directCodexdStatus.payload.data.service.resolvedBaseUrl, codexdBaseUrl);
    assert.equal(directCodexdStatus.payload.data.snapshot.daemon.child.status, "running");

    const initialProxyStatus = await fetchJson(`${conductorBaseUrl}/v1/codex`);
    assert.equal(initialProxyStatus.response.status, 200);
    assert.equal(initialProxyStatus.payload.ok, true);
    assert.equal(initialProxyStatus.payload.data.backend, "independent_codexd");
    assert.equal(initialProxyStatus.payload.data.proxy.target_base_url, codexdBaseUrl);
    assert.equal(initialProxyStatus.payload.data.sessions.count, 0);
    assert.doesNotMatch(JSON.stringify(initialProxyStatus.payload.data.routes), /\/v1\/codex\/runs/u);

    // --- Session create, list and read through the conductor.
    const sessionCreateResponse = await fetchJson(`${conductorBaseUrl}/v1/codex/sessions`, {
      body: JSON.stringify({
        cwd: repoRoot,
        model: "gpt-5.4",
        purpose: "duplex"
      }),
      headers: {
        "content-type": "application/json"
      },
      method: "POST"
    });
    assert.equal(sessionCreateResponse.response.status, 201);
    assert.equal(sessionCreateResponse.payload.ok, true);
    const session = sessionCreateResponse.payload.data.session;
    const sessionId = session.sessionId;

    assert.match(sessionId, /^session-/u);
    assert.equal(session.purpose, "duplex");
    assert.equal(session.status, "active");
    assert.equal(session.cwd, repoRoot);

    const sessionsListResponse = await fetchJson(`${conductorBaseUrl}/v1/codex/sessions`);
    assert.equal(sessionsListResponse.response.status, 200);
    assert.equal(sessionsListResponse.payload.data.sessions.length, 1);
    assert.equal(sessionsListResponse.payload.data.sessions[0].sessionId, sessionId);

    const sessionReadResponse = await fetchJson(`${conductorBaseUrl}/v1/codex/sessions/${sessionId}`);
    assert.equal(sessionReadResponse.response.status, 200);
    assert.equal(sessionReadResponse.payload.ok, true);
    assert.equal(sessionReadResponse.payload.data.session.sessionId, sessionId);
    assert.equal(sessionReadResponse.payload.data.session.lastTurnId, null);

    // --- Turn: submit, then poll the session until the turn shows as completed.
    const turnCreateResponse = await fetchJson(`${conductorBaseUrl}/v1/codex/turn`, {
      body: JSON.stringify({
        input: "Summarize pending work.",
        sessionId
      }),
      headers: {
        "content-type": "application/json"
      },
      method: "POST"
    });
    assert.equal(turnCreateResponse.response.status, 202);
    assert.equal(turnCreateResponse.payload.ok, true);
    assert.equal(turnCreateResponse.payload.data.accepted, true);
    const turnId = turnCreateResponse.payload.data.turnId;

    assert.equal(turnId, "turn-1");

    const completedSessionRead = await waitForValue(async () => {
      const response = await fetchJson(`${conductorBaseUrl}/v1/codex/sessions/${sessionId}`);

      if (response.response.status !== 200) {
        throw new Error(`Unexpected session read status ${response.response.status}.`);
      }

      const sessionPayload = response.payload.data.session;
      const eventTypes = response.payload.data.recentEvents.map((event) => event.type);

      if (
        sessionPayload.lastTurnId === turnId
        && sessionPayload.lastTurnStatus === "completed"
        && eventTypes.includes("turn.accepted")
        && eventTypes.includes("app-server.turn.completed")
      ) {
        return response.payload;
      }

      return null;
    }, "turn completion in conductor session read");

    const proxyStatusAfterTurn = await fetchJson(`${conductorBaseUrl}/v1/codex`);
    assert.equal(proxyStatusAfterTurn.response.status, 200);
    assert.equal(proxyStatusAfterTurn.payload.data.sessions.count, 1);
    assert.equal(proxyStatusAfterTurn.payload.data.sessions.active_count, 1);
    assert.ok(proxyStatusAfterTurn.payload.data.recent_events.count >= 1);

    // --- CLI status output.
    const cliStatus = await readCodexdCliStatus(codexdConfig, {
      BAA_NODE_ID: codexdConfig.nodeId,
      HOME: workspace
    });
    assert.equal(cliStatus.daemon.started, true);
    assert.equal(cliStatus.daemon.child.status, "running");
    assert.equal(cliStatus.sessionRegistry.sessions.length, 1);
    assert.equal(cliStatus.sessionRegistry.sessions[0].sessionId, sessionId);
    assert.equal(cliStatus.sessionRegistry.sessions[0].lastTurnId, turnId);
    assert.equal(cliStatus.sessionRegistry.sessions[0].lastTurnStatus, "completed");
    assert.equal(cliStatus.runRegistry.runs.length, 0);

    // --- Persistence: state files and logs written under the workspace.
    const daemonState = readJsonFile(codexdConfig.paths.daemonStatePath);
    assert.equal(daemonState.started, true);
    assert.equal(daemonState.child.status, "running");
    assert.equal(daemonState.child.pid, 43210);

    const sessionRegistry = readJsonFile(codexdConfig.paths.sessionRegistryPath);
    assert.equal(sessionRegistry.sessions.length, 1);
    assert.equal(sessionRegistry.sessions[0].sessionId, sessionId);
    assert.equal(sessionRegistry.sessions[0].lastTurnId, turnId);
    assert.equal(sessionRegistry.sessions[0].lastTurnStatus, "completed");

    const recentEvents = readJsonFile(codexdConfig.paths.recentEventsPath);
    assert.ok(recentEvents.events.some((event) => event.type === "session.created"));
    assert.ok(recentEvents.events.some((event) => event.type === "turn.accepted"));
    assert.ok(recentEvents.events.some((event) => event.type === "app-server.turn.completed"));

    const identity = readJsonFile(codexdConfig.paths.identityPath);
    assert.equal(identity.nodeId, "mini-main");
    assert.equal(identity.repoRoot, repoRoot);

    assert.match(
      readFileSync(codexdConfig.paths.structuredEventLogPath, "utf8"),
      /app-server\.turn\.completed/u
    );
    assert.match(
      readFileSync(codexdConfig.paths.stdoutLogPath, "utf8"),
      /codexd fake child ready/u
    );
    assert.match(
      readFileSync(codexdConfig.paths.stderrLogPath, "utf8"),
      /codexd fake child stderr/u
    );

    assert.equal(completedSessionRead.data.session.sessionId, sessionId);
    assert.equal(completedSessionRead.data.session.lastTurnId, turnId);
    assert.equal(completedSessionRead.data.session.lastTurnStatus, "completed");
  } finally {
    // Tear down in reverse start order. The try/finally nesting guarantees
    // that a failing stop() cannot skip the remaining cleanup steps.
    try {
      if (conductorStarted && conductor != null) {
        await conductor.stop();
      }
    } finally {
      try {
        if (codexdStarted && codexd != null) {
          await codexd.stop();
        }
      } finally {
        rmSync(workspace, {
          force: true,
          recursive: true
        });
      }
    }
  }
});