baa-conductor


baa-conductor / apps / claude-coded / src
im_wower  ·  2026-03-29

stream-json-transport.ts

  1export interface StreamJsonWritableStream {
  2  end(chunk?: string | Uint8Array): unknown;
  3  write(chunk: string | Uint8Array): boolean;
  4}
  5
  6export interface StreamJsonReadableStream {
  7  on(event: "data", listener: (chunk: string | Uint8Array) => void): unknown;
  8  on(event: "end", listener: () => void): unknown;
  9  on(event: "error", listener: (error: Error) => void): unknown;
 10  off?(event: "data", listener: (chunk: string | Uint8Array) => void): unknown;
 11  off?(event: "end", listener: () => void): unknown;
 12  off?(event: "error", listener: (error: Error) => void): unknown;
 13  setEncoding?(encoding: string): unknown;
 14}
 15
 16export interface StreamJsonProcessLike {
 17  pid?: number;
 18  stdin?: StreamJsonWritableStream;
 19  stderr?: StreamJsonReadableStream;
 20  stdout?: StreamJsonReadableStream;
 21  on(event: "error", listener: (error: Error) => void): unknown;
 22  on(event: "exit", listener: (code: number | null, signal: string | null) => void): unknown;
 23  off?(event: "error", listener: (error: Error) => void): unknown;
 24  off?(event: "exit", listener: (code: number | null, signal: string | null) => void): unknown;
 25}
 26
 27export type StreamJsonCloseSource =
 28  | "process.error"
 29  | "process.exit"
 30  | "stderr.error"
 31  | "stdout.end"
 32  | "stdout.error";
 33
 34export interface StreamJsonCloseDiagnostic {
 35  exitCode?: number | null;
 36  message: string;
 37  signal?: string | null;
 38  source: StreamJsonCloseSource;
 39}
 40
 41export type StreamJsonMessageListener = (message: Record<string, unknown>) => void;
 42export type StreamJsonCloseListener = (error: Error) => void;
 43export type StreamJsonStderrListener = (text: string) => void;
 44
 45export interface StreamJsonTransportConfig {
 46  onClose?: StreamJsonCloseListener;
 47  onCloseDiagnostic?: (diagnostic: StreamJsonCloseDiagnostic) => void;
 48  onMessage?: StreamJsonMessageListener;
 49  onStderr?: StreamJsonStderrListener;
 50  process: StreamJsonProcessLike;
 51}
 52
 53export interface StreamJsonTransport {
 54  close(): void;
 55  connect(): void;
 56  send(message: Record<string, unknown>): void;
 57  readonly closed: boolean;
 58  readonly connected: boolean;
 59}
 60
 61export function createStreamJsonTransport(config: StreamJsonTransportConfig): StreamJsonTransport {
 62  let buffer = "";
 63  let closed = false;
 64  let connected = false;
 65  let detachListeners: (() => void) | null = null;
 66  const decoder = new TextDecoder();
 67
 68  const decodeChunk = (chunk: string | Uint8Array): string =>
 69    typeof chunk === "string" ? chunk : decoder.decode(chunk);
 70
 71  const cleanupListeners = (): void => {
 72    detachListeners?.();
 73    detachListeners = null;
 74  };
 75
 76  const emitBufferedMessages = (): void => {
 77    while (true) {
 78      const newlineIndex = buffer.indexOf("\n");
 79
 80      if (newlineIndex < 0) {
 81        return;
 82      }
 83
 84      const line = buffer.slice(0, newlineIndex).trim();
 85      buffer = buffer.slice(newlineIndex + 1);
 86
 87      if (line === "") {
 88        continue;
 89      }
 90
 91      let parsed: unknown;
 92
 93      try {
 94        parsed = JSON.parse(line);
 95      } catch {
 96        continue;
 97      }
 98
 99      if (parsed != null && typeof parsed === "object" && !Array.isArray(parsed)) {
100        config.onMessage?.(parsed as Record<string, unknown>);
101      }
102    }
103  };
104
105  const emitDiagnostic = (diagnostic: StreamJsonCloseDiagnostic): void => {
106    config.onCloseDiagnostic?.(diagnostic);
107  };
108
109  const closeTransport = (
110    error: Error,
111    diagnostic: Omit<StreamJsonCloseDiagnostic, "message">
112  ): void => {
113    if (closed) {
114      return;
115    }
116
117    closed = true;
118    connected = false;
119    cleanupListeners();
120    buffer = "";
121    emitDiagnostic({
122      ...diagnostic,
123      message: error.message
124    });
125    config.onClose?.(error);
126  };
127
128  return {
129    get closed() {
130      return closed;
131    },
132
133    get connected() {
134      return connected;
135    },
136
137    connect(): void {
138      if (closed || connected) {
139        return;
140      }
141
142      const stdout = config.process.stdout;
143      const stdin = config.process.stdin;
144      const stderr = config.process.stderr;
145
146      if (stdout == null || stdin == null) {
147        throw new Error("stream-json transport requires child stdin and stdout.");
148      }
149
150      connected = true;
151      stdout.setEncoding?.("utf8");
152      const handleStdoutData = (chunk: string | Uint8Array): void => {
153        buffer += decodeChunk(chunk);
154        emitBufferedMessages();
155      };
156      const handleStdoutEnd = (): void => {
157        closeTransport(new Error("Claude Code stdout ended."), {
158          source: "stdout.end"
159        });
160      };
161      const handleStdoutError = (error: Error): void => {
162        closeTransport(error, {
163          source: "stdout.error"
164        });
165      };
166      const handleStderrData = (chunk: string | Uint8Array): void => {
167        config.onStderr?.(decodeChunk(chunk));
168      };
169      const handleStderrError = (error: Error): void => {
170        const message = `Claude Code stderr stream error: ${error.message}`;
171        config.onStderr?.(message);
172        emitDiagnostic({
173          source: "stderr.error",
174          message
175        });
176      };
177      const handleProcessError = (error: Error): void => {
178        closeTransport(error, {
179          source: "process.error"
180        });
181      };
182      const handleProcessExit = (code: number | null, signal: string | null): void => {
183        closeTransport(
184          new Error(
185            `Claude Code child exited (code=${String(code)}, signal=${String(signal)}).`
186          ),
187          {
188            exitCode: code,
189            signal,
190            source: "process.exit"
191          }
192        );
193      };
194
195      stdout.on("data", handleStdoutData);
196      stdout.on("end", handleStdoutEnd);
197      stdout.on("error", handleStdoutError);
198
199      if (stderr != null) {
200        stderr.setEncoding?.("utf8");
201        stderr.on("data", handleStderrData);
202        stderr.on("error", handleStderrError);
203      }
204
205      config.process.on("error", handleProcessError);
206      config.process.on("exit", handleProcessExit);
207
208      detachListeners = () => {
209        stdout.off?.("data", handleStdoutData);
210        stdout.off?.("end", handleStdoutEnd);
211        stdout.off?.("error", handleStdoutError);
212        stderr?.off?.("data", handleStderrData);
213        stderr?.off?.("error", handleStderrError);
214        config.process.off?.("error", handleProcessError);
215        config.process.off?.("exit", handleProcessExit);
216      };
217    },
218
219    send(message: Record<string, unknown>): void {
220      if (closed || !connected || config.process.stdin == null) {
221        throw new Error("stream-json transport is not connected.");
222      }
223
224      const line = JSON.stringify(message);
225      const ok = config.process.stdin.write(`${line}\n`);
226
227      if (!ok) {
228        throw new Error("stream-json transport failed to write message.");
229      }
230    },
231
232    close(): void {
233      if (closed) {
234        return;
235      }
236
237      closed = true;
238      connected = false;
239      cleanupListeners();
240      buffer = "";
241
242      try {
243        config.process.stdin?.end();
244      } catch {
245        // ignore close errors
246      }
247    }
248  };
249}