im_wower
·
2026-03-25
app-server-transport.ts
1import type { CodexAppServerTransport, CodexAppServerTransportHandlers } from "../../../packages/codex-app-server/src/transport.js";
2
3export interface CodexdAppServerWritableStream {
4 end(chunk?: string | Uint8Array): unknown;
5 write(chunk: string | Uint8Array): boolean;
6}
7
8export interface CodexdAppServerReadableStream {
9 on(event: "data", listener: (chunk: string | Uint8Array) => void): this;
10 on(event: "end", listener: () => void): this;
11 on(event: "error", listener: (error: Error) => void): this;
12 setEncoding?(encoding: string): this;
13}
14
15export interface CodexdAppServerProcessLike {
16 pid?: number;
17 stdin?: CodexdAppServerWritableStream;
18 stderr?: CodexdAppServerReadableStream;
19 stdout?: CodexdAppServerReadableStream;
20 on(event: "error", listener: (error: Error) => void): this;
21 on(event: "exit", listener: (code: number | null, signal: string | null) => void): this;
22}
23
24export type CodexdAppServerTransportCloseSource =
25 | "process.error"
26 | "process.exit"
27 | "stdout.end"
28 | "stdout.error";
29
30export interface CodexdAppServerTransportCloseDiagnostic {
31 exitCode?: number | null;
32 flushedTrailingMessage: boolean;
33 message: string;
34 signal?: string | null;
35 source: CodexdAppServerTransportCloseSource;
36 trailingMessageLength: number;
37}
38
39export interface CodexdAppServerStdioTransportConfig {
40 endStdinOnClose?: boolean;
41 onCloseDiagnostic?: (diagnostic: CodexdAppServerTransportCloseDiagnostic) => void;
42 process: CodexdAppServerProcessLike;
43}
44
45function toError(cause: unknown, fallback: string): Error {
46 if (cause instanceof Error) {
47 return cause;
48 }
49
50 if (typeof cause === "string" && cause !== "") {
51 return new Error(cause);
52 }
53
54 return new Error(fallback);
55}
56
57export function createCodexdAppServerStdioTransport(
58 config: CodexdAppServerStdioTransportConfig
59): CodexAppServerTransport {
60 let buffer = "";
61 let closed = false;
62 let connected = false;
63 let handlers: CodexAppServerTransportHandlers | null = null;
64
65 const emitBufferedMessages = (): void => {
66 while (true) {
67 const newlineIndex = buffer.indexOf("\n");
68
69 if (newlineIndex < 0) {
70 return;
71 }
72
73 const line = buffer.slice(0, newlineIndex).trim();
74 buffer = buffer.slice(newlineIndex + 1);
75
76 if (line !== "") {
77 handlers?.onMessage(line);
78 }
79 }
80 };
81
82 const flushTrailingMessage = (): {
83 flushedTrailingMessage: boolean;
84 trailingMessageLength: number;
85 } => {
86 const line = buffer.trim();
87 buffer = "";
88
89 if (line === "") {
90 return {
91 flushedTrailingMessage: false,
92 trailingMessageLength: 0
93 };
94 }
95
96 handlers?.onMessage(line);
97 return {
98 flushedTrailingMessage: true,
99 trailingMessageLength: line.length
100 };
101 };
102
103 const closeTransport = (
104 error: Error,
105 diagnostic: Omit<
106 CodexdAppServerTransportCloseDiagnostic,
107 "flushedTrailingMessage" | "message" | "trailingMessageLength"
108 >
109 ): void => {
110 if (closed) {
111 return;
112 }
113
114 const trailingMessage = flushTrailingMessage();
115 closed = true;
116 connected = false;
117 config.onCloseDiagnostic?.({
118 ...diagnostic,
119 flushedTrailingMessage: trailingMessage.flushedTrailingMessage,
120 message: error.message,
121 trailingMessageLength: trailingMessage.trailingMessageLength
122 });
123 handlers?.onClose(error);
124 };
125
126 return {
127 async connect(nextHandlers: CodexAppServerTransportHandlers): Promise<void> {
128 if (closed) {
129 throw new Error("Codex app-server stdio transport is already closed.");
130 }
131
132 if (connected) {
133 handlers = nextHandlers;
134 return;
135 }
136
137 const stdout = config.process.stdout;
138 const stdin = config.process.stdin;
139
140 if (stdout == null || stdin == null) {
141 throw new Error("Codex app-server stdio transport requires child stdin and stdout.");
142 }
143
144 handlers = nextHandlers;
145 connected = true;
146 stdout.setEncoding?.("utf8");
147 stdout.on("data", (chunk) => {
148 buffer += typeof chunk === "string" ? chunk : new TextDecoder().decode(chunk);
149 emitBufferedMessages();
150 });
151 stdout.on("end", () => {
152 closeTransport(new Error("Codex app-server stdio stdout ended."), {
153 source: "stdout.end"
154 });
155 });
156 stdout.on("error", (error) => {
157 closeTransport(error, {
158 source: "stdout.error"
159 });
160 });
161 config.process.on("error", (error) => {
162 closeTransport(error, {
163 source: "process.error"
164 });
165 });
166 config.process.on("exit", (code, signal) => {
167 closeTransport(
168 new Error(
169 `Codex app-server stdio child exited (code=${String(code)}, signal=${String(signal)}).`
170 ),
171 {
172 exitCode: code,
173 signal,
174 source: "process.exit"
175 }
176 );
177 });
178 },
179
180 async send(message: string): Promise<void> {
181 if (closed || !connected || config.process.stdin == null) {
182 throw new Error("Codex app-server stdio transport is not connected.");
183 }
184
185 const ok = config.process.stdin.write(`${message}\n`);
186
187 if (!ok) {
188 throw new Error("Codex app-server stdio transport failed to write request.");
189 }
190 },
191
192 async close(): Promise<void> {
193 if (closed) {
194 return;
195 }
196
197 closed = true;
198 connected = false;
199
200 if (config.endStdinOnClose ?? false) {
201 try {
202 config.process.stdin?.end();
203 } catch (error) {
204 throw toError(error, "Failed to close Codex app-server stdin.");
205 }
206 }
207 }
208 };
209}