// index.test.js
// im_wower · 2026-03-25
1import assert from "node:assert/strict";
2import { mkdtempSync, readFileSync } from "node:fs";
3import { tmpdir } from "node:os";
4import { join } from "node:path";
5import test from "node:test";
6
7import {
8 CodexdDaemon,
9 CodexdLocalService,
10 resolveCodexdConfig
11} from "../dist/index.js";
12
/**
 * Minimal in-memory event stream for tests.
 *
 * Subscribers are kept in a Set, so subscribing the same function twice
 * results in a single registration.
 */
class FakeEventStream {
  constructor() {
    this.listeners = new Set();
  }

  /**
   * Synchronously hands `event` to every registered subscriber.
   *
   * @param {unknown} event - Payload passed to each subscriber unchanged.
   */
  emit(event) {
    this.listeners.forEach((notify) => {
      notify(event);
    });
  }

  /**
   * Registers `listener` and returns a handle that removes it again.
   *
   * @param {(event: unknown) => void} listener - Callback invoked on every emit.
   * @returns {{ unsubscribe: () => void }} Handle used to drop the subscription.
   */
  subscribe(listener) {
    this.listeners.add(listener);

    const detach = () => {
      this.listeners.delete(listener);
    };

    return { unsubscribe: detach };
  }
}
34
/**
 * Tiny stand-in for a readable stream: it records listeners per event name
 * and lets the test push events through `emit`.
 */
class FakeStream {
  constructor() {
    // event name -> array of listeners, in registration order
    this.listeners = new Map();
  }

  /**
   * Appends `listener` to the list registered for `event`.
   *
   * @returns {this} The stream, so calls can be chained.
   */
  on(event, listener) {
    const registered = this.listeners.get(event);

    // A fresh array is stored on every registration, so an in-flight emit
    // keeps iterating over the list it started with.
    this.listeners.set(event, registered == null ? [listener] : [...registered, listener]);
    return this;
  }

  /** Accepts and ignores any encoding; present only for API compatibility. */
  setEncoding() {
    return this;
  }

  /**
   * Calls every listener registered for `event` with `args`.
   */
  emit(event, ...args) {
    const registered = this.listeners.get(event);

    if (registered == null) {
      return;
    }

    registered.forEach((listener) => {
      listener(...args);
    });
  }
}
55
/**
 * Fake child process handed to the daemon's spawner in tests.
 *
 * It exposes the small part of the child-process surface the tests touch:
 * `pid`, `stdin`, `stdout`, `stderr`, `on`, `once`, `kill`, plus an `emit`
 * the test uses to drive lifecycle events.
 */
class FakeChild {
  constructor() {
    this.pid = 4242;
    this.stdin = {
      end() {},
      write() {
        return true;
      }
    };
    this.stdout = new FakeStream();
    this.stderr = new FakeStream();
    this.listeners = new Map();
    this.onceListeners = new Map();
  }

  /**
   * Registers a persistent listener for `event`.
   *
   * @returns {this} The child, for chaining.
   */
  on(event, listener) {
    this.listeners.set(event, [...(this.listeners.get(event) ?? []), listener]);
    return this;
  }

  /**
   * Registers a listener that is invoked for the next `event` only.
   *
   * @returns {this} The child, for chaining.
   */
  once(event, listener) {
    this.onceListeners.set(event, [...(this.onceListeners.get(event) ?? []), listener]);
    return this;
  }

  /**
   * Simulates termination by emitting `exit` with code 0 and the given signal.
   *
   * @param {string} [signal="SIGTERM"] - Signal name reported to exit listeners.
   * @returns {boolean} Always `true`.
   */
  kill(signal = "SIGTERM") {
    this.emit("exit", 0, signal);
    return true;
  }

  /**
   * Dispatches `event` to persistent listeners first, then to one-shot
   * listeners.
   */
  emit(event, ...args) {
    for (const listener of this.listeners.get(event) ?? []) {
      listener(...args);
    }

    // Detach the one-shot listeners BEFORE calling them. Otherwise a listener
    // that re-emits the same event (for example by calling `kill()` from an
    // `exit` handler) would be invoked again recursively, and a one-shot
    // listener registered during dispatch would be deleted without ever
    // having run.
    const oneShotListeners = this.onceListeners.get(event) ?? [];
    this.onceListeners.delete(event);

    for (const listener of oneShotListeners) {
      listener(...args);
    }
  }
}
98
/**
 * Fake `codex app-server` child that speaks newline-delimited JSON-RPC over
 * its stdio streams.
 *
 * Requests written to `stdin` are buffered, split on "\n", parsed as JSON and
 * answered on `stdout`. Supported methods: `initialize`, `thread/start` and
 * `turn/start`; anything else is answered with a -32601 error.
 *
 * The second turn started on this child simulates a retrying turn: it emits an
 * `error` notification with `willRetry: true` and, 25 ms later, calls
 * `finishRetryingTurn`, which subclasses may override.
 */
class FakeRpcAppServerChild extends FakeChild {
  constructor() {
    super();
    this.nextThreadId = 1;
    this.nextTurnId = 1;
    this.requests = "";
    this.threads = new Map();

    // One decoder for the lifetime of the child, used in streaming mode, so a
    // multi-byte UTF-8 sequence split across two writes is decoded correctly
    // instead of being turned into replacement characters.
    const requestDecoder = new TextDecoder();

    this.stdin = {
      end: () => {
        this.stdout.emit("end");
      },
      write: (chunk) => {
        this.requests +=
          typeof chunk === "string" ? chunk : requestDecoder.decode(chunk, { stream: true });
        this.drainBufferedRequests();

        return true;
      }
    };
  }

  /**
   * Handles every complete (newline-terminated) request currently buffered in
   * `this.requests`; a trailing partial line stays buffered.
   */
  drainBufferedRequests() {
    let newlineIndex = this.requests.indexOf("\n");

    while (newlineIndex >= 0) {
      const line = this.requests.slice(0, newlineIndex).trim();
      this.requests = this.requests.slice(newlineIndex + 1);

      if (line !== "") {
        this.handleRequest(JSON.parse(line));
      }

      newlineIndex = this.requests.indexOf("\n");
    }
  }

  /**
   * Writes one JSON message to stdout as a `data` event.
   *
   * @param {object} payload - Message to serialize.
   * @param {{ trailingNewline?: boolean }} [options] - Pass
   *   `trailingNewline: false` to emit the message without its terminating
   *   newline.
   */
  emitRpcMessage(payload, { trailingNewline = true } = {}) {
    this.stdout.emit("data", `${JSON.stringify(payload)}${trailingNewline ? "\n" : ""}`);
  }

  /** Replaces the stored turn `turnId` of `session` with `completedTurn`. */
  recordCompletedTurn(session, turnId, completedTurn) {
    session.thread.turns = session.thread.turns.map((entry) =>
      entry.id === turnId ? completedTurn : entry
    );
  }

  /**
   * Completes a retrying turn: the `turn/completed` notification is emitted
   * WITHOUT a trailing newline and stdout ends immediately afterwards.
   */
  finishRetryingTurn(session, turnId, completedTurn) {
    this.recordCompletedTurn(session, turnId, completedTurn);
    this.emitRpcMessage(
      {
        method: "turn/completed",
        params: {
          threadId: session.thread.id,
          turn: completedTurn
        }
      },
      {
        trailingNewline: false
      }
    );
    this.stdout.emit("end");
  }

  /**
   * Dispatches one parsed JSON-RPC request. Every response is produced from a
   * microtask, i.e. after the `write` call that delivered the request returns.
   */
  handleRequest(request) {
    switch (request.method) {
      case "initialize":
        queueMicrotask(() => {
          this.respondToInitialize(request);
        });
        break;

      case "thread/start":
        queueMicrotask(() => {
          this.respondToThreadStart(request);
        });
        break;

      case "turn/start":
        queueMicrotask(() => {
          this.respondToTurnStart(request);
        });
        break;

      default:
        queueMicrotask(() => {
          this.respondToUnexpectedMethod(request);
        });
        break;
    }
  }

  /** Answers `initialize` with a fixed platform description. */
  respondToInitialize(request) {
    this.emitRpcMessage({
      id: request.id,
      result: {
        platformFamily: "unix",
        platformOs: "macos",
        userAgent: "codex-cli fake-rpc"
      }
    });
  }

  /**
   * Creates a new thread/session, then emits the `thread/started`
   * notification followed by the response to the request.
   */
  respondToThreadStart(request) {
    const threadId = `thread-${this.nextThreadId}`;
    this.nextThreadId += 1;
    const thread = {
      cliVersion: "test",
      createdAt: Date.now(),
      cwd: request.params?.cwd ?? "/tmp/fake-rpc",
      ephemeral: true,
      id: threadId,
      modelProvider: "openai",
      name: null,
      preview: "fake rpc session",
      source: { custom: "codexd-rpc-test" },
      status: { type: "idle" },
      turns: [],
      updatedAt: Date.now()
    };
    const session = {
      approvalPolicy: "never",
      cwd: thread.cwd,
      model: "gpt-5.4",
      modelProvider: "openai",
      reasoningEffort: "medium",
      sandbox: { type: "dangerFullAccess" },
      serviceTier: null,
      thread
    };

    this.threads.set(threadId, session);
    this.emitRpcMessage({
      method: "thread/started",
      params: {
        thread
      }
    });
    this.emitRpcMessage({
      id: request.id,
      result: session
    });
  }

  /**
   * Starts a turn on a known thread. Emits `turn/started`, the response, and
   * one agent-message delta; then either completes the turn right away or,
   * for the second turn, reports a retryable error and finishes later.
   */
  respondToTurnStart(request) {
    // Guarded like `thread/start`: a request without `params` is answered
    // with an error response instead of throwing inside the microtask.
    const threadId = request.params?.threadId;
    const session = this.threads.get(threadId);

    if (session == null) {
      this.emitRpcMessage({
        id: request.id,
        error: {
          code: -32000,
          message: `unknown thread ${threadId}`
        }
      });
      return;
    }

    const turnNumber = this.nextTurnId;
    const turnId = `turn-${turnNumber}`;
    this.nextTurnId += 1;
    const turn = {
      error: null,
      id: turnId,
      status: "inProgress"
    };
    const completedTurn = {
      ...turn,
      status: "completed"
    };

    session.thread.turns = [...session.thread.turns, turn];
    this.emitRpcMessage({
      method: "turn/started",
      params: {
        threadId: session.thread.id,
        turn
      }
    });
    this.emitRpcMessage({
      id: request.id,
      result: {
        turn
      }
    });
    this.emitRpcMessage({
      method: "item/agentMessage/delta",
      params: {
        delta: `reply ${turnId}`,
        itemId: `item-${turnId}`,
        threadId: session.thread.id,
        turnId
      }
    });

    if (turnNumber === 2) {
      // Second turn: report a retryable stream disconnect and defer the
      // outcome to `finishRetryingTurn`.
      this.emitRpcMessage({
        method: "error",
        params: {
          error: {
            additionalDetails: "timeout waiting for child process to exit",
            codexErrorInfo: {
              responseStreamDisconnected: {
                httpStatusCode: null
              }
            },
            message: "Reconnecting... 2/5"
          },
          threadId: session.thread.id,
          turnId,
          willRetry: true
        }
      });
      setTimeout(() => {
        this.finishRetryingTurn(session, turnId, completedTurn);
      }, 25);
      return;
    }

    this.recordCompletedTurn(session, turnId, completedTurn);
    this.emitRpcMessage({
      method: "turn/completed",
      params: {
        threadId: session.thread.id,
        turn: completedTurn
      }
    });
  }

  /** Answers any unsupported method with a -32601 error. */
  respondToUnexpectedMethod(request) {
    this.emitRpcMessage({
      id: request.id,
      error: {
        code: -32601,
        message: `unexpected method ${request.method}`
      }
    });
  }
}
317
/**
 * Variant of the fake RPC child whose retrying turn never completes: when the
 * retry timer fires, stdout simply ends and no `turn/completed` is emitted.
 */
class FakeRpcAppServerDisconnectBeforeCompletedChild extends FakeRpcAppServerChild {
  /** Ends stdout; the arguments supplied by the base class are ignored. */
  finishRetryingTurn() {
    const { stdout } = this;

    stdout.emit("end");
  }
}
323
/**
 * In-process replacement for the app-server client.
 *
 * Threads and turns live in memory and lifecycle notifications are published
 * on `this.events`. Turn numbers listed in `options.retryingTurnNumbers` first
 * emit a retryable `turn.error` and complete only after
 * `options.retryingTurnCompletionDelayMs` milliseconds (default 0).
 */
class FakeAppServerAdapter {
  constructor(defaultCwd, options = {}) {
    this.defaultCwd = defaultCwd;
    this.events = new FakeEventStream();
    this.nextThreadId = 1;
    this.nextTurnId = 1;
    this.retryingTurnCompletionDelayMs = options.retryingTurnCompletionDelayMs ?? 0;
    this.retryingTurnNumbers = new Set(options.retryingTurnNumbers ?? []);
    this.sessions = new Map();
  }

  /** No-op; nothing needs to be released. */
  async close() {}

  /** Resolves to a fixed platform description. */
  async initialize() {
    return {
      platformFamily: "unix",
      platformOs: "macos",
      userAgent: "codex-cli test"
    };
  }

  /**
   * Looks up a previously started session.
   *
   * @throws {Error} (as a rejection) when `params.threadId` is unknown.
   */
  async threadResume(params) {
    const known = this.sessions.get(params.threadId);

    if (known != null) {
      return known;
    }

    throw new Error(`unknown thread ${params.threadId}`);
  }

  /**
   * Creates and stores a new session, publishes `thread.started`, and
   * resolves to the session.
   */
  async threadStart(params = {}) {
    const threadId = `thread-${this.nextThreadId}`;
    this.nextThreadId += 1;

    const cwd = params.cwd ?? this.defaultCwd;
    const modelProvider = params.modelProvider ?? "openai";
    const thread = {
      cliVersion: "test",
      createdAt: Date.now(),
      cwd,
      ephemeral: params.ephemeral ?? true,
      id: threadId,
      modelProvider,
      name: null,
      preview: "fake session",
      source: { custom: "codexd-test" },
      status: { type: "idle" },
      turns: [],
      updatedAt: Date.now()
    };
    const session = {
      thread,
      approvalPolicy: params.approvalPolicy ?? "never",
      cwd,
      model: params.model ?? "gpt-5.4",
      modelProvider,
      reasoningEffort: "medium",
      sandbox: { type: "dangerFullAccess" },
      serviceTier: params.serviceTier ?? null
    };

    this.sessions.set(threadId, session);
    this.events.emit({
      notificationMethod: "thread/started",
      thread,
      type: "thread.started"
    });

    return session;
  }

  /** No-op; interrupts are accepted and ignored. */
  async turnInterrupt() {}

  /**
   * Records a new in-progress turn and resolves to `{ turn }`. The turn's
   * notifications are published from a microtask queued before returning.
   *
   * @throws {Error} (as a rejection) when `params.threadId` is unknown.
   */
  async turnStart(params) {
    const { threadId } = params;
    const session = this.sessions.get(threadId);

    if (session == null) {
      throw new Error(`unknown thread ${threadId}`);
    }

    const turnNumber = this.nextTurnId;
    this.nextTurnId += 1;
    const turnId = `turn-${turnNumber}`;
    const turn = {
      error: null,
      id: turnId,
      status: "inProgress"
    };

    session.thread.turns = [...(session.thread.turns ?? []), turn];

    // Publishes the message delta, marks the turn completed in the session,
    // then publishes `turn.completed`.
    const publishCompletion = () => {
      this.events.emit({
        delta: "hello from fake adapter",
        itemId: "item-1",
        notificationMethod: "item/agentMessage/delta",
        threadId,
        turnId,
        type: "turn.message.delta"
      });

      const completedTurn = {
        ...turn,
        status: "completed"
      };

      session.thread.turns = session.thread.turns.map((entry) =>
        entry.id === completedTurn.id ? completedTurn : entry
      );
      this.events.emit({
        notificationMethod: "turn/completed",
        threadId,
        turn: completedTurn,
        type: "turn.completed"
      });
    };

    // Publishes the retryable stream-disconnect error for this turn.
    const publishRetryableError = () => {
      this.events.emit({
        error: {
          additionalDetails: "timeout waiting for child process to exit",
          codexErrorInfo: {
            responseStreamDisconnected: {
              httpStatusCode: null
            }
          },
          message: "Reconnecting... 2/5"
        },
        notificationMethod: "error",
        threadId,
        turnId,
        type: "turn.error",
        willRetry: true
      });
    };

    queueMicrotask(() => {
      this.events.emit({
        notificationMethod: "turn/started",
        threadId,
        turn,
        type: "turn.started"
      });

      if (this.retryingTurnNumbers.has(turnNumber)) {
        publishRetryableError();
        setTimeout(publishCompletion, this.retryingTurnCompletionDelayMs);
      } else {
        publishCompletion();
      }
    });

    return { turn };
  }

  /** Resolves to the expected turn id unchanged. */
  async turnSteer(params) {
    const { expectedTurnId } = params;

    return { turnId: expectedTurnId };
  }
}
481
// Starts a daemon backed by a fake child, registers and closes one session,
// stops the daemon, and checks both the returned snapshot and the state files
// written under the temporary repo root.
test("CodexdDaemon persists daemon identity, child state, session registry, and recent events", async () => {
  const repoRoot = mkdtempSync(join(tmpdir(), "codexd-daemon-test-"));
  const config = resolveCodexdConfig({
    logsDir: join(repoRoot, "logs"),
    repoRoot,
    stateDir: join(repoRoot, "state")
  });
  const fakeChild = new FakeChild();
  const daemon = new CodexdDaemon(config, {
    env: {
      HOME: repoRoot
    },
    spawner: {
      spawn(command, args, options) {
        assert.equal(command, "codex");
        assert.deepEqual(args, ["app-server"]);
        assert.equal(options.cwd, repoRoot);

        // Report the spawn and some output after `spawn` has returned.
        queueMicrotask(() => {
          fakeChild.emit("spawn");
          fakeChild.stdout.emit("data", "ready from fake child\n");
          fakeChild.stderr.emit("data", "warning from fake child\n");
        });

        return fakeChild;
      }
    }
  });

  // Set right before the test itself calls `stop()`, so the cleanup in
  // `finally` only stops the daemon when an earlier assertion failed.
  let stopRequested = false;
  const started = await daemon.start();

  try {
    assert.equal(started.daemon.child.status, "running");
    assert.equal(started.daemon.child.pid, 4242);

    const session = await daemon.registerSession({
      metadata: {
        runId: "run-1"
      },
      purpose: "worker"
    });
    assert.equal(session.status, "active");

    const closed = await daemon.closeSession(session.sessionId);
    assert.equal(closed?.status, "closed");

    stopRequested = true;
    const stopped = await daemon.stop();
    assert.equal(stopped.daemon.started, false);
    assert.equal(stopped.runRegistry.runs.length, 0);
    assert.equal(stopped.sessionRegistry.sessions.length, 1);
    assert.equal(stopped.sessionRegistry.sessions[0].status, "closed");
    assert.ok(stopped.recentEvents.events.length >= 4);

    const daemonState = JSON.parse(readFileSync(config.paths.daemonStatePath, "utf8"));
    const sessionRegistry = JSON.parse(readFileSync(config.paths.sessionRegistryPath, "utf8"));
    const runRegistry = JSON.parse(readFileSync(config.paths.runRegistryPath, "utf8"));
    const recentEvents = JSON.parse(readFileSync(config.paths.recentEventsPath, "utf8"));
    const eventLog = readFileSync(config.paths.structuredEventLogPath, "utf8");

    assert.equal(daemonState.child.status, "stopped");
    assert.equal(runRegistry.runs.length, 0);
    assert.equal(sessionRegistry.sessions[0].status, "closed");
    assert.ok(recentEvents.events.length >= 4);
    assert.match(eventLog, /child\.started/);
    assert.match(eventLog, /session\.registered/);
  } finally {
    // Same cleanup guarantee as the other daemon tests in this file: a failed
    // assertion must not leave a started daemon behind.
    if (!stopRequested) {
      await daemon.stop();
    }
  }
});
546
// End-to-end check of the local HTTP surface: health/describe/status, session
// creation and lookup, one turn through to completion, and 404s for the
// `runs` routes.
test("CodexdLocalService starts the local HTTP surface and supports status, sessions, turns, and rejects runs routes", async () => {
  const workspace = mkdtempSync(join(tmpdir(), "codexd-service-test-"));
  const config = resolveCodexdConfig({
    localApiBase: "http://127.0.0.1:0",
    logsDir: join(workspace, "logs"),
    repoRoot: workspace,
    serverEndpoint: "ws://127.0.0.1:9999/codex-app-server",
    serverStrategy: "external",
    stateDir: join(workspace, "state")
  });
  const adapter = new FakeAppServerAdapter(workspace);

  // Stub executor: waits briefly, then reports a successful run that echoes
  // the request.
  const stubRunExecutor = async (request) => {
    await sleep(10);

    const invocation = {
      additionalWritableDirectories: request.additionalWritableDirectories ?? [],
      args: ["exec"],
      color: "never",
      command: "codex",
      config: request.config ?? [],
      cwd: request.cwd ?? workspace,
      ephemeral: true,
      images: request.images ?? [],
      json: true,
      prompt: request.prompt,
      purpose: request.purpose ?? "fallback-worker",
      skipGitRepoCheck: request.skipGitRepoCheck ?? false,
      timeoutMs: request.timeoutMs ?? 100
    };
    const result = {
      durationMs: 10,
      exitCode: 0,
      finishedAt: new Date().toISOString(),
      jsonEvents: null,
      jsonParseErrors: [],
      lastMessage: `completed ${request.prompt}`,
      signal: null,
      startedAt: new Date().toISOString(),
      stderr: "",
      stdout: "ok",
      timedOut: false
    };

    return {
      invocation,
      ok: true,
      result
    };
  };

  const service = new CodexdLocalService(config, {
    appServerClientFactory: {
      create: () => adapter
    },
    env: {
      HOME: workspace
    },
    runExecutor: stubRunExecutor
  });

  const eventStreamUrlPattern = /^ws:\/\/127\.0\.0\.1:\d+\/v1\/codexd\/events$/u;
  const expectedRoutes = [
    "GET /healthz",
    "GET /describe",
    "GET /v1/codexd/status",
    "GET /v1/codexd/sessions",
    "GET /v1/codexd/sessions/:session_id",
    "POST /v1/codexd/sessions",
    "POST /v1/codexd/turn",
    "WS /v1/codexd/events"
  ];

  try {
    // --- startup ---
    const started = await service.start();
    assert.equal(started.service.listening, true);
    assert.match(started.service.resolvedBaseUrl, /^http:\/\/127\.0\.0\.1:\d+$/u);
    assert.match(started.service.eventStreamUrl, eventStreamUrlPattern);

    const baseUrl = started.service.resolvedBaseUrl;
    assert.ok(baseUrl);

    // --- health and self-description ---
    const healthz = await fetchJson(`${baseUrl}/healthz`);
    assert.equal(healthz.status, 200);
    assert.equal(healthz.json.ok, true);

    const describe = await fetchJson(`${baseUrl}/describe`);
    const description = describe.json;
    assert.equal(describe.status, 200);
    assert.equal(description.ok, true);
    assert.equal(description.name, "codexd");
    assert.equal(description.surface, "local-api");
    assert.equal(description.base_url, baseUrl);
    assert.equal(description.mode.current, "app-server");
    assert.equal(description.mode.conductor_role, "proxy");
    assert.equal(description.event_stream.path, "/v1/codexd/events");
    assert.match(description.event_stream.url, eventStreamUrlPattern);

    const advertisedRoutes = description.routes.map((route) => `${route.method} ${route.path}`);
    assert.deepEqual(advertisedRoutes, expectedRoutes);
    assert.equal(description.capabilities.turn_create, true);
    assert.equal(description.capabilities.websocket_events, true);

    // The description must not mention the `runs` / `exec` surface anywhere.
    const serializedDescription = JSON.stringify(description);
    assert.doesNotMatch(serializedDescription, /runs/iu);
    assert.doesNotMatch(serializedDescription, /exec/iu);

    // --- status ---
    const status = await fetchJson(`${baseUrl}/v1/codexd/status`);
    assert.equal(status.status, 200);
    assert.equal(status.json.ok, true);
    assert.equal(status.json.data.snapshot.daemon.started, true);

    // --- sessions ---
    const sessionsUrl = `${baseUrl}/v1/codexd/sessions`;
    const createdSession = await postJson(sessionsUrl, {
      cwd: workspace,
      model: "gpt-5.4",
      purpose: "duplex"
    });
    assert.equal(createdSession.status, 201);
    const { session } = createdSession.json.data;
    assert.equal(session.purpose, "duplex");
    assert.match(session.threadId, /^thread-/u);

    const sessions = await fetchJson(sessionsUrl);
    assert.equal(sessions.status, 200);
    assert.equal(sessions.json.data.sessions.length, 1);

    const sessionUrl = `${baseUrl}/v1/codexd/sessions/${session.sessionId}`;
    const readSession = await fetchJson(sessionUrl);
    assert.equal(readSession.status, 200);
    assert.equal(readSession.json.data.session.sessionId, session.sessionId);

    // --- turn ---
    const createdTurn = await postJson(`${baseUrl}/v1/codexd/turn`, {
      input: "Say hello.",
      sessionId: session.sessionId
    });
    assert.equal(createdTurn.status, 202);
    assert.match(createdTurn.json.data.turnId, /^turn-/u);

    // Poll the session until its last turn is reported as completed.
    const completedSession = await waitFor(async () => {
      const current = await fetchJson(sessionUrl);

      if (current.json.data.session.lastTurnStatus === "completed") {
        return current;
      }

      return null;
    });
    assert.equal(completedSession.json.data.session.currentTurnId, null);
    assert.equal(completedSession.json.data.session.lastTurnStatus, "completed");

    // --- `runs` routes are not served ---
    const runsUrl = `${baseUrl}/v1/codexd/runs`;
    const listRuns = await fetchJson(runsUrl);
    assert.equal(listRuns.status, 404);

    const createRun = await postJson(runsUrl, {
      cwd: workspace,
      prompt: "Inspect repo"
    });
    assert.equal(createRun.status, 404);

    const readRun = await fetchJson(`${baseUrl}/v1/codexd/runs/run-legacy`);
    assert.equal(readRun.status, 404);
  } finally {
    await service.stop();
  }
});
697
// Uses the fake adapter with turn #2 configured to retry: the session must
// stay "inProgress" after the retryable error and only become "completed"
// once the adapter publishes the completion.
test("CodexdDaemon keeps retrying turns in progress until the app-server completes them", async () => {
  const workspace = mkdtempSync(join(tmpdir(), "codexd-retry-test-"));
  const config = resolveCodexdConfig({
    logsDir: join(workspace, "logs"),
    repoRoot: workspace,
    serverEndpoint: "ws://127.0.0.1:9999/codex-app-server",
    serverStrategy: "external",
    stateDir: join(workspace, "state")
  });
  const adapter = new FakeAppServerAdapter(workspace, {
    retryingTurnCompletionDelayMs: 100,
    retryingTurnNumbers: [2]
  });
  const daemon = new CodexdDaemon(config, {
    appServerClientFactory: {
      create: () => adapter
    },
    env: {
      HOME: workspace
    }
  });

  // Poll helper: yields the session once its last turn is completed.
  const completedSessionOrNull = (sessionId) => () => {
    const current = daemon.getSession(sessionId);

    if (current?.lastTurnStatus === "completed") {
      return current;
    }

    return null;
  };

  await daemon.start();

  try {
    // Turn #1 completes immediately.
    const firstSession = await daemon.createSession({
      cwd: workspace,
      purpose: "duplex"
    });
    await daemon.createTurn({
      input: "Say hello.",
      sessionId: firstSession.sessionId
    });
    await waitFor(completedSessionOrNull(firstSession.sessionId));

    // Turn #2 is the retrying one.
    const secondSession = await daemon.createSession({
      cwd: workspace,
      purpose: "duplex"
    });
    const secondTurn = await daemon.createTurn({
      input: "Retry and finish.",
      sessionId: secondSession.sessionId
    });

    const isRetryNotice = (event) =>
      event.type === "app-server.turn.error" &&
      event.detail?.turnId === secondTurn.turnId &&
      event.detail?.willRetry === true;

    const retryingSession = await waitFor(() => {
      const current = daemon.getSession(secondSession.sessionId);
      const sawRetryEvent = daemon.getStatusSnapshot().recentEvents.events.some(isRetryNotice);

      if (sawRetryEvent && current?.currentTurnId === secondTurn.turnId) {
        return current;
      }

      return null;
    });
    assert.equal(retryingSession.currentTurnId, secondTurn.turnId);
    assert.equal(retryingSession.lastTurnId, secondTurn.turnId);
    assert.equal(retryingSession.lastTurnStatus, "inProgress");

    const completedSession = await waitFor(completedSessionOrNull(secondSession.sessionId));
    assert.equal(completedSession.currentTurnId, null);
    assert.equal(completedSession.lastTurnId, secondTurn.turnId);
    assert.equal(completedSession.lastTurnStatus, "completed");
  } finally {
    await daemon.stop();
  }
});
775
// Drives the daemon through the fake stdio JSON-RPC child. The second turn's
// `turn/completed` arrives without a trailing newline right before stdout
// ends, and the session must still be reported as completed.
test("CodexdDaemon flushes the final stdio completion event so sequential sessions still finish", async () => {
  const workspace = mkdtempSync(join(tmpdir(), "codexd-stdio-tail-test-"));
  const config = resolveCodexdConfig({
    logsDir: join(workspace, "logs"),
    repoRoot: workspace,
    stateDir: join(workspace, "state")
  });
  const fakeChild = new FakeRpcAppServerChild();
  const spawner = {
    spawn(command, args, options) {
      assert.equal(command, "codex");
      assert.deepEqual(args, ["app-server"]);
      assert.equal(options.cwd, workspace);

      // Announce the spawn only after `spawn` has returned the child.
      queueMicrotask(() => {
        fakeChild.emit("spawn");
      });

      return fakeChild;
    }
  };
  const daemon = new CodexdDaemon(config, {
    env: {
      HOME: workspace
    },
    spawner
  });

  // True when the daemon's recent events contain one matching `predicate`.
  const sawRecentEvent = (predicate) =>
    daemon.getStatusSnapshot().recentEvents.events.some(predicate);

  await daemon.start();

  try {
    const firstSession = await daemon.createSession({
      cwd: workspace,
      purpose: "duplex"
    });
    const firstTurn = await daemon.createTurn({
      input: "First turn.",
      sessionId: firstSession.sessionId
    });
    const completedFirstSession = await waitFor(() => {
      const current = daemon.getSession(firstSession.sessionId);

      if (current?.lastTurnStatus === "completed") {
        return current;
      }

      return null;
    });
    assert.equal(completedFirstSession.currentTurnId, null);
    assert.equal(completedFirstSession.lastTurnId, firstTurn.turnId);

    const secondSession = await daemon.createSession({
      cwd: workspace,
      purpose: "duplex"
    });
    const secondTurn = await daemon.createTurn({
      input: "Second turn.",
      sessionId: secondSession.sessionId
    });

    const isRetryNotice = (event) =>
      event.type === "app-server.turn.error" &&
      event.detail?.turnId === secondTurn.turnId &&
      event.detail?.willRetry === true;
    const isCompletionNotice = (event) =>
      event.type === "app-server.turn.completed" &&
      event.detail?.turnId === secondTurn.turnId;

    // Phase 1: the retryable error was recorded and the turn is still open.
    const retryingSecondSession = await waitFor(() => {
      const current = daemon.getSession(secondSession.sessionId);
      const sawRetryEvent = sawRecentEvent(isRetryNotice);

      if (sawRetryEvent && current?.lastTurnStatus === "inProgress") {
        return current;
      }

      return null;
    });
    assert.equal(retryingSecondSession.currentTurnId, secondTurn.turnId);

    // Phase 2: the completion event was recorded and the session is done.
    const completedSecondSession = await waitFor(() => {
      const current = daemon.getSession(secondSession.sessionId);
      const sawCompletedEvent = sawRecentEvent(isCompletionNotice);

      if (sawCompletedEvent && current?.lastTurnStatus === "completed") {
        return current;
      }

      return null;
    });
    assert.equal(completedSecondSession.currentTurnId, null);
    assert.equal(completedSecondSession.lastTurnId, secondTurn.turnId);
    assert.equal(completedSecondSession.lastTurnStatus, "completed");
  } finally {
    await daemon.stop();
  }
});
864
// Uses the fake child whose stdout ends before the retrying turn completes.
// The session must end up "failed", an "app-server.turn.completed.missing"
// event with the expected detail must be recorded, and no completion event
// may exist for that turn.
test("CodexdDaemon classifies transport closes before a legal completion as a new failure", async () => {
  const workspace = mkdtempSync(join(tmpdir(), "codexd-stdio-disconnect-test-"));
  const config = resolveCodexdConfig({
    logsDir: join(workspace, "logs"),
    repoRoot: workspace,
    stateDir: join(workspace, "state")
  });
  const fakeChild = new FakeRpcAppServerDisconnectBeforeCompletedChild();
  const spawner = {
    spawn(command, args, options) {
      assert.equal(command, "codex");
      assert.deepEqual(args, ["app-server"]);
      assert.equal(options.cwd, workspace);

      // Announce the spawn only after `spawn` has returned the child.
      queueMicrotask(() => {
        fakeChild.emit("spawn");
      });

      return fakeChild;
    }
  };
  const daemon = new CodexdDaemon(config, {
    env: {
      HOME: workspace
    },
    spawner
  });

  // Current list of recent events from a fresh status snapshot.
  const recentEvents = () => daemon.getStatusSnapshot().recentEvents.events;

  await daemon.start();

  try {
    const firstSession = await daemon.createSession({
      cwd: workspace,
      purpose: "duplex"
    });
    await daemon.createTurn({
      input: "First turn.",
      sessionId: firstSession.sessionId
    });
    await waitFor(() => {
      const current = daemon.getSession(firstSession.sessionId);

      if (current?.lastTurnStatus === "completed") {
        return current;
      }

      return null;
    });

    const secondSession = await daemon.createSession({
      cwd: workspace,
      purpose: "duplex"
    });
    const secondTurn = await daemon.createTurn({
      input: "Second turn.",
      sessionId: secondSession.sessionId
    });

    const isMissingCompletionNotice = (event) =>
      event.type === "app-server.turn.completed.missing" &&
      event.detail?.turnId === secondTurn.turnId;
    const isCompletionNotice = (event) =>
      event.type === "app-server.turn.completed" &&
      event.detail?.turnId === secondTurn.turnId;

    // Wait until the session is failed AND the missing-completion event exists.
    const failedSecondSession = await waitFor(() => {
      const current = daemon.getSession(secondSession.sessionId);
      const missingCompletedEvent = recentEvents().find(isMissingCompletionNotice);

      if (current?.lastTurnStatus === "failed" && missingCompletedEvent != null) {
        return {
          event: missingCompletedEvent,
          session: current
        };
      }

      return null;
    });

    const { event: missingEvent, session: failedSession } = failedSecondSession;
    assert.equal(failedSession.currentTurnId, null);
    assert.equal(failedSession.lastTurnId, secondTurn.turnId);
    assert.equal(failedSession.lastTurnStatus, "failed");
    assert.deepEqual(missingEvent.detail, {
      childExitCode: null,
      childPid: 4242,
      childSignal: null,
      childStatus: "running",
      failureClass: "transport_closed_before_turn_completed",
      flushedTrailingMessage: false,
      sessionId: secondSession.sessionId,
      threadId: secondSession.threadId,
      trailingMessageLength: 0,
      transportCloseMessage: "Codex app-server stdio stdout ended.",
      transportCloseSource: "stdout.end",
      turnId: secondTurn.turnId,
      turnStatusAtClose: "inProgress"
    });

    const sawCompletion = recentEvents().some(isCompletionNotice);
    assert.equal(sawCompletion, false);
  } finally {
    await daemon.stop();
  }
});
964
/**
 * Performs a `fetch` and returns the parsed JSON body together with the HTTP
 * status code.
 *
 * @param {string | URL} url - Request target.
 * @param {RequestInit} [init] - Options forwarded to `fetch` unchanged.
 * @returns {Promise<{ json: unknown, status: number }>} Parsed body and status.
 *   Rejects if the request fails or the body is not valid JSON.
 */
async function fetchJson(url, init) {
  const response = await fetch(url, init);
  const json = await response.json();
  const { status } = response;

  return { json, status };
}
973
/**
 * Sends `body` as a JSON-encoded POST request and returns what `fetchJson`
 * returns for it.
 *
 * @param {string | URL} url - Request target.
 * @param {unknown} body - Value serialized with `JSON.stringify`.
 * @returns {Promise<{ json: unknown, status: number }>} Parsed body and status.
 */
async function postJson(url, body) {
  const init = {
    body: JSON.stringify(body),
    headers: {
      "content-type": "application/json"
    },
    method: "POST"
  };

  return fetchJson(url, init);
}
983
/**
 * Resolves (with no value) after roughly `ms` milliseconds.
 *
 * @param {number} ms - Delay handed to `setTimeout`.
 * @returns {Promise<void>}
 */
async function sleep(ms) {
  return new Promise((wake) => {
    setTimeout(() => {
      wake();
    }, ms);
  });
}
989
/**
 * Polls `loader` until it yields a value other than `null`/`undefined` and
 * returns that value.
 *
 * The loader is always evaluated at least once, and once more after the final
 * sleep, so a condition that becomes true during the last poll interval (or a
 * call with `timeoutMs <= 0`) is still observed instead of being reported as a
 * timeout.
 *
 * @param {() => unknown | Promise<unknown>} loader - Sync or async probe.
 * @param {number} [timeoutMs=2000] - Maximum time to keep polling.
 * @returns {Promise<unknown>} The first non-nullish value produced by `loader`.
 * @throws {Error} When the deadline passes without a non-nullish value.
 */
async function waitFor(loader, timeoutMs = 2_000) {
  const pollIntervalMs = 20;
  const deadline = Date.now() + timeoutMs;

  while (true) {
    const value = await loader();

    // `!= null` on purpose: falsy results such as 0 or "" count as success.
    if (value != null) {
      return value;
    }

    const remainingMs = deadline - Date.now();

    if (remainingMs <= 0) {
      break;
    }

    // Never sleep past the deadline; the next iteration is the final check.
    await sleep(Math.min(pollIntervalMs, remainingMs));
  }

  throw new Error("timed out waiting for expected condition");
}