baa-conductor


baa-conductor / apps / codexd / src
im_wower  ·  2026-03-25

index.test.js

   1import assert from "node:assert/strict";
   2import { mkdtempSync, readFileSync } from "node:fs";
   3import { tmpdir } from "node:os";
   4import { join } from "node:path";
   5import test from "node:test";
   6
   7import {
   8  CodexdDaemon,
   9  CodexdLocalService,
  10  resolveCodexdConfig
  11} from "../dist/index.js";
  12
  13class FakeEventStream {
  14  constructor() {
  15    this.listeners = new Set();
  16  }
  17
  18  emit(event) {
  19    for (const listener of this.listeners) {
  20      listener(event);
  21    }
  22  }
  23
  24  subscribe(listener) {
  25    this.listeners.add(listener);
  26
  27    return {
  28      unsubscribe: () => {
  29        this.listeners.delete(listener);
  30      }
  31    };
  32  }
  33}
  34
  35class FakeStream {
  36  constructor() {
  37    this.listeners = new Map();
  38  }
  39
  40  on(event, listener) {
  41    this.listeners.set(event, [...(this.listeners.get(event) ?? []), listener]);
  42    return this;
  43  }
  44
  45  setEncoding() {
  46    return this;
  47  }
  48
  49  emit(event, ...args) {
  50    for (const listener of this.listeners.get(event) ?? []) {
  51      listener(...args);
  52    }
  53  }
  54}
  55
  56class FakeChild {
  57  constructor() {
  58    this.pid = 4242;
  59    this.stdin = {
  60      end() {},
  61      write() {
  62        return true;
  63      }
  64    };
  65    this.stdout = new FakeStream();
  66    this.stderr = new FakeStream();
  67    this.listeners = new Map();
  68    this.onceListeners = new Map();
  69  }
  70
  71  on(event, listener) {
  72    this.listeners.set(event, [...(this.listeners.get(event) ?? []), listener]);
  73    return this;
  74  }
  75
  76  once(event, listener) {
  77    this.onceListeners.set(event, [...(this.onceListeners.get(event) ?? []), listener]);
  78    return this;
  79  }
  80
  81  kill(signal = "SIGTERM") {
  82    this.emit("exit", 0, signal);
  83    return true;
  84  }
  85
  86  emit(event, ...args) {
  87    for (const listener of this.listeners.get(event) ?? []) {
  88      listener(...args);
  89    }
  90
  91    for (const listener of this.onceListeners.get(event) ?? []) {
  92      listener(...args);
  93    }
  94
  95    this.onceListeners.delete(event);
  96  }
  97}
  98
  99class FakeRpcAppServerChild extends FakeChild {
 100  constructor() {
 101    super();
 102    this.nextThreadId = 1;
 103    this.nextTurnId = 1;
 104    this.requests = "";
 105    this.threads = new Map();
 106    this.stdin = {
 107      end: () => {
 108        this.stdout.emit("end");
 109      },
 110      write: (chunk) => {
 111        this.requests += typeof chunk === "string" ? chunk : new TextDecoder().decode(chunk);
 112
 113        while (true) {
 114          const newlineIndex = this.requests.indexOf("\n");
 115
 116          if (newlineIndex < 0) {
 117            break;
 118          }
 119
 120          const line = this.requests.slice(0, newlineIndex).trim();
 121          this.requests = this.requests.slice(newlineIndex + 1);
 122
 123          if (line !== "") {
 124            this.handleRequest(JSON.parse(line));
 125          }
 126        }
 127
 128        return true;
 129      }
 130    };
 131  }
 132
 133  emitRpcMessage(payload, { trailingNewline = true } = {}) {
 134    this.stdout.emit("data", `${JSON.stringify(payload)}${trailingNewline ? "\n" : ""}`);
 135  }
 136
 137  finishRetryingTurn(session, turnId, completedTurn) {
 138    session.thread.turns = session.thread.turns.map((entry) =>
 139      entry.id === turnId ? completedTurn : entry
 140    );
 141    this.emitRpcMessage(
 142      {
 143        method: "turn/completed",
 144        params: {
 145          threadId: session.thread.id,
 146          turn: completedTurn
 147        }
 148      },
 149      {
 150        trailingNewline: false
 151      }
 152    );
 153    this.stdout.emit("end");
 154  }
 155
 156  handleRequest(request) {
 157    switch (request.method) {
 158      case "initialize":
 159        queueMicrotask(() => {
 160          this.emitRpcMessage({
 161            id: request.id,
 162            result: {
 163              platformFamily: "unix",
 164              platformOs: "macos",
 165              userAgent: "codex-cli fake-rpc"
 166            }
 167          });
 168        });
 169        break;
 170
 171      case "thread/start":
 172        queueMicrotask(() => {
 173          const threadId = `thread-${this.nextThreadId}`;
 174          this.nextThreadId += 1;
 175          const thread = {
 176            cliVersion: "test",
 177            createdAt: Date.now(),
 178            cwd: request.params?.cwd ?? "/tmp/fake-rpc",
 179            ephemeral: true,
 180            id: threadId,
 181            modelProvider: "openai",
 182            name: null,
 183            preview: "fake rpc session",
 184            source: { custom: "codexd-rpc-test" },
 185            status: { type: "idle" },
 186            turns: [],
 187            updatedAt: Date.now()
 188          };
 189          const session = {
 190            approvalPolicy: "never",
 191            cwd: thread.cwd,
 192            model: "gpt-5.4",
 193            modelProvider: "openai",
 194            reasoningEffort: "medium",
 195            sandbox: { type: "dangerFullAccess" },
 196            serviceTier: null,
 197            thread
 198          };
 199
 200          this.threads.set(threadId, session);
 201          this.emitRpcMessage({
 202            method: "thread/started",
 203            params: {
 204              thread
 205            }
 206          });
 207          this.emitRpcMessage({
 208            id: request.id,
 209            result: session
 210          });
 211        });
 212        break;
 213
 214      case "turn/start":
 215        queueMicrotask(() => {
 216          const session = this.threads.get(request.params.threadId);
 217
 218          if (session == null) {
 219            this.emitRpcMessage({
 220              id: request.id,
 221              error: {
 222                code: -32000,
 223                message: `unknown thread ${request.params.threadId}`
 224              }
 225            });
 226            return;
 227          }
 228
 229          const turnNumber = this.nextTurnId;
 230          const turnId = `turn-${turnNumber}`;
 231          this.nextTurnId += 1;
 232          const turn = {
 233            error: null,
 234            id: turnId,
 235            status: "inProgress"
 236          };
 237          const completedTurn = {
 238            ...turn,
 239            status: "completed"
 240          };
 241
 242          session.thread.turns = [...session.thread.turns, turn];
 243          this.emitRpcMessage({
 244            method: "turn/started",
 245            params: {
 246              threadId: session.thread.id,
 247              turn
 248            }
 249          });
 250          this.emitRpcMessage({
 251            id: request.id,
 252            result: {
 253              turn
 254            }
 255          });
 256          this.emitRpcMessage({
 257            method: "item/agentMessage/delta",
 258            params: {
 259              delta: `reply ${turnId}`,
 260              itemId: `item-${turnId}`,
 261              threadId: session.thread.id,
 262              turnId
 263            }
 264          });
 265
 266          if (turnNumber === 2) {
 267            this.emitRpcMessage({
 268              method: "error",
 269              params: {
 270                error: {
 271                  additionalDetails: "timeout waiting for child process to exit",
 272                  codexErrorInfo: {
 273                    responseStreamDisconnected: {
 274                      httpStatusCode: null
 275                    }
 276                  },
 277                  message: "Reconnecting... 2/5"
 278                },
 279                threadId: session.thread.id,
 280                turnId,
 281                willRetry: true
 282              }
 283            });
 284            setTimeout(() => {
 285              this.finishRetryingTurn(session, turnId, completedTurn);
 286            }, 25);
 287            return;
 288          }
 289
 290          session.thread.turns = session.thread.turns.map((entry) =>
 291            entry.id === turnId ? completedTurn : entry
 292          );
 293          this.emitRpcMessage({
 294            method: "turn/completed",
 295            params: {
 296              threadId: session.thread.id,
 297              turn: completedTurn
 298            }
 299          });
 300        });
 301        break;
 302
 303      default:
 304        queueMicrotask(() => {
 305          this.emitRpcMessage({
 306            id: request.id,
 307            error: {
 308              code: -32601,
 309              message: `unexpected method ${request.method}`
 310            }
 311          });
 312        });
 313        break;
 314    }
 315  }
 316}
 317
 318class FakeRpcAppServerDisconnectBeforeCompletedChild extends FakeRpcAppServerChild {
 319  finishRetryingTurn() {
 320    this.stdout.emit("end");
 321  }
 322}
 323
 324class FakeAppServerAdapter {
 325  constructor(defaultCwd, options = {}) {
 326    this.defaultCwd = defaultCwd;
 327    this.events = new FakeEventStream();
 328    this.nextThreadId = 1;
 329    this.nextTurnId = 1;
 330    this.retryingTurnCompletionDelayMs = options.retryingTurnCompletionDelayMs ?? 0;
 331    this.retryingTurnNumbers = new Set(options.retryingTurnNumbers ?? []);
 332    this.sessions = new Map();
 333  }
 334
 335  async close() {}
 336
 337  async initialize() {
 338    return {
 339      platformFamily: "unix",
 340      platformOs: "macos",
 341      userAgent: "codex-cli test"
 342    };
 343  }
 344
 345  async threadResume(params) {
 346    const session = this.sessions.get(params.threadId);
 347
 348    if (session == null) {
 349      throw new Error(`unknown thread ${params.threadId}`);
 350    }
 351
 352    return session;
 353  }
 354
 355  async threadStart(params = {}) {
 356    const threadId = `thread-${this.nextThreadId}`;
 357    this.nextThreadId += 1;
 358    const session = {
 359      thread: {
 360        cliVersion: "test",
 361        createdAt: Date.now(),
 362        cwd: params.cwd ?? this.defaultCwd,
 363        ephemeral: params.ephemeral ?? true,
 364        id: threadId,
 365        modelProvider: params.modelProvider ?? "openai",
 366        name: null,
 367        preview: "fake session",
 368        source: { custom: "codexd-test" },
 369        status: { type: "idle" },
 370        turns: [],
 371        updatedAt: Date.now()
 372      },
 373      approvalPolicy: params.approvalPolicy ?? "never",
 374      cwd: params.cwd ?? this.defaultCwd,
 375      model: params.model ?? "gpt-5.4",
 376      modelProvider: params.modelProvider ?? "openai",
 377      reasoningEffort: "medium",
 378      sandbox: { type: "dangerFullAccess" },
 379      serviceTier: params.serviceTier ?? null
 380    };
 381
 382    this.sessions.set(threadId, session);
 383    this.events.emit({
 384      notificationMethod: "thread/started",
 385      thread: session.thread,
 386      type: "thread.started"
 387    });
 388    return session;
 389  }
 390
 391  async turnInterrupt() {}
 392
 393  async turnStart(params) {
 394    const session = this.sessions.get(params.threadId);
 395
 396    if (session == null) {
 397      throw new Error(`unknown thread ${params.threadId}`);
 398    }
 399
 400    const turnNumber = this.nextTurnId;
 401    const turnId = `turn-${turnNumber}`;
 402    this.nextTurnId += 1;
 403    const turn = {
 404      error: null,
 405      id: turnId,
 406      status: "inProgress"
 407    };
 408
 409    session.thread.turns = [...(session.thread.turns ?? []), turn];
 410
 411    queueMicrotask(() => {
 412      this.events.emit({
 413        notificationMethod: "turn/started",
 414        threadId: params.threadId,
 415        turn,
 416        type: "turn.started"
 417      });
 418      const completeTurn = () => {
 419        this.events.emit({
 420          delta: "hello from fake adapter",
 421          itemId: "item-1",
 422          notificationMethod: "item/agentMessage/delta",
 423          threadId: params.threadId,
 424          turnId,
 425          type: "turn.message.delta"
 426        });
 427
 428        const completedTurn = {
 429          ...turn,
 430          status: "completed"
 431        };
 432
 433        session.thread.turns = session.thread.turns.map((entry) =>
 434          entry.id === completedTurn.id ? completedTurn : entry
 435        );
 436        this.events.emit({
 437          notificationMethod: "turn/completed",
 438          threadId: params.threadId,
 439          turn: completedTurn,
 440          type: "turn.completed"
 441        });
 442      };
 443
 444      if (!this.retryingTurnNumbers.has(turnNumber)) {
 445        completeTurn();
 446        return;
 447      }
 448
 449      this.events.emit({
 450        error: {
 451          additionalDetails: "timeout waiting for child process to exit",
 452          codexErrorInfo: {
 453            responseStreamDisconnected: {
 454              httpStatusCode: null
 455            }
 456          },
 457          message: "Reconnecting... 2/5"
 458        },
 459        notificationMethod: "error",
 460        threadId: params.threadId,
 461        turnId,
 462        type: "turn.error",
 463        willRetry: true
 464      });
 465      setTimeout(() => {
 466        completeTurn();
 467      }, this.retryingTurnCompletionDelayMs);
 468    });
 469
 470    return {
 471      turn
 472    };
 473  }
 474
 475  async turnSteer(params) {
 476    return {
 477      turnId: params.expectedTurnId
 478    };
 479  }
 480}
 481
 482test("CodexdDaemon persists daemon identity, child state, session registry, and recent events", async () => {
 483  const repoRoot = mkdtempSync(join(tmpdir(), "codexd-daemon-test-"));
 484  const config = resolveCodexdConfig({
 485    logsDir: join(repoRoot, "logs"),
 486    repoRoot,
 487    stateDir: join(repoRoot, "state")
 488  });
 489  const fakeChild = new FakeChild();
 490  const daemon = new CodexdDaemon(config, {
 491    env: {
 492      HOME: repoRoot
 493    },
 494    spawner: {
 495      spawn(command, args, options) {
 496        assert.equal(command, "codex");
 497        assert.deepEqual(args, ["app-server"]);
 498        assert.equal(options.cwd, repoRoot);
 499
 500        queueMicrotask(() => {
 501          fakeChild.emit("spawn");
 502          fakeChild.stdout.emit("data", "ready from fake child\n");
 503          fakeChild.stderr.emit("data", "warning from fake child\n");
 504        });
 505
 506        return fakeChild;
 507      }
 508    }
 509  });
 510
 511  const started = await daemon.start();
 512  assert.equal(started.daemon.child.status, "running");
 513  assert.equal(started.daemon.child.pid, 4242);
 514
 515  const session = await daemon.registerSession({
 516    metadata: {
 517      runId: "run-1"
 518    },
 519    purpose: "worker"
 520  });
 521  assert.equal(session.status, "active");
 522
 523  const closed = await daemon.closeSession(session.sessionId);
 524  assert.equal(closed?.status, "closed");
 525
 526  const stopped = await daemon.stop();
 527  assert.equal(stopped.daemon.started, false);
 528  assert.equal(stopped.runRegistry.runs.length, 0);
 529  assert.equal(stopped.sessionRegistry.sessions.length, 1);
 530  assert.equal(stopped.sessionRegistry.sessions[0].status, "closed");
 531  assert.ok(stopped.recentEvents.events.length >= 4);
 532
 533  const daemonState = JSON.parse(readFileSync(config.paths.daemonStatePath, "utf8"));
 534  const sessionRegistry = JSON.parse(readFileSync(config.paths.sessionRegistryPath, "utf8"));
 535  const runRegistry = JSON.parse(readFileSync(config.paths.runRegistryPath, "utf8"));
 536  const recentEvents = JSON.parse(readFileSync(config.paths.recentEventsPath, "utf8"));
 537  const eventLog = readFileSync(config.paths.structuredEventLogPath, "utf8");
 538
 539  assert.equal(daemonState.child.status, "stopped");
 540  assert.equal(runRegistry.runs.length, 0);
 541  assert.equal(sessionRegistry.sessions[0].status, "closed");
 542  assert.ok(recentEvents.events.length >= 4);
 543  assert.match(eventLog, /child\.started/);
 544  assert.match(eventLog, /session\.registered/);
 545});
 546
 547test("CodexdLocalService starts the local HTTP surface and supports status, sessions, turns, and rejects runs routes", async () => {
 548  const repoRoot = mkdtempSync(join(tmpdir(), "codexd-service-test-"));
 549  const config = resolveCodexdConfig({
 550    localApiBase: "http://127.0.0.1:0",
 551    logsDir: join(repoRoot, "logs"),
 552    repoRoot,
 553    serverEndpoint: "ws://127.0.0.1:9999/codex-app-server",
 554    serverStrategy: "external",
 555    stateDir: join(repoRoot, "state")
 556  });
 557  const adapter = new FakeAppServerAdapter(repoRoot);
 558  const service = new CodexdLocalService(config, {
 559    appServerClientFactory: {
 560      create() {
 561        return adapter;
 562      }
 563    },
 564    env: {
 565      HOME: repoRoot
 566    },
 567    runExecutor: async (request) => {
 568      await sleep(10);
 569
 570      return {
 571        invocation: {
 572          additionalWritableDirectories: request.additionalWritableDirectories ?? [],
 573          args: ["exec"],
 574          color: "never",
 575          command: "codex",
 576          config: request.config ?? [],
 577          cwd: request.cwd ?? repoRoot,
 578          ephemeral: true,
 579          images: request.images ?? [],
 580          json: true,
 581          prompt: request.prompt,
 582          purpose: request.purpose ?? "fallback-worker",
 583          skipGitRepoCheck: request.skipGitRepoCheck ?? false,
 584          timeoutMs: request.timeoutMs ?? 100
 585        },
 586        ok: true,
 587        result: {
 588          durationMs: 10,
 589          exitCode: 0,
 590          finishedAt: new Date().toISOString(),
 591          jsonEvents: null,
 592          jsonParseErrors: [],
 593          lastMessage: `completed ${request.prompt}`,
 594          signal: null,
 595          startedAt: new Date().toISOString(),
 596          stderr: "",
 597          stdout: "ok",
 598          timedOut: false
 599        }
 600      };
 601    }
 602  });
 603
 604  try {
 605    const started = await service.start();
 606    assert.equal(started.service.listening, true);
 607    assert.match(started.service.resolvedBaseUrl, /^http:\/\/127\.0\.0\.1:\d+$/u);
 608    assert.match(started.service.eventStreamUrl, /^ws:\/\/127\.0\.0\.1:\d+\/v1\/codexd\/events$/u);
 609
 610    const baseUrl = started.service.resolvedBaseUrl;
 611    assert.ok(baseUrl);
 612
 613    const healthz = await fetchJson(`${baseUrl}/healthz`);
 614    assert.equal(healthz.status, 200);
 615    assert.equal(healthz.json.ok, true);
 616
 617    const describe = await fetchJson(`${baseUrl}/describe`);
 618    assert.equal(describe.status, 200);
 619    assert.equal(describe.json.ok, true);
 620    assert.equal(describe.json.name, "codexd");
 621    assert.equal(describe.json.surface, "local-api");
 622    assert.equal(describe.json.base_url, baseUrl);
 623    assert.equal(describe.json.mode.current, "app-server");
 624    assert.equal(describe.json.mode.conductor_role, "proxy");
 625    assert.equal(describe.json.event_stream.path, "/v1/codexd/events");
 626    assert.match(describe.json.event_stream.url, /^ws:\/\/127\.0\.0\.1:\d+\/v1\/codexd\/events$/u);
 627    assert.deepEqual(
 628      describe.json.routes.map((route) => `${route.method} ${route.path}`),
 629      [
 630        "GET /healthz",
 631        "GET /describe",
 632        "GET /v1/codexd/status",
 633        "GET /v1/codexd/sessions",
 634        "GET /v1/codexd/sessions/:session_id",
 635        "POST /v1/codexd/sessions",
 636        "POST /v1/codexd/turn",
 637        "WS /v1/codexd/events"
 638      ]
 639    );
 640    assert.equal(describe.json.capabilities.turn_create, true);
 641    assert.equal(describe.json.capabilities.websocket_events, true);
 642    assert.doesNotMatch(JSON.stringify(describe.json), /runs/iu);
 643    assert.doesNotMatch(JSON.stringify(describe.json), /exec/iu);
 644
 645    const status = await fetchJson(`${baseUrl}/v1/codexd/status`);
 646    assert.equal(status.status, 200);
 647    assert.equal(status.json.ok, true);
 648    assert.equal(status.json.data.snapshot.daemon.started, true);
 649
 650    const createdSession = await postJson(`${baseUrl}/v1/codexd/sessions`, {
 651      cwd: repoRoot,
 652      model: "gpt-5.4",
 653      purpose: "duplex"
 654    });
 655    assert.equal(createdSession.status, 201);
 656    const session = createdSession.json.data.session;
 657    assert.equal(session.purpose, "duplex");
 658    assert.match(session.threadId, /^thread-/u);
 659
 660    const sessions = await fetchJson(`${baseUrl}/v1/codexd/sessions`);
 661    assert.equal(sessions.status, 200);
 662    assert.equal(sessions.json.data.sessions.length, 1);
 663
 664    const readSession = await fetchJson(`${baseUrl}/v1/codexd/sessions/${session.sessionId}`);
 665    assert.equal(readSession.status, 200);
 666    assert.equal(readSession.json.data.session.sessionId, session.sessionId);
 667
 668    const createdTurn = await postJson(`${baseUrl}/v1/codexd/turn`, {
 669      input: "Say hello.",
 670      sessionId: session.sessionId
 671    });
 672    assert.equal(createdTurn.status, 202);
 673    assert.match(createdTurn.json.data.turnId, /^turn-/u);
 674
 675    const completedSession = await waitFor(async () => {
 676      const current = await fetchJson(`${baseUrl}/v1/codexd/sessions/${session.sessionId}`);
 677      return current.json.data.session.lastTurnStatus === "completed" ? current : null;
 678    });
 679    assert.equal(completedSession.json.data.session.currentTurnId, null);
 680    assert.equal(completedSession.json.data.session.lastTurnStatus, "completed");
 681
 682    const listRuns = await fetchJson(`${baseUrl}/v1/codexd/runs`);
 683    assert.equal(listRuns.status, 404);
 684
 685    const createRun = await postJson(`${baseUrl}/v1/codexd/runs`, {
 686      cwd: repoRoot,
 687      prompt: "Inspect repo"
 688    });
 689    assert.equal(createRun.status, 404);
 690
 691    const readRun = await fetchJson(`${baseUrl}/v1/codexd/runs/run-legacy`);
 692    assert.equal(readRun.status, 404);
 693  } finally {
 694    await service.stop();
 695  }
 696});
 697
 698test("CodexdDaemon keeps retrying turns in progress until the app-server completes them", async () => {
 699  const repoRoot = mkdtempSync(join(tmpdir(), "codexd-retry-test-"));
 700  const config = resolveCodexdConfig({
 701    logsDir: join(repoRoot, "logs"),
 702    repoRoot,
 703    serverEndpoint: "ws://127.0.0.1:9999/codex-app-server",
 704    serverStrategy: "external",
 705    stateDir: join(repoRoot, "state")
 706  });
 707  const adapter = new FakeAppServerAdapter(repoRoot, {
 708    retryingTurnCompletionDelayMs: 100,
 709    retryingTurnNumbers: [2]
 710  });
 711  const daemon = new CodexdDaemon(config, {
 712    appServerClientFactory: {
 713      create() {
 714        return adapter;
 715      }
 716    },
 717    env: {
 718      HOME: repoRoot
 719    }
 720  });
 721
 722  await daemon.start();
 723
 724  try {
 725    const firstSession = await daemon.createSession({
 726      cwd: repoRoot,
 727      purpose: "duplex"
 728    });
 729    await daemon.createTurn({
 730      input: "Say hello.",
 731      sessionId: firstSession.sessionId
 732    });
 733    await waitFor(() => {
 734      const current = daemon.getSession(firstSession.sessionId);
 735      return current?.lastTurnStatus === "completed" ? current : null;
 736    });
 737
 738    const secondSession = await daemon.createSession({
 739      cwd: repoRoot,
 740      purpose: "duplex"
 741    });
 742    const secondTurn = await daemon.createTurn({
 743      input: "Retry and finish.",
 744      sessionId: secondSession.sessionId
 745    });
 746
 747    const retryingSession = await waitFor(() => {
 748      const current = daemon.getSession(secondSession.sessionId);
 749      const sawRetryEvent = daemon
 750        .getStatusSnapshot()
 751        .recentEvents.events.some(
 752          (event) =>
 753            event.type === "app-server.turn.error" &&
 754            event.detail?.turnId === secondTurn.turnId &&
 755            event.detail?.willRetry === true
 756        );
 757
 758      return sawRetryEvent && current?.currentTurnId === secondTurn.turnId ? current : null;
 759    });
 760    assert.equal(retryingSession.currentTurnId, secondTurn.turnId);
 761    assert.equal(retryingSession.lastTurnId, secondTurn.turnId);
 762    assert.equal(retryingSession.lastTurnStatus, "inProgress");
 763
 764    const completedSession = await waitFor(() => {
 765      const current = daemon.getSession(secondSession.sessionId);
 766      return current?.lastTurnStatus === "completed" ? current : null;
 767    });
 768    assert.equal(completedSession.currentTurnId, null);
 769    assert.equal(completedSession.lastTurnId, secondTurn.turnId);
 770    assert.equal(completedSession.lastTurnStatus, "completed");
 771  } finally {
 772    await daemon.stop();
 773  }
 774});
 775
 776test("CodexdDaemon flushes the final stdio completion event so sequential sessions still finish", async () => {
 777  const repoRoot = mkdtempSync(join(tmpdir(), "codexd-stdio-tail-test-"));
 778  const config = resolveCodexdConfig({
 779    logsDir: join(repoRoot, "logs"),
 780    repoRoot,
 781    stateDir: join(repoRoot, "state")
 782  });
 783  const fakeChild = new FakeRpcAppServerChild();
 784  const daemon = new CodexdDaemon(config, {
 785    env: {
 786      HOME: repoRoot
 787    },
 788    spawner: {
 789      spawn(command, args, options) {
 790        assert.equal(command, "codex");
 791        assert.deepEqual(args, ["app-server"]);
 792        assert.equal(options.cwd, repoRoot);
 793
 794        queueMicrotask(() => {
 795          fakeChild.emit("spawn");
 796        });
 797
 798        return fakeChild;
 799      }
 800    }
 801  });
 802
 803  await daemon.start();
 804
 805  try {
 806    const firstSession = await daemon.createSession({
 807      cwd: repoRoot,
 808      purpose: "duplex"
 809    });
 810    const firstTurn = await daemon.createTurn({
 811      input: "First turn.",
 812      sessionId: firstSession.sessionId
 813    });
 814    const completedFirstSession = await waitFor(() => {
 815      const current = daemon.getSession(firstSession.sessionId);
 816      return current?.lastTurnStatus === "completed" ? current : null;
 817    });
 818    assert.equal(completedFirstSession.currentTurnId, null);
 819    assert.equal(completedFirstSession.lastTurnId, firstTurn.turnId);
 820
 821    const secondSession = await daemon.createSession({
 822      cwd: repoRoot,
 823      purpose: "duplex"
 824    });
 825    const secondTurn = await daemon.createTurn({
 826      input: "Second turn.",
 827      sessionId: secondSession.sessionId
 828    });
 829
 830    const retryingSecondSession = await waitFor(() => {
 831      const current = daemon.getSession(secondSession.sessionId);
 832      const sawRetryEvent = daemon
 833        .getStatusSnapshot()
 834        .recentEvents.events.some(
 835          (event) =>
 836            event.type === "app-server.turn.error" &&
 837            event.detail?.turnId === secondTurn.turnId &&
 838            event.detail?.willRetry === true
 839        );
 840
 841      return sawRetryEvent && current?.lastTurnStatus === "inProgress" ? current : null;
 842    });
 843    assert.equal(retryingSecondSession.currentTurnId, secondTurn.turnId);
 844
 845    const completedSecondSession = await waitFor(() => {
 846      const current = daemon.getSession(secondSession.sessionId);
 847      const sawCompletedEvent = daemon
 848        .getStatusSnapshot()
 849        .recentEvents.events.some(
 850          (event) =>
 851            event.type === "app-server.turn.completed" &&
 852            event.detail?.turnId === secondTurn.turnId
 853        );
 854
 855      return sawCompletedEvent && current?.lastTurnStatus === "completed" ? current : null;
 856    });
 857    assert.equal(completedSecondSession.currentTurnId, null);
 858    assert.equal(completedSecondSession.lastTurnId, secondTurn.turnId);
 859    assert.equal(completedSecondSession.lastTurnStatus, "completed");
 860  } finally {
 861    await daemon.stop();
 862  }
 863});
 864
 865test("CodexdDaemon classifies transport closes before a legal completion as a new failure", async () => {
 866  const repoRoot = mkdtempSync(join(tmpdir(), "codexd-stdio-disconnect-test-"));
 867  const config = resolveCodexdConfig({
 868    logsDir: join(repoRoot, "logs"),
 869    repoRoot,
 870    stateDir: join(repoRoot, "state")
 871  });
 872  const fakeChild = new FakeRpcAppServerDisconnectBeforeCompletedChild();
 873  const daemon = new CodexdDaemon(config, {
 874    env: {
 875      HOME: repoRoot
 876    },
 877    spawner: {
 878      spawn(command, args, options) {
 879        assert.equal(command, "codex");
 880        assert.deepEqual(args, ["app-server"]);
 881        assert.equal(options.cwd, repoRoot);
 882
 883        queueMicrotask(() => {
 884          fakeChild.emit("spawn");
 885        });
 886
 887        return fakeChild;
 888      }
 889    }
 890  });
 891
 892  await daemon.start();
 893
 894  try {
 895    const firstSession = await daemon.createSession({
 896      cwd: repoRoot,
 897      purpose: "duplex"
 898    });
 899    await daemon.createTurn({
 900      input: "First turn.",
 901      sessionId: firstSession.sessionId
 902    });
 903    await waitFor(() => {
 904      const current = daemon.getSession(firstSession.sessionId);
 905      return current?.lastTurnStatus === "completed" ? current : null;
 906    });
 907
 908    const secondSession = await daemon.createSession({
 909      cwd: repoRoot,
 910      purpose: "duplex"
 911    });
 912    const secondTurn = await daemon.createTurn({
 913      input: "Second turn.",
 914      sessionId: secondSession.sessionId
 915    });
 916
 917    const failedSecondSession = await waitFor(() => {
 918      const current = daemon.getSession(secondSession.sessionId);
 919      const missingCompletedEvent = daemon
 920        .getStatusSnapshot()
 921        .recentEvents.events.find(
 922          (event) =>
 923            event.type === "app-server.turn.completed.missing" &&
 924            event.detail?.turnId === secondTurn.turnId
 925        );
 926
 927      return current?.lastTurnStatus === "failed" && missingCompletedEvent != null
 928        ? {
 929            event: missingCompletedEvent,
 930            session: current
 931          }
 932        : null;
 933    });
 934    assert.equal(failedSecondSession.session.currentTurnId, null);
 935    assert.equal(failedSecondSession.session.lastTurnId, secondTurn.turnId);
 936    assert.equal(failedSecondSession.session.lastTurnStatus, "failed");
 937    assert.deepEqual(failedSecondSession.event.detail, {
 938      childExitCode: null,
 939      childPid: 4242,
 940      childSignal: null,
 941      childStatus: "running",
 942      failureClass: "transport_closed_before_turn_completed",
 943      flushedTrailingMessage: false,
 944      sessionId: secondSession.sessionId,
 945      threadId: secondSession.threadId,
 946      trailingMessageLength: 0,
 947      transportCloseMessage: "Codex app-server stdio stdout ended.",
 948      transportCloseSource: "stdout.end",
 949      turnId: secondTurn.turnId,
 950      turnStatusAtClose: "inProgress"
 951    });
 952    assert.equal(
 953      daemon.getStatusSnapshot().recentEvents.events.some(
 954        (event) =>
 955          event.type === "app-server.turn.completed" &&
 956          event.detail?.turnId === secondTurn.turnId
 957      ),
 958      false
 959    );
 960  } finally {
 961    await daemon.stop();
 962  }
 963});
 964
 965async function fetchJson(url, init) {
 966  const response = await fetch(url, init);
 967
 968  return {
 969    json: await response.json(),
 970    status: response.status
 971  };
 972}
 973
 974async function postJson(url, body) {
 975  return await fetchJson(url, {
 976    body: JSON.stringify(body),
 977    headers: {
 978      "content-type": "application/json"
 979    },
 980    method: "POST"
 981  });
 982}
 983
 984async function sleep(ms) {
 985  await new Promise((resolve) => {
 986    setTimeout(resolve, ms);
 987  });
 988}
 989
 990async function waitFor(loader, timeoutMs = 2_000) {
 991  const startedAt = Date.now();
 992
 993  while (Date.now() - startedAt < timeoutMs) {
 994    const value = await loader();
 995
 996    if (value != null) {
 997      return value;
 998    }
 999
1000    await sleep(20);
1001  }
1002
1003  throw new Error("timed out waiting for expected condition");
1004}