baa-conductor


baa-conductor / apps / codexd / src
im_wower  ·  2026-03-25

app-server-transport.ts

  1import type { CodexAppServerTransport, CodexAppServerTransportHandlers } from "../../../packages/codex-app-server/src/transport.js";
  2
  3export interface CodexdAppServerWritableStream {
  4  end(chunk?: string | Uint8Array): unknown;
  5  write(chunk: string | Uint8Array): boolean;
  6}
  7
  8export interface CodexdAppServerReadableStream {
  9  on(event: "data", listener: (chunk: string | Uint8Array) => void): this;
 10  on(event: "end", listener: () => void): this;
 11  on(event: "error", listener: (error: Error) => void): this;
 12  setEncoding?(encoding: string): this;
 13}
 14
 15export interface CodexdAppServerProcessLike {
 16  pid?: number;
 17  stdin?: CodexdAppServerWritableStream;
 18  stderr?: CodexdAppServerReadableStream;
 19  stdout?: CodexdAppServerReadableStream;
 20  on(event: "error", listener: (error: Error) => void): this;
 21  on(event: "exit", listener: (code: number | null, signal: string | null) => void): this;
 22}
 23
 24export type CodexdAppServerTransportCloseSource =
 25  | "process.error"
 26  | "process.exit"
 27  | "stdout.end"
 28  | "stdout.error";
 29
 30export interface CodexdAppServerTransportCloseDiagnostic {
 31  exitCode?: number | null;
 32  flushedTrailingMessage: boolean;
 33  message: string;
 34  signal?: string | null;
 35  source: CodexdAppServerTransportCloseSource;
 36  trailingMessageLength: number;
 37}
 38
 39export interface CodexdAppServerStdioTransportConfig {
 40  endStdinOnClose?: boolean;
 41  onCloseDiagnostic?: (diagnostic: CodexdAppServerTransportCloseDiagnostic) => void;
 42  process: CodexdAppServerProcessLike;
 43}
 44
 45function toError(cause: unknown, fallback: string): Error {
 46  if (cause instanceof Error) {
 47    return cause;
 48  }
 49
 50  if (typeof cause === "string" && cause !== "") {
 51    return new Error(cause);
 52  }
 53
 54  return new Error(fallback);
 55}
 56
 57export function createCodexdAppServerStdioTransport(
 58  config: CodexdAppServerStdioTransportConfig
 59): CodexAppServerTransport {
 60  let buffer = "";
 61  let closed = false;
 62  let connected = false;
 63  let handlers: CodexAppServerTransportHandlers | null = null;
 64
 65  const emitBufferedMessages = (): void => {
 66    while (true) {
 67      const newlineIndex = buffer.indexOf("\n");
 68
 69      if (newlineIndex < 0) {
 70        return;
 71      }
 72
 73      const line = buffer.slice(0, newlineIndex).trim();
 74      buffer = buffer.slice(newlineIndex + 1);
 75
 76      if (line !== "") {
 77        handlers?.onMessage(line);
 78      }
 79    }
 80  };
 81
 82  const flushTrailingMessage = (): {
 83    flushedTrailingMessage: boolean;
 84    trailingMessageLength: number;
 85  } => {
 86    const line = buffer.trim();
 87    buffer = "";
 88
 89    if (line === "") {
 90      return {
 91        flushedTrailingMessage: false,
 92        trailingMessageLength: 0
 93      };
 94    }
 95
 96    handlers?.onMessage(line);
 97    return {
 98      flushedTrailingMessage: true,
 99      trailingMessageLength: line.length
100    };
101  };
102
103  const closeTransport = (
104    error: Error,
105    diagnostic: Omit<
106      CodexdAppServerTransportCloseDiagnostic,
107      "flushedTrailingMessage" | "message" | "trailingMessageLength"
108    >
109  ): void => {
110    if (closed) {
111      return;
112    }
113
114    const trailingMessage = flushTrailingMessage();
115    closed = true;
116    connected = false;
117    config.onCloseDiagnostic?.({
118      ...diagnostic,
119      flushedTrailingMessage: trailingMessage.flushedTrailingMessage,
120      message: error.message,
121      trailingMessageLength: trailingMessage.trailingMessageLength
122    });
123    handlers?.onClose(error);
124  };
125
126  return {
127    async connect(nextHandlers: CodexAppServerTransportHandlers): Promise<void> {
128      if (closed) {
129        throw new Error("Codex app-server stdio transport is already closed.");
130      }
131
132      if (connected) {
133        handlers = nextHandlers;
134        return;
135      }
136
137      const stdout = config.process.stdout;
138      const stdin = config.process.stdin;
139
140      if (stdout == null || stdin == null) {
141        throw new Error("Codex app-server stdio transport requires child stdin and stdout.");
142      }
143
144      handlers = nextHandlers;
145      connected = true;
146      stdout.setEncoding?.("utf8");
147      stdout.on("data", (chunk) => {
148        buffer += typeof chunk === "string" ? chunk : new TextDecoder().decode(chunk);
149        emitBufferedMessages();
150      });
151      stdout.on("end", () => {
152        closeTransport(new Error("Codex app-server stdio stdout ended."), {
153          source: "stdout.end"
154        });
155      });
156      stdout.on("error", (error) => {
157        closeTransport(error, {
158          source: "stdout.error"
159        });
160      });
161      config.process.on("error", (error) => {
162        closeTransport(error, {
163          source: "process.error"
164        });
165      });
166      config.process.on("exit", (code, signal) => {
167        closeTransport(
168          new Error(
169            `Codex app-server stdio child exited (code=${String(code)}, signal=${String(signal)}).`
170          ),
171          {
172            exitCode: code,
173            signal,
174            source: "process.exit"
175          }
176        );
177      });
178    },
179
180    async send(message: string): Promise<void> {
181      if (closed || !connected || config.process.stdin == null) {
182        throw new Error("Codex app-server stdio transport is not connected.");
183      }
184
185      const ok = config.process.stdin.write(`${message}\n`);
186
187      if (!ok) {
188        throw new Error("Codex app-server stdio transport failed to write request.");
189      }
190    },
191
192    async close(): Promise<void> {
193      if (closed) {
194        return;
195      }
196
197      closed = true;
198      connected = false;
199
200      if (config.endStdinOnClose ?? false) {
201        try {
202          config.process.stdin?.end();
203        } catch (error) {
204          throw toError(error, "Failed to close Codex app-server stdin.");
205        }
206      }
207    }
208  };
209}