baa-conductor


baa-conductor / apps / codexd / src
im_wower  ·  2026-03-22

websocket.ts

  1import { createHash, randomUUID } from "node:crypto";
  2import type { IncomingMessage } from "node:http";
  3import type { Socket } from "node:net";
  4
  5const NORMAL_CLOSE_CODE = 1000;
  6const INVALID_MESSAGE_CLOSE_CODE = 4002;
  7const UNSUPPORTED_DATA_CLOSE_CODE = 1003;
  8const MESSAGE_TOO_LARGE_CLOSE_CODE = 1009;
  9const MAX_FRAME_PAYLOAD_BYTES = 1024 * 1024;
 10const WS_GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
 11
 12type JsonRecord = Record<string, unknown>;
 13
 14function asRecord(value: unknown): JsonRecord | null {
 15  if (value === null || typeof value !== "object" || Array.isArray(value)) {
 16    return null;
 17  }
 18
 19  return value as JsonRecord;
 20}
 21
 22function normalizeNonEmptyString(value: unknown): string | null {
 23  if (typeof value !== "string") {
 24    return null;
 25  }
 26
 27  const normalized = value.trim();
 28  return normalized === "" ? null : normalized;
 29}
 30
 31function normalizePathname(value: string): string {
 32  const normalized = value.replace(/\/+$/u, "");
 33  return normalized === "" ? "/" : normalized;
 34}
 35
 36function buildWebSocketAcceptValue(key: string): string {
 37  return createHash("sha1").update(`${key}${WS_GUID}`).digest("base64");
 38}
 39
 40function buildClosePayload(code: number, reason: string): Buffer {
 41  const reasonBuffer = Buffer.from(reason, "utf8");
 42  const payload = Buffer.allocUnsafe(2 + reasonBuffer.length);
 43  payload.writeUInt16BE(code, 0);
 44  reasonBuffer.copy(payload, 2);
 45  return payload;
 46}
 47
 48function buildFrame(opcode: number, payload: Buffer = Buffer.alloc(0)): Buffer {
 49  let header: Buffer;
 50
 51  if (payload.length < 126) {
 52    header = Buffer.allocUnsafe(2);
 53    header[0] = 0x80 | opcode;
 54    header[1] = payload.length;
 55    return Buffer.concat([header, payload]);
 56  }
 57
 58  if (payload.length <= 0xffff) {
 59    header = Buffer.allocUnsafe(4);
 60    header[0] = 0x80 | opcode;
 61    header[1] = 126;
 62    header.writeUInt16BE(payload.length, 2);
 63    return Buffer.concat([header, payload]);
 64  }
 65
 66  header = Buffer.allocUnsafe(10);
 67  header[0] = 0x80 | opcode;
 68  header[1] = 127;
 69  header.writeBigUInt64BE(BigInt(payload.length), 2);
 70  return Buffer.concat([header, payload]);
 71}
 72
 73export class CodexdWebSocketConnection {
 74  readonly id = randomUUID();
 75  private buffer = Buffer.alloc(0);
 76  private closed = false;
 77
 78  constructor(
 79    private readonly socket: Socket,
 80    private readonly onMessage: (connection: CodexdWebSocketConnection, message: JsonRecord) => void,
 81    private readonly onClose: (connection: CodexdWebSocketConnection) => void
 82  ) {
 83    this.socket.setNoDelay(true);
 84    this.socket.on("data", (chunk) => {
 85      this.handleData(chunk);
 86    });
 87    this.socket.on("close", () => {
 88      this.handleClosed();
 89    });
 90    this.socket.on("end", () => {
 91      this.handleClosed();
 92    });
 93    this.socket.on("error", () => {
 94      this.handleClosed();
 95    });
 96  }
 97
 98  attachHead(head: Buffer): void {
 99    if (head.length > 0) {
100      this.handleData(head);
101    }
102  }
103
104  sendJson(payload: JsonRecord): boolean {
105    if (this.closed) {
106      return false;
107    }
108
109    try {
110      this.socket.write(buildFrame(0x1, Buffer.from(`${JSON.stringify(payload)}\n`, "utf8")));
111      return true;
112    } catch {
113      this.handleClosed();
114      return false;
115    }
116  }
117
118  close(code = NORMAL_CLOSE_CODE, reason = ""): void {
119    if (this.closed) {
120      return;
121    }
122
123    this.closed = true;
124
125    try {
126      this.socket.write(buildFrame(0x8, buildClosePayload(code, reason)));
127    } catch {
128      // Best effort.
129    }
130
131    this.socket.end();
132    this.socket.destroySoon?.();
133    this.onClose(this);
134  }
135
136  private handleClosed(): void {
137    if (this.closed) {
138      return;
139    }
140
141    this.closed = true;
142    this.socket.destroy();
143    this.onClose(this);
144  }
145
146  private handleData(chunk: Buffer): void {
147    if (this.closed) {
148      return;
149    }
150
151    this.buffer = Buffer.concat([this.buffer, chunk]);
152
153    while (true) {
154      const frame = this.readFrame();
155
156      if (frame == null) {
157        return;
158      }
159
160      if (frame.opcode === 0x8) {
161        this.close();
162        return;
163      }
164
165      if (frame.opcode === 0x9) {
166        this.socket.write(buildFrame(0xA, frame.payload));
167        continue;
168      }
169
170      if (frame.opcode === 0xA) {
171        continue;
172      }
173
174      if (frame.opcode !== 0x1) {
175        this.close(UNSUPPORTED_DATA_CLOSE_CODE, "Only text websocket frames are supported.");
176        return;
177      }
178
179      let payload: JsonRecord | null = null;
180
181      try {
182        payload = asRecord(JSON.parse(frame.payload.toString("utf8")));
183      } catch {
184        payload = null;
185      }
186
187      if (payload == null) {
188        this.close(INVALID_MESSAGE_CLOSE_CODE, "WS payload must be a JSON object.");
189        return;
190      }
191
192      this.onMessage(this, payload);
193    }
194  }
195
196  private readFrame(): { opcode: number; payload: Buffer } | null {
197    if (this.buffer.length < 2) {
198      return null;
199    }
200
201    const firstByte = this.buffer[0];
202    const secondByte = this.buffer[1];
203
204    if (firstByte == null || secondByte == null) {
205      return null;
206    }
207
208    const fin = (firstByte & 0x80) !== 0;
209    const opcode = firstByte & 0x0f;
210    const masked = (secondByte & 0x80) !== 0;
211    let payloadLength = secondByte & 0x7f;
212    let offset = 2;
213
214    if (!fin) {
215      this.close(UNSUPPORTED_DATA_CLOSE_CODE, "Fragmented websocket frames are not supported.");
216      return null;
217    }
218
219    if (!masked) {
220      this.close(INVALID_MESSAGE_CLOSE_CODE, "Client websocket frames must be masked.");
221      return null;
222    }
223
224    if (payloadLength === 126) {
225      if (this.buffer.length < offset + 2) {
226        return null;
227      }
228
229      payloadLength = this.buffer.readUInt16BE(offset);
230      offset += 2;
231    } else if (payloadLength === 127) {
232      if (this.buffer.length < offset + 8) {
233        return null;
234      }
235
236      const extendedLength = this.buffer.readBigUInt64BE(offset);
237
238      if (extendedLength > BigInt(MAX_FRAME_PAYLOAD_BYTES)) {
239        this.close(MESSAGE_TOO_LARGE_CLOSE_CODE, "WS payload is too large.");
240        return null;
241      }
242
243      payloadLength = Number(extendedLength);
244      offset += 8;
245    }
246
247    if (payloadLength > MAX_FRAME_PAYLOAD_BYTES) {
248      this.close(MESSAGE_TOO_LARGE_CLOSE_CODE, "WS payload is too large.");
249      return null;
250    }
251
252    if (this.buffer.length < offset + 4 + payloadLength) {
253      return null;
254    }
255
256    const mask = this.buffer.subarray(offset, offset + 4);
257    offset += 4;
258    const payload = this.buffer.subarray(offset, offset + payloadLength);
259    const unmasked = Buffer.allocUnsafe(payloadLength);
260
261    for (let index = 0; index < payloadLength; index += 1) {
262      const maskByte = mask[index % 4];
263      const payloadByte = payload[index];
264
265      if (maskByte == null || payloadByte == null) {
266        this.close(INVALID_MESSAGE_CLOSE_CODE, "Malformed websocket payload.");
267        return null;
268      }
269
270      unmasked[index] = payloadByte ^ maskByte;
271    }
272
273    this.buffer = this.buffer.subarray(offset + payloadLength);
274
275    return {
276      opcode,
277      payload: unmasked
278    };
279  }
280}
281
282export interface CodexdWebSocketServerOptions {
283  onClientMessage?: (connection: CodexdWebSocketConnection, message: JsonRecord) => void;
284  onConnected?: (connection: CodexdWebSocketConnection) => void;
285  path: string;
286}
287
288export class CodexdWebSocketServer {
289  private readonly connections = new Set<CodexdWebSocketConnection>();
290  private readonly onClientMessage: (
291    connection: CodexdWebSocketConnection,
292    message: JsonRecord
293  ) => void;
294  private readonly onConnected: (connection: CodexdWebSocketConnection) => void;
295
296  constructor(private readonly options: CodexdWebSocketServerOptions) {
297    this.onClientMessage = options.onClientMessage ?? defaultOnClientMessage;
298    this.onConnected = options.onConnected ?? (() => {});
299  }
300
301  broadcast(payload: JsonRecord): void {
302    for (const connection of [...this.connections]) {
303      connection.sendJson(payload);
304    }
305  }
306
307  getConnectionCount(): number {
308    return this.connections.size;
309  }
310
311  handleUpgrade(request: IncomingMessage, socket: Socket, head: Buffer): boolean {
312    const pathname = normalizePathname(new URL(request.url ?? "/", "http://127.0.0.1").pathname);
313
314    if (pathname !== normalizePathname(this.options.path)) {
315      socket.write("HTTP/1.1 404 Not Found\r\nConnection: close\r\n\r\n");
316      socket.destroy();
317      return false;
318    }
319
320    const upgrade = normalizeNonEmptyString(request.headers.upgrade);
321    const key = normalizeNonEmptyString(request.headers["sec-websocket-key"]);
322    const version = normalizeNonEmptyString(request.headers["sec-websocket-version"]);
323
324    if (request.method !== "GET" || upgrade?.toLowerCase() !== "websocket" || key == null) {
325      socket.write("HTTP/1.1 400 Bad Request\r\nConnection: close\r\n\r\n");
326      socket.destroy();
327      return false;
328    }
329
330    if (version !== "13") {
331      socket.write(
332        "HTTP/1.1 426 Upgrade Required\r\nSec-WebSocket-Version: 13\r\nConnection: close\r\n\r\n"
333      );
334      socket.destroy();
335      return false;
336    }
337
338    socket.write(
339      [
340        "HTTP/1.1 101 Switching Protocols",
341        "Upgrade: websocket",
342        "Connection: Upgrade",
343        `Sec-WebSocket-Accept: ${buildWebSocketAcceptValue(key)}`,
344        "",
345        ""
346      ].join("\r\n")
347    );
348
349    const connection = new CodexdWebSocketConnection(
350      socket,
351      (nextConnection, message) => {
352        this.onClientMessage(nextConnection, message);
353      },
354      (nextConnection) => {
355        this.unregister(nextConnection);
356      }
357    );
358
359    this.connections.add(connection);
360    connection.attachHead(head);
361    this.onConnected(connection);
362    return true;
363  }
364
365  async stop(): Promise<void> {
366    for (const connection of [...this.connections]) {
367      connection.close(1001, "server shutdown");
368    }
369
370    this.connections.clear();
371  }
372
373  private unregister(connection: CodexdWebSocketConnection): void {
374    this.connections.delete(connection);
375  }
376}
377
378function defaultOnClientMessage(
379  connection: CodexdWebSocketConnection,
380  message: JsonRecord
381): void {
382  const type = normalizeNonEmptyString(message.type);
383
384  if (type === "ping") {
385    connection.sendJson({
386      type: "pong"
387    });
388  }
389}