im_wower
·
2026-03-22
websocket.ts
1import { createHash, randomUUID } from "node:crypto";
2import type { IncomingMessage } from "node:http";
3import type { Socket } from "node:net";
4
5const NORMAL_CLOSE_CODE = 1000;
6const INVALID_MESSAGE_CLOSE_CODE = 4002;
7const UNSUPPORTED_DATA_CLOSE_CODE = 1003;
8const MESSAGE_TOO_LARGE_CLOSE_CODE = 1009;
9const MAX_FRAME_PAYLOAD_BYTES = 1024 * 1024;
10const WS_GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
11
12type JsonRecord = Record<string, unknown>;
13
14function asRecord(value: unknown): JsonRecord | null {
15 if (value === null || typeof value !== "object" || Array.isArray(value)) {
16 return null;
17 }
18
19 return value as JsonRecord;
20}
21
22function normalizeNonEmptyString(value: unknown): string | null {
23 if (typeof value !== "string") {
24 return null;
25 }
26
27 const normalized = value.trim();
28 return normalized === "" ? null : normalized;
29}
30
31function normalizePathname(value: string): string {
32 const normalized = value.replace(/\/+$/u, "");
33 return normalized === "" ? "/" : normalized;
34}
35
36function buildWebSocketAcceptValue(key: string): string {
37 return createHash("sha1").update(`${key}${WS_GUID}`).digest("base64");
38}
39
40function buildClosePayload(code: number, reason: string): Buffer {
41 const reasonBuffer = Buffer.from(reason, "utf8");
42 const payload = Buffer.allocUnsafe(2 + reasonBuffer.length);
43 payload.writeUInt16BE(code, 0);
44 reasonBuffer.copy(payload, 2);
45 return payload;
46}
47
48function buildFrame(opcode: number, payload: Buffer = Buffer.alloc(0)): Buffer {
49 let header: Buffer;
50
51 if (payload.length < 126) {
52 header = Buffer.allocUnsafe(2);
53 header[0] = 0x80 | opcode;
54 header[1] = payload.length;
55 return Buffer.concat([header, payload]);
56 }
57
58 if (payload.length <= 0xffff) {
59 header = Buffer.allocUnsafe(4);
60 header[0] = 0x80 | opcode;
61 header[1] = 126;
62 header.writeUInt16BE(payload.length, 2);
63 return Buffer.concat([header, payload]);
64 }
65
66 header = Buffer.allocUnsafe(10);
67 header[0] = 0x80 | opcode;
68 header[1] = 127;
69 header.writeBigUInt64BE(BigInt(payload.length), 2);
70 return Buffer.concat([header, payload]);
71}
72
73export class CodexdWebSocketConnection {
74 readonly id = randomUUID();
75 private buffer = Buffer.alloc(0);
76 private closed = false;
77
78 constructor(
79 private readonly socket: Socket,
80 private readonly onMessage: (connection: CodexdWebSocketConnection, message: JsonRecord) => void,
81 private readonly onClose: (connection: CodexdWebSocketConnection) => void
82 ) {
83 this.socket.setNoDelay(true);
84 this.socket.on("data", (chunk) => {
85 this.handleData(chunk);
86 });
87 this.socket.on("close", () => {
88 this.handleClosed();
89 });
90 this.socket.on("end", () => {
91 this.handleClosed();
92 });
93 this.socket.on("error", () => {
94 this.handleClosed();
95 });
96 }
97
98 attachHead(head: Buffer): void {
99 if (head.length > 0) {
100 this.handleData(head);
101 }
102 }
103
104 sendJson(payload: JsonRecord): boolean {
105 if (this.closed) {
106 return false;
107 }
108
109 try {
110 this.socket.write(buildFrame(0x1, Buffer.from(`${JSON.stringify(payload)}\n`, "utf8")));
111 return true;
112 } catch {
113 this.handleClosed();
114 return false;
115 }
116 }
117
118 close(code = NORMAL_CLOSE_CODE, reason = ""): void {
119 if (this.closed) {
120 return;
121 }
122
123 this.closed = true;
124
125 try {
126 this.socket.write(buildFrame(0x8, buildClosePayload(code, reason)));
127 } catch {
128 // Best effort.
129 }
130
131 this.socket.end();
132 this.socket.destroySoon?.();
133 this.onClose(this);
134 }
135
136 private handleClosed(): void {
137 if (this.closed) {
138 return;
139 }
140
141 this.closed = true;
142 this.socket.destroy();
143 this.onClose(this);
144 }
145
146 private handleData(chunk: Buffer): void {
147 if (this.closed) {
148 return;
149 }
150
151 this.buffer = Buffer.concat([this.buffer, chunk]);
152
153 while (true) {
154 const frame = this.readFrame();
155
156 if (frame == null) {
157 return;
158 }
159
160 if (frame.opcode === 0x8) {
161 this.close();
162 return;
163 }
164
165 if (frame.opcode === 0x9) {
166 this.socket.write(buildFrame(0xA, frame.payload));
167 continue;
168 }
169
170 if (frame.opcode === 0xA) {
171 continue;
172 }
173
174 if (frame.opcode !== 0x1) {
175 this.close(UNSUPPORTED_DATA_CLOSE_CODE, "Only text websocket frames are supported.");
176 return;
177 }
178
179 let payload: JsonRecord | null = null;
180
181 try {
182 payload = asRecord(JSON.parse(frame.payload.toString("utf8")));
183 } catch {
184 payload = null;
185 }
186
187 if (payload == null) {
188 this.close(INVALID_MESSAGE_CLOSE_CODE, "WS payload must be a JSON object.");
189 return;
190 }
191
192 this.onMessage(this, payload);
193 }
194 }
195
196 private readFrame(): { opcode: number; payload: Buffer } | null {
197 if (this.buffer.length < 2) {
198 return null;
199 }
200
201 const firstByte = this.buffer[0];
202 const secondByte = this.buffer[1];
203
204 if (firstByte == null || secondByte == null) {
205 return null;
206 }
207
208 const fin = (firstByte & 0x80) !== 0;
209 const opcode = firstByte & 0x0f;
210 const masked = (secondByte & 0x80) !== 0;
211 let payloadLength = secondByte & 0x7f;
212 let offset = 2;
213
214 if (!fin) {
215 this.close(UNSUPPORTED_DATA_CLOSE_CODE, "Fragmented websocket frames are not supported.");
216 return null;
217 }
218
219 if (!masked) {
220 this.close(INVALID_MESSAGE_CLOSE_CODE, "Client websocket frames must be masked.");
221 return null;
222 }
223
224 if (payloadLength === 126) {
225 if (this.buffer.length < offset + 2) {
226 return null;
227 }
228
229 payloadLength = this.buffer.readUInt16BE(offset);
230 offset += 2;
231 } else if (payloadLength === 127) {
232 if (this.buffer.length < offset + 8) {
233 return null;
234 }
235
236 const extendedLength = this.buffer.readBigUInt64BE(offset);
237
238 if (extendedLength > BigInt(MAX_FRAME_PAYLOAD_BYTES)) {
239 this.close(MESSAGE_TOO_LARGE_CLOSE_CODE, "WS payload is too large.");
240 return null;
241 }
242
243 payloadLength = Number(extendedLength);
244 offset += 8;
245 }
246
247 if (payloadLength > MAX_FRAME_PAYLOAD_BYTES) {
248 this.close(MESSAGE_TOO_LARGE_CLOSE_CODE, "WS payload is too large.");
249 return null;
250 }
251
252 if (this.buffer.length < offset + 4 + payloadLength) {
253 return null;
254 }
255
256 const mask = this.buffer.subarray(offset, offset + 4);
257 offset += 4;
258 const payload = this.buffer.subarray(offset, offset + payloadLength);
259 const unmasked = Buffer.allocUnsafe(payloadLength);
260
261 for (let index = 0; index < payloadLength; index += 1) {
262 const maskByte = mask[index % 4];
263 const payloadByte = payload[index];
264
265 if (maskByte == null || payloadByte == null) {
266 this.close(INVALID_MESSAGE_CLOSE_CODE, "Malformed websocket payload.");
267 return null;
268 }
269
270 unmasked[index] = payloadByte ^ maskByte;
271 }
272
273 this.buffer = this.buffer.subarray(offset + payloadLength);
274
275 return {
276 opcode,
277 payload: unmasked
278 };
279 }
280}
281
282export interface CodexdWebSocketServerOptions {
283 onClientMessage?: (connection: CodexdWebSocketConnection, message: JsonRecord) => void;
284 onConnected?: (connection: CodexdWebSocketConnection) => void;
285 path: string;
286}
287
288export class CodexdWebSocketServer {
289 private readonly connections = new Set<CodexdWebSocketConnection>();
290 private readonly onClientMessage: (
291 connection: CodexdWebSocketConnection,
292 message: JsonRecord
293 ) => void;
294 private readonly onConnected: (connection: CodexdWebSocketConnection) => void;
295
296 constructor(private readonly options: CodexdWebSocketServerOptions) {
297 this.onClientMessage = options.onClientMessage ?? defaultOnClientMessage;
298 this.onConnected = options.onConnected ?? (() => {});
299 }
300
301 broadcast(payload: JsonRecord): void {
302 for (const connection of [...this.connections]) {
303 connection.sendJson(payload);
304 }
305 }
306
307 getConnectionCount(): number {
308 return this.connections.size;
309 }
310
311 handleUpgrade(request: IncomingMessage, socket: Socket, head: Buffer): boolean {
312 const pathname = normalizePathname(new URL(request.url ?? "/", "http://127.0.0.1").pathname);
313
314 if (pathname !== normalizePathname(this.options.path)) {
315 socket.write("HTTP/1.1 404 Not Found\r\nConnection: close\r\n\r\n");
316 socket.destroy();
317 return false;
318 }
319
320 const upgrade = normalizeNonEmptyString(request.headers.upgrade);
321 const key = normalizeNonEmptyString(request.headers["sec-websocket-key"]);
322 const version = normalizeNonEmptyString(request.headers["sec-websocket-version"]);
323
324 if (request.method !== "GET" || upgrade?.toLowerCase() !== "websocket" || key == null) {
325 socket.write("HTTP/1.1 400 Bad Request\r\nConnection: close\r\n\r\n");
326 socket.destroy();
327 return false;
328 }
329
330 if (version !== "13") {
331 socket.write(
332 "HTTP/1.1 426 Upgrade Required\r\nSec-WebSocket-Version: 13\r\nConnection: close\r\n\r\n"
333 );
334 socket.destroy();
335 return false;
336 }
337
338 socket.write(
339 [
340 "HTTP/1.1 101 Switching Protocols",
341 "Upgrade: websocket",
342 "Connection: Upgrade",
343 `Sec-WebSocket-Accept: ${buildWebSocketAcceptValue(key)}`,
344 "",
345 ""
346 ].join("\r\n")
347 );
348
349 const connection = new CodexdWebSocketConnection(
350 socket,
351 (nextConnection, message) => {
352 this.onClientMessage(nextConnection, message);
353 },
354 (nextConnection) => {
355 this.unregister(nextConnection);
356 }
357 );
358
359 this.connections.add(connection);
360 connection.attachHead(head);
361 this.onConnected(connection);
362 return true;
363 }
364
365 async stop(): Promise<void> {
366 for (const connection of [...this.connections]) {
367 connection.close(1001, "server shutdown");
368 }
369
370 this.connections.clear();
371 }
372
373 private unregister(connection: CodexdWebSocketConnection): void {
374 this.connections.delete(connection);
375 }
376}
377
378function defaultOnClientMessage(
379 connection: CodexdWebSocketConnection,
380 message: JsonRecord
381): void {
382 const type = normalizeNonEmptyString(message.type);
383
384 if (type === "ping") {
385 connection.sendJson({
386 type: "pong"
387 });
388 }
389}