// stream-json-transport.ts
// im_wower · 2026-03-29
/**
 * Minimal writable-stream surface the transport needs from the child's stdin.
 */
export interface StreamJsonWritableStream {
  /**
   * Finishes the stream, optionally with a final chunk.
   * The transport's `close()` calls this with no argument.
   */
  end(chunk?: string | Uint8Array): unknown;
  /**
   * Writes one chunk to the stream.
   *
   * @returns A boolean; `createStreamJsonTransport`'s `send()` throws when
   * this is `false`.
   */
  write(chunk: string | Uint8Array): boolean;
}
5
/**
 * Minimal readable-stream surface the transport needs from the child's
 * stdout and stderr.
 */
export interface StreamJsonReadableStream {
  /** Subscribes to chunks of output, delivered as text or raw bytes. */
  on(event: "data", listener: (chunk: string | Uint8Array) => void): unknown;
  /** Subscribes to the end-of-stream notification. */
  on(event: "end", listener: () => void): unknown;
  /** Subscribes to stream errors. */
  on(event: "error", listener: (error: Error) => void): unknown;
  /**
   * Optional unsubscribe counterparts of `on`. When absent, the transport
   * skips the call and its listeners stay attached after it closes.
   */
  off?(event: "data", listener: (chunk: string | Uint8Array) => void): unknown;
  off?(event: "end", listener: () => void): unknown;
  off?(event: "error", listener: (error: Error) => void): unknown;
  /** Optional; when present, the transport calls it with `"utf8"` on connect. */
  setEncoding?(encoding: string): unknown;
}
15
/**
 * Minimal child-process surface the transport is built on.
 */
export interface StreamJsonProcessLike {
  /** Process id, if known. Not read by `createStreamJsonTransport`. */
  pid?: number;
  /** Child's stdin; `connect()` throws when it is missing. */
  stdin?: StreamJsonWritableStream;
  /** Child's stderr; optional, forwarded to `onStderr` when present. */
  stderr?: StreamJsonReadableStream;
  /** Child's stdout; `connect()` throws when it is missing. */
  stdout?: StreamJsonReadableStream;
  /** Subscribes to process-level errors; the transport closes on the first one. */
  on(event: "error", listener: (error: Error) => void): unknown;
  /** Subscribes to process exit; the transport closes and reports `code` and `signal`. */
  on(event: "exit", listener: (code: number | null, signal: string | null) => void): unknown;
  /** Optional unsubscribe counterparts of `on`. */
  off?(event: "error", listener: (error: Error) => void): unknown;
  off?(event: "exit", listener: (code: number | null, signal: string | null) => void): unknown;
}
26
/**
 * Identifies which event produced a close diagnostic.
 *
 * `"stderr.error"` is reported as a diagnostic only; the other four sources
 * also close the transport.
 */
export type StreamJsonCloseSource =
  | "process.error"
  | "process.exit"
  | "stderr.error"
  | "stdout.end"
  | "stdout.error";
33
/**
 * Structured description of a close (or stderr failure) event, delivered to
 * `onCloseDiagnostic`.
 */
export interface StreamJsonCloseDiagnostic {
  /** Exit code of the child; only populated for `"process.exit"`. */
  exitCode?: number | null;
  /** Human-readable message, taken from the triggering error. */
  message: string;
  /** Terminating signal of the child; only populated for `"process.exit"`. */
  signal?: string | null;
  /** Event that produced this diagnostic. */
  source: StreamJsonCloseSource;
}
40
/** Receives each stdout line that parsed to a non-array JSON object. */
export type StreamJsonMessageListener = (message: Record<string, unknown>) => void;
/** Receives the error describing why the transport closed itself. */
export type StreamJsonCloseListener = (error: Error) => void;
/** Receives decoded stderr text, and a description of any stderr stream error. */
export type StreamJsonStderrListener = (text: string) => void;
44
/**
 * Options accepted by `createStreamJsonTransport`.
 */
export interface StreamJsonTransportConfig {
  /** Called once when the transport closes itself; not called for a caller-initiated `close()`. */
  onClose?: StreamJsonCloseListener;
  /** Called with structured details just before `onClose`, and on stderr stream errors. */
  onCloseDiagnostic?: (diagnostic: StreamJsonCloseDiagnostic) => void;
  /** Called for every stdout line that parses to a non-array JSON object. */
  onMessage?: StreamJsonMessageListener;
  /** Called with decoded stderr output. */
  onStderr?: StreamJsonStderrListener;
  /** Child process whose stdio carries the messages. */
  process: StreamJsonProcessLike;
}
52
/**
 * Handle returned by `createStreamJsonTransport`.
 */
export interface StreamJsonTransport {
  /** Detaches listeners and ends the child's stdin; does nothing once closed. */
  close(): void;
  /** Attaches listeners to the child's stdio; does nothing when already connected or closed. */
  connect(): void;
  /** Serializes `message` as one JSON line and writes it to the child's stdin. */
  send(message: Record<string, unknown>): void;
  /** `true` once the transport has been closed, by the caller or by a child event. */
  readonly closed: boolean;
  /** `true` between a successful `connect()` and the transport closing. */
  readonly connected: boolean;
}
60
/**
 * Creates a newline-delimited JSON transport over a child process's stdio.
 *
 * Each line of the child's stdout is trimmed and parsed as JSON. Lines that
 * are blank, fail to parse, or parse to anything other than a non-array
 * object are skipped without notification. Outgoing messages are serialized
 * with `JSON.stringify` and written to the child's stdin followed by `"\n"`.
 *
 * The transport closes itself on the first of stdout "end", stdout "error",
 * process "error" or process "exit", reporting through `onCloseDiagnostic`
 * and then `onClose`. A stderr "error" is reported (through `onStderr` and
 * `onCloseDiagnostic`) but does not close the transport. A caller-initiated
 * `close()` ends stdin and fires neither callback. Once closed, the
 * transport cannot be reconnected.
 *
 * @param config - The child process handle plus optional event callbacks.
 * @returns A transport that stays idle until `connect()` is called.
 */
export function createStreamJsonTransport(config: StreamJsonTransportConfig): StreamJsonTransport {
  let buffer = "";
  let closed = false;
  let connected = false;
  let detachListeners: (() => void) | null = null;

  // One decoder per stream, used in streaming mode: a multi-byte UTF-8
  // character split across two byte chunks is held back until complete rather
  // than being turned into U+FFFD, and stdout/stderr bytes cannot mix.
  const stdoutDecoder = new TextDecoder();
  const stderrDecoder = new TextDecoder();

  const decodeChunk = (decoder: TextDecoder, chunk: string | Uint8Array): string =>
    typeof chunk === "string" ? chunk : decoder.decode(chunk, { stream: true });

  const cleanupListeners = (): void => {
    detachListeners?.();
    detachListeners = null;
  };

  // Parses a single line and forwards it when it is a non-array JSON object.
  const emitLine = (rawLine: string): void => {
    const line = rawLine.trim();

    if (line === "") {
      return;
    }

    let parsed: unknown;

    try {
      parsed = JSON.parse(line);
    } catch {
      // Best effort: anything on stdout that is not valid JSON is ignored.
      return;
    }

    if (parsed != null && typeof parsed === "object" && !Array.isArray(parsed)) {
      config.onMessage?.(parsed as Record<string, unknown>);
    }
  };

  // Emits every complete line in the buffer. The buffer is re-read on each
  // iteration, so a listener that calls close() (which empties it) stops delivery.
  const emitBufferedMessages = (): void => {
    let newlineIndex = buffer.indexOf("\n");

    while (newlineIndex >= 0) {
      const line = buffer.slice(0, newlineIndex);
      buffer = buffer.slice(newlineIndex + 1);
      emitLine(line);
      newlineIndex = buffer.indexOf("\n");
    }
  };

  // Delivers a final stdout line that was not newline-terminated, instead of
  // discarding it when the stream ends.
  const flushStdoutRemainder = (): void => {
    // Calling decode() without arguments flushes bytes the decoder was holding.
    buffer += stdoutDecoder.decode();
    emitBufferedMessages();

    const remainder = buffer;
    buffer = "";
    emitLine(remainder);
  };

  const closeTransport = (
    error: Error,
    diagnostic: Omit<StreamJsonCloseDiagnostic, "message">
  ): void => {
    if (closed) {
      return;
    }

    closed = true;
    connected = false;
    cleanupListeners();
    buffer = "";
    config.onCloseDiagnostic?.({
      ...diagnostic,
      message: error.message
    });
    config.onClose?.(error);
  };

  return {
    get closed() {
      return closed;
    },

    get connected() {
      return connected;
    },

    /**
     * Attaches listeners to the child's stdout, stderr (when present) and
     * process events. Does nothing when already connected or closed.
     *
     * @throws Error when the child has no stdin or no stdout.
     */
    connect(): void {
      if (closed || connected) {
        return;
      }

      const stdout = config.process.stdout;
      const stdin = config.process.stdin;
      const stderr = config.process.stderr;

      if (stdout == null || stdin == null) {
        throw new Error("stream-json transport requires child stdin and stdout.");
      }

      connected = true;
      stdout.setEncoding?.("utf8");

      const handleStdoutData = (chunk: string | Uint8Array): void => {
        buffer += decodeChunk(stdoutDecoder, chunk);
        emitBufferedMessages();
      };
      const handleStdoutEnd = (): void => {
        flushStdoutRemainder();
        // No-op if a message listener already closed the transport during the flush.
        closeTransport(new Error("Claude Code stdout ended."), {
          source: "stdout.end"
        });
      };
      const handleStdoutError = (error: Error): void => {
        closeTransport(error, {
          source: "stdout.error"
        });
      };
      const handleStderrData = (chunk: string | Uint8Array): void => {
        const text = decodeChunk(stderrDecoder, chunk);

        // A chunk holding only part of a multi-byte character decodes to "";
        // it is reported together with the chunk that completes it.
        if (text !== "") {
          config.onStderr?.(text);
        }
      };
      const handleStderrError = (error: Error): void => {
        // A stderr failure is reported but deliberately does not close the transport.
        const message = `Claude Code stderr stream error: ${error.message}`;
        config.onStderr?.(message);
        config.onCloseDiagnostic?.({
          source: "stderr.error",
          message
        });
      };
      const handleProcessError = (error: Error): void => {
        closeTransport(error, {
          source: "process.error"
        });
      };
      const handleProcessExit = (code: number | null, signal: string | null): void => {
        // NOTE(review): if this is a Node ChildProcess, "exit" can fire while
        // stdio is still open; stdout data not yet received is dropped here.
        // Confirm that is intended.
        closeTransport(
          new Error(
            `Claude Code child exited (code=${String(code)}, signal=${String(signal)}).`
          ),
          {
            exitCode: code,
            signal,
            source: "process.exit"
          }
        );
      };

      stdout.on("data", handleStdoutData);
      stdout.on("end", handleStdoutEnd);
      stdout.on("error", handleStdoutError);

      if (stderr != null) {
        stderr.setEncoding?.("utf8");
        stderr.on("data", handleStderrData);
        stderr.on("error", handleStderrError);
      }

      config.process.on("error", handleProcessError);
      config.process.on("exit", handleProcessExit);

      detachListeners = () => {
        stdout.off?.("data", handleStdoutData);
        stdout.off?.("end", handleStdoutEnd);
        stdout.off?.("error", handleStdoutError);
        stderr?.off?.("data", handleStderrData);
        stderr?.off?.("error", handleStderrError);
        config.process.off?.("error", handleProcessError);
        config.process.off?.("exit", handleProcessExit);
      };
    },

    /**
     * Writes `message` to the child's stdin as a single JSON line.
     *
     * @throws Error when the transport is not connected, or when the stdin
     * `write()` call returns `false`.
     */
    send(message: Record<string, unknown>): void {
      if (closed || !connected || config.process.stdin == null) {
        throw new Error("stream-json transport is not connected.");
      }

      const line = JSON.stringify(message);
      const ok = config.process.stdin.write(`${line}\n`);

      // NOTE(review): on a Node Writable, `false` signals backpressure (the
      // chunk is still queued), not failure. Kept as-is because callers may
      // rely on this throw; confirm the intended contract.
      if (!ok) {
        throw new Error("stream-json transport failed to write message.");
      }
    },

    /**
     * Closes the transport on the caller's behalf: detaches listeners, drops
     * buffered input and ends stdin. Neither `onClose` nor `onCloseDiagnostic`
     * is invoked. Does nothing when already closed.
     */
    close(): void {
      if (closed) {
        return;
      }

      closed = true;
      connected = false;
      cleanupListeners();
      buffer = "";

      try {
        config.process.stdin?.end();
      } catch {
        // ignore close errors
      }
    }
  };
}