baa-conductor


baa-conductor / apps / codexd / src
im_wower  ·  2026-03-23

local-service.ts

  1import { createServer, type IncomingMessage, type Server, type ServerResponse } from "node:http";
  2import type { AddressInfo } from "node:net";
  3
  4import {
  5  CodexdDaemon,
  6  type CodexdCreateSessionInput,
  7  type CodexdDaemonOptions,
  8  type CodexdTurnInput
  9} from "./daemon.js";
 10import type {
 11  CodexdResolvedConfig,
 12  CodexdSessionPurpose,
 13  CodexdSessionRecord,
 14  CodexdStatusSnapshot
 15} from "./contracts.js";
 16import { CodexdWebSocketServer, type CodexdWebSocketConnection } from "./websocket.js";
 17
 18interface CodexdHttpResponse {
 19  body: string;
 20  headers: Record<string, string>;
 21  status: number;
 22}
 23
 24type JsonRecord = Record<string, unknown>;
 25
 26export interface CodexdDescribeRoute {
 27  description: string;
 28  method: "GET" | "POST" | "WS";
 29  path: string;
 30}
 31
 32export interface CodexdDescribeResponse {
 33  ok: true;
 34  name: string;
 35  surface: string;
 36  description: string;
 37  mode: {
 38    current: CodexdResolvedConfig["server"]["mode"];
 39    daemon: string;
 40    supervisor: string;
 41    transport: string;
 42    conductor_role: string;
 43  };
 44  base_url: string;
 45  event_stream: {
 46    path: string;
 47    transport: string;
 48    url: string;
 49  };
 50  routes: CodexdDescribeRoute[];
 51  capabilities: {
 52    health_probe: boolean;
 53    session_create: boolean;
 54    session_list: boolean;
 55    session_read: boolean;
 56    turn_create: boolean;
 57    turn_readback_via_session: boolean;
 58    websocket_events: boolean;
 59  };
 60  notes: string[];
 61}
 62
 63const CODEXD_FORMAL_ROUTES: CodexdDescribeRoute[] = [
 64  {
 65    description: "Lightweight health probe for the local daemon.",
 66    method: "GET",
 67    path: "/healthz"
 68  },
 69  {
 70    description: "Machine-readable description of the official codexd surface.",
 71    method: "GET",
 72    path: "/describe"
 73  },
 74  {
 75    description: "Current daemon, child, session, and recent event snapshot.",
 76    method: "GET",
 77    path: "/v1/codexd/status"
 78  },
 79  {
 80    description: "List the currently known codexd sessions.",
 81    method: "GET",
 82    path: "/v1/codexd/sessions"
 83  },
 84  {
 85    description: "Read a single codexd session and its recent events.",
 86    method: "GET",
 87    path: "/v1/codexd/sessions/:session_id"
 88  },
 89  {
 90    description: "Create or resume a codexd session.",
 91    method: "POST",
 92    path: "/v1/codexd/sessions"
 93  },
 94  {
 95    description: "Submit one turn to an existing codexd session.",
 96    method: "POST",
 97    path: "/v1/codexd/turn"
 98  },
 99  {
100    description: "WebSocket event stream for live codexd session and daemon updates.",
101    method: "WS",
102    path: "/v1/codexd/events"
103  }
104];
105
/** Runtime facts about the local HTTP/WebSocket listener. */
export interface CodexdLocalServiceRuntimeInfo {
  /** Base URL exactly as configured (`config.service.localApiBase`). */
  configuredBaseUrl: string;
  /** Configured path of the WebSocket event stream. */
  eventStreamPath: string;
  /** WebSocket URL derived from the resolved base URL; `null` when no base URL is resolved. */
  eventStreamUrl: string | null;
  /** Whether the service currently holds an HTTP server. */
  listening: boolean;
  /** Base URL built from the address the server actually bound to; `null` when not started. */
  resolvedBaseUrl: string | null;
  /** Connection count reported by the WebSocket server. */
  websocketClients: number;
}
114
/** Combined view of the listener state and the daemon's own status snapshot. */
export interface CodexdLocalServiceStatus {
  /** Listener/runtime information for the local service. */
  service: CodexdLocalServiceRuntimeInfo;
  /** Status snapshot obtained from the daemon. */
  snapshot: CodexdStatusSnapshot;
}
119
/**
 * Error carrying the HTTP status code that should be reported to the client.
 * The request handler uses `status` verbatim when it catches this error type.
 */
class CodexdHttpError extends Error {
  /** HTTP status code to send for this failure. */
  readonly status: number;

  constructor(status: number, message: string) {
    super(message);
    this.status = status;
    this.name = "CodexdHttpError";
  }
}
129
/**
 * Local HTTP and WebSocket front end for a {@link CodexdDaemon}.
 *
 * The service owns one daemon and one WebSocket server. Every event the daemon
 * emits is broadcast to connected WebSocket clients as `{ type: "event", event }`.
 * `start()` binds an HTTP server to the address derived from
 * `config.service.localApiBase`; `stop()` closes the listener and stops the daemon.
 */
export class CodexdLocalService {
  private readonly daemon: CodexdDaemon;
  private resolvedBaseUrl: string | null = null;
  private server: Server | null = null;
  private readonly websocketServer;

  /**
   * @param config Resolved codexd configuration (listener address, event stream path, mode).
   * @param options Options forwarded unchanged to the {@link CodexdDaemon} constructor.
   */
  constructor(
    private readonly config: CodexdResolvedConfig,
    options: CodexdDaemonOptions = {}
  ) {
    this.websocketServer = new CodexdWebSocketServer({
      onClientMessage: (connection, message) => {
        this.handleWebSocketMessage(connection, message);
      },
      onConnected: (connection) => {
        this.handleWebSocketConnected(connection);
      },
      path: config.service.eventStreamPath
    });
    this.daemon = new CodexdDaemon(config, options);
    // Fan every daemon event out to all connected websocket clients.
    this.daemon.subscribe((event) => {
      this.websocketServer.broadcast({
        event,
        type: "event"
      });
    });
  }

  /** Returns the daemon instance owned by this service. */
  getDaemon(): CodexdDaemon {
    return this.daemon;
  }

  /** Returns the current listener information together with a fresh daemon snapshot. */
  getStatus(): CodexdLocalServiceStatus {
    return {
      service: this.getRuntimeInfo(),
      snapshot: this.daemon.getStatusSnapshot()
    };
  }

  /**
   * Builds the `GET /describe` payload.
   *
   * Uses the resolved (actually bound) base URL when the server is listening
   * and falls back to the configured base URL otherwise.
   */
  getDescribe(): CodexdDescribeResponse {
    const baseUrl = this.resolvedBaseUrl ?? this.config.service.localApiBase;

    return {
      base_url: baseUrl,
      capabilities: {
        health_probe: true,
        session_create: true,
        session_list: true,
        session_read: true,
        turn_create: true,
        turn_readback_via_session: true,
        websocket_events: true
      },
      description:
        "Independent local Codex daemon for session lifecycle, turn submission, status reads, and live event streaming.",
      event_stream: {
        path: this.config.service.eventStreamPath,
        transport: "websocket",
        url: buildEventStreamUrl(baseUrl, this.config.service.eventStreamPath)
      },
      mode: {
        conductor_role: "proxy",
        current: this.config.server.mode,
        daemon: "independent",
        supervisor: "launchd",
        transport: "codex app-server"
      },
      name: "codexd",
      notes: [
        "Use GET /describe first when an AI client needs to discover the official local codexd surface.",
        "codexd is the long-running Codex runtime; conductor-daemon only proxies this service and does not host the Codex session truth.",
        "This surface is limited to health, status, sessions, turn submission, and websocket event consumption."
      ],
      ok: true,
      // Hand out copies so callers cannot mutate the shared route catalogue.
      routes: CODEXD_FORMAL_ROUTES.map((route) => ({ ...route })),
      surface: "local-api"
    };
  }

  /**
   * Starts the daemon and binds the HTTP/WebSocket listener.
   *
   * Calling `start()` while a server is already held is a no-op that returns
   * the current status.
   *
   * @returns The service status after the listener is bound.
   * @throws Error when `localApiBase` is invalid, when listening fails, or when
   *   the server reports a non-TCP address. The daemon is stopped again before
   *   a listen/address failure is rethrown.
   */
  async start(): Promise<CodexdLocalServiceStatus> {
    if (this.server != null) {
      return this.getStatus();
    }

    // Validate the configured address before starting the daemon so that an
    // invalid configuration cannot leave a started daemon behind with no listener.
    const listenConfig = resolveLocalListenConfig(this.config.service.localApiBase);

    await this.daemon.start();

    const server = createServer((request, response) => {
      void this.handleRequest(request, response);
    });

    server.on("upgrade", (request, socket, head) => {
      this.websocketServer.handleUpgrade(request, socket, head);
    });

    try {
      await new Promise<void>((resolve, reject) => {
        // Each outcome removes the listener for the other one so neither lingers.
        const onError = (error: Error) => {
          server.off("listening", onListening);
          reject(error);
        };
        const onListening = () => {
          server.off("error", onError);
          resolve();
        };

        server.once("error", onError);
        server.once("listening", onListening);
        server.listen({
          host: listenConfig.host,
          port: listenConfig.port
        });
      });
    } catch (error) {
      await this.daemon.stop();
      throw error;
    }

    const address = server.address();

    if (address == null || typeof address === "string") {
      server.close();
      await this.daemon.stop();
      throw new Error("codexd local service started without a TCP address.");
    }

    this.server = server;
    // Report the address the OS actually bound rather than the configured one.
    this.resolvedBaseUrl = formatLocalApiBaseUrl(address.address, (address as AddressInfo).port);
    return this.getStatus();
  }

  /**
   * Stops the WebSocket server, closes the HTTP listener, and stops the daemon.
   *
   * @returns The listener information after shutdown plus the snapshot returned
   *   by the daemon's `stop()`.
   */
  async stop(): Promise<CodexdLocalServiceStatus> {
    if (this.server != null) {
      const server = this.server;
      this.server = null;
      await this.websocketServer.stop();

      await new Promise<void>((resolve, reject) => {
        server.close((error) => {
          if (error) {
            reject(error);
            return;
          }

          resolve();
        });
        // close() waits for open connections; drop them when the runtime supports it.
        server.closeAllConnections?.();
      });
    }

    const snapshot = await this.daemon.stop();
    this.resolvedBaseUrl = null;

    return {
      service: this.getRuntimeInfo(),
      snapshot
    };
  }

  /** Collects the listener-related runtime facts exposed in status payloads. */
  private getRuntimeInfo(): CodexdLocalServiceRuntimeInfo {
    return {
      configuredBaseUrl: this.config.service.localApiBase,
      eventStreamPath: this.config.service.eventStreamPath,
      eventStreamUrl:
        this.resolvedBaseUrl == null
          ? null
          : buildEventStreamUrl(this.resolvedBaseUrl, this.config.service.eventStreamPath),
      listening: this.server != null,
      resolvedBaseUrl: this.resolvedBaseUrl,
      websocketClients: this.websocketServer.getConnectionCount()
    };
  }

  /** Greets a newly connected WebSocket client with a `hello` message. */
  private handleWebSocketConnected(connection: CodexdWebSocketConnection): void {
    // Take one snapshot so `recentEvents` and `snapshot` describe the same moment.
    const snapshot = this.daemon.getStatusSnapshot();

    connection.sendJson({
      recentEvents: snapshot.recentEvents.events,
      service: this.getRuntimeInfo(),
      snapshot,
      type: "hello"
    });
  }

  /**
   * Answers a client WebSocket message.
   *
   * `ping` yields `pong`, `status` yields the current status, and any other
   * (or missing) `type` yields an `error` message.
   */
  private handleWebSocketMessage(
    connection: CodexdWebSocketConnection,
    message: JsonRecord
  ): void {
    const type = readOptionalString(message.type);

    switch (type) {
      case "ping":
        connection.sendJson({
          type: "pong"
        });
        return;

      case "status":
        connection.sendJson({
          service: this.getRuntimeInfo(),
          snapshot: this.daemon.getStatusSnapshot(),
          type: "status"
        });
        return;

      default:
        connection.sendJson({
          message: `Unsupported websocket message type: ${type ?? "unknown"}.`,
          type: "error"
        });
    }
  }

  /**
   * Node HTTP request handler: reads the body, routes the request, and writes
   * the JSON reply.
   *
   * A {@link CodexdHttpError} keeps its own status; any other failure becomes
   * a 500. The `error` field is `"internal_error"` for statuses of 500 and
   * above and `"bad_request"` for everything else.
   */
  private async handleRequest(
    request: IncomingMessage,
    response: ServerResponse<IncomingMessage>
  ): Promise<void> {
    try {
      const payload = await this.routeHttpRequest({
        body: await readIncomingRequestBody(request),
        method: request.method ?? "GET",
        path: request.url ?? "/"
      });
      writeHttpResponse(response, payload);
    } catch (error) {
      const status = error instanceof CodexdHttpError ? error.status : 500;
      writeHttpResponse(
        response,
        jsonResponse(status, {
          error: status >= 500 ? "internal_error" : "bad_request",
          message: error instanceof Error ? error.message : String(error),
          ok: false
        })
      );
    }
  }

  /**
   * Maps a method/path/body triple to a JSON response.
   *
   * @throws CodexdHttpError 400 for an unparsable request target, a malformed
   *   body, or a session id with invalid percent-encoding; 404 for unknown
   *   routes and unknown sessions.
   */
  private async routeHttpRequest(input: {
    body: string | null;
    method: string;
    path: string;
  }): Promise<CodexdHttpResponse> {
    const method = input.method.toUpperCase();
    let url: URL;

    try {
      // The base only makes the relative request target parseable.
      url = new URL(input.path, "http://127.0.0.1");
    } catch {
      // An unparsable request target is a client error, not an internal failure.
      throw new CodexdHttpError(400, "Request target is not a valid URL path.");
    }

    const pathname = normalizePathname(url.pathname);
    const body = parseJsonObject(input.body);

    if (method === "GET" && pathname === "/healthz") {
      return jsonResponse(200, {
        ok: true,
        service: this.getRuntimeInfo(),
        status: "ok"
      });
    }

    if (method === "GET" && pathname === "/describe") {
      return jsonResponse(200, this.getDescribe());
    }

    if (method === "GET" && pathname === "/v1/codexd/status") {
      return jsonResponse(200, {
        data: this.getStatus(),
        ok: true
      });
    }

    if (method === "GET" && pathname === "/v1/codexd/sessions") {
      return jsonResponse(200, {
        data: {
          sessions: this.daemon.listSessions()
        },
        ok: true
      });
    }

    if (method === "POST" && pathname === "/v1/codexd/sessions") {
      const session = await this.daemon.createSession(parseCreateSessionInput(body));
      return jsonResponse(201, {
        data: {
          session
        },
        ok: true
      });
    }

    if (method === "POST" && pathname === "/v1/codexd/turn") {
      const result = await this.daemon.createTurn(parseCreateTurnInput(body));
      return jsonResponse(202, {
        data: result,
        ok: true
      });
    }

    const encodedSessionId = pathname.match(/^\/v1\/codexd\/sessions\/([^/]+)$/u)?.[1];

    if (method === "GET" && encodedSessionId != null) {
      let sessionId: string;

      try {
        sessionId = decodeURIComponent(encodedSessionId);
      } catch {
        // decodeURIComponent throws URIError on malformed escapes such as "%zz".
        throw new CodexdHttpError(
          400,
          "Session id in the request path is not valid percent-encoding."
        );
      }

      const session = this.daemon.getSession(sessionId);

      if (session == null) {
        throw new CodexdHttpError(404, `Unknown codexd session "${sessionId}".`);
      }

      return jsonResponse(200, {
        data: {
          recentEvents: findEventsForSession(this.daemon.getStatusSnapshot(), session),
          session
        },
        ok: true
      });
    }

    throw new CodexdHttpError(404, `Unknown codexd route ${method} ${pathname}.`);
  }
}
443
/**
 * Views a value as a string-keyed record when it is a non-null, non-array object.
 *
 * @returns The same object reference typed as a record, or `null` for
 *   primitives, `null`, `undefined`, and arrays.
 */
function asRecord(value: unknown): JsonRecord | null {
  const isObject = typeof value === "object" && value !== null;
  return isObject && !Array.isArray(value) ? (value as JsonRecord) : null;
}
451
/**
 * Derives the WebSocket URL of the event stream from an HTTP base URL.
 *
 * The scheme becomes `wss:` for an `https:` base and `ws:` otherwise; the
 * path is replaced by the normalized stream path and any query or fragment
 * is dropped.
 */
function buildEventStreamUrl(baseUrl: string, path: string): string {
  const streamUrl = new URL(baseUrl);
  const isSecure = streamUrl.protocol === "https:";

  streamUrl.protocol = isSecure ? "wss:" : "ws:";
  streamUrl.pathname = normalizePathname(path);
  streamUrl.search = "";
  streamUrl.hash = "";

  return streamUrl.href;
}
460
/**
 * Selects the snapshot's recent events that belong to a session.
 *
 * An event matches when it has a `detail` whose `sessionId` equals the
 * session's id or whose `threadId` equals the session's thread id. Events
 * without `detail` never match.
 */
function findEventsForSession(
  snapshot: CodexdStatusSnapshot,
  session: CodexdSessionRecord
): ReturnType<CodexdStatusSnapshot["recentEvents"]["events"]["slice"]> {
  const { sessionId, threadId } = session;

  return snapshot.recentEvents.events.filter(
    ({ detail }) =>
      detail != null && (detail.sessionId === sessionId || detail.threadId === threadId)
  );
}
475
/**
 * Formats a host/port pair as an `http://` base URL.
 *
 * Hosts containing a colon (IPv6 literals) are wrapped in brackets, and the
 * port is omitted when it is 80.
 */
function formatLocalApiBaseUrl(hostname: string, port: number): string {
  const host = hostname.indexOf(":") >= 0 ? "[" + hostname + "]" : hostname;
  const portSuffix = port === 80 ? "" : ":" + port;
  return "http://" + host + portSuffix;
}
480
/** Reports whether a hostname is exactly `127.0.0.1`, `::1`, or `localhost`. */
function isLoopbackHost(hostname: string): boolean {
  const loopbackHosts = ["127.0.0.1", "::1", "localhost"];
  return loopbackHosts.includes(hostname);
}
484
/**
 * Wraps a payload in a JSON HTTP response.
 *
 * The body is pretty-printed with two-space indentation and a trailing
 * newline; the response is marked as uncacheable UTF-8 JSON.
 */
function jsonResponse(status: number, payload: unknown): CodexdHttpResponse {
  const serialized = JSON.stringify(payload, null, 2);
  const headers = {
    "cache-control": "no-store",
    "content-type": "application/json; charset=utf-8"
  };

  return { body: serialized + "\n", headers, status };
}
495
/**
 * Removes all trailing slashes from a path.
 *
 * A path that consists only of slashes, or is empty, normalizes to `/`.
 */
function normalizePathname(value: string): string {
  let end = value.length;

  // Walk back over the run of trailing "/" characters.
  while (end > 0 && value.charAt(end - 1) === "/") {
    end -= 1;
  }

  return end === 0 ? "/" : value.slice(0, end);
}
500
/**
 * Converts a parsed request body into the daemon's create-session input.
 *
 * String fields are trimmed and become `null` when absent or blank.
 * `approvalPolicy` and `sandbox` are passed through without validation.
 *
 * @throws CodexdHttpError 400 when `metadata` is not a string map or
 *   `purpose` is not a supported value.
 */
function parseCreateSessionInput(body: JsonRecord): CodexdCreateSessionInput {
  const text = (field: string): string | null => readOptionalString(body[field]);

  // Validate the two fields that can reject the request, metadata first.
  const metadata = readStringMap(body.metadata);
  const purpose = readSessionPurpose(body.purpose);
  const ephemeral = typeof body.ephemeral === "boolean" ? body.ephemeral : null;

  return {
    approvalPolicy: (body.approvalPolicy as CodexdCreateSessionInput["approvalPolicy"]) ?? null,
    baseInstructions: text("baseInstructions"),
    config: (asRecord(body.config) as CodexdCreateSessionInput["config"]) ?? null,
    cwd: text("cwd"),
    developerInstructions: text("developerInstructions"),
    ephemeral,
    metadata,
    model: text("model"),
    modelProvider: text("modelProvider"),
    personality: text("personality"),
    purpose,
    sandbox: (body.sandbox as CodexdCreateSessionInput["sandbox"]) ?? null,
    serviceTier: text("serviceTier"),
    threadId: text("threadId")
  };
}
522
/**
 * Converts a parsed request body into the daemon's turn input.
 *
 * The turn content is read from `input`, falling back to `prompt`, and must
 * be a string or an array. Policy-like fields are passed through without
 * validation; string fields are trimmed and become `null` when absent or blank.
 *
 * @throws CodexdHttpError 400 when the turn content is missing or of the
 *   wrong type, or when `sessionId` is not a non-empty string.
 */
function parseCreateTurnInput(body: JsonRecord): CodexdTurnInput {
  const candidate = body.input ?? body.prompt;

  if (typeof candidate !== "string" && !Array.isArray(candidate)) {
    throw new CodexdHttpError(400, "turn requires input as a string or item array.");
  }

  const sessionId = readRequiredString(body.sessionId, "sessionId");

  return {
    approvalPolicy: (body.approvalPolicy as CodexdTurnInput["approvalPolicy"]) ?? null,
    collaborationMode: (body.collaborationMode as CodexdTurnInput["collaborationMode"]) ?? null,
    cwd: readOptionalString(body.cwd),
    effort: readOptionalString(body.effort),
    expectedTurnId: readOptionalString(body.expectedTurnId),
    input: candidate as CodexdTurnInput["input"],
    model: readOptionalString(body.model),
    outputSchema: (body.outputSchema as CodexdTurnInput["outputSchema"]) ?? null,
    personality: readOptionalString(body.personality),
    sandboxPolicy: (body.sandboxPolicy as CodexdTurnInput["sandboxPolicy"]) ?? null,
    serviceTier: readOptionalString(body.serviceTier),
    sessionId,
    summary: (body.summary as CodexdTurnInput["summary"]) ?? null
  };
}
550
/**
 * Parses a request body that must be a JSON object.
 *
 * A missing or whitespace-only body is treated as an empty object.
 *
 * @throws CodexdHttpError 400 when the text is not valid JSON or the parsed
 *   value is not a plain object.
 */
function parseJsonObject(body: string | null): JsonRecord {
  const text = body ?? "";

  if (text.trim() === "") {
    return {};
  }

  let decoded: unknown;

  try {
    decoded = JSON.parse(text);
  } catch {
    throw new CodexdHttpError(400, "Request body must be valid JSON.");
  }

  const record = asRecord(decoded);

  if (record != null) {
    return record;
  }

  throw new CodexdHttpError(400, "Request body must be a JSON object.");
}
572
/** Returns the URL's port as a number, using 80 when the URL carries no explicit port. */
function readHttpPort(url: URL): number {
  if (url.port === "") {
    return 80;
  }

  return Number.parseInt(url.port, 10);
}
576
/**
 * Reads the whole request body as UTF-8 text.
 *
 * Requests with no method or with the GET method are not read at all.
 *
 * @returns The body text, or `null` when the body is skipped or empty.
 */
async function readIncomingRequestBody(request: IncomingMessage): Promise<string | null> {
  const method = request.method;

  if (method == null || method.toUpperCase() === "GET") {
    return null;
  }

  const chunks: string[] = [];

  return await new Promise<string | null>((resolve, reject) => {
    request.setEncoding?.("utf8");
    request.on?.("data", (chunk) => {
      chunks.push(typeof chunk === "string" ? chunk : String(chunk));
    });
    request.on?.("end", () => {
      const text = chunks.join("");
      resolve(text === "" ? null : text);
    });
    request.on?.("error", (error) => {
      reject(error);
    });
  });
}
596
/** Returns the value when it is a boolean, otherwise `undefined`. */
function readOptionalBoolean(value: unknown): boolean | undefined {
  if (typeof value === "boolean") {
    return value;
  }

  return undefined;
}
600
/** Returns the value when it is an integer-valued number, otherwise `undefined`. */
function readOptionalInteger(value: unknown): number | undefined {
  if (typeof value !== "number") {
    return undefined;
  }

  return Number.isInteger(value) ? value : undefined;
}
604
/**
 * Returns the trimmed string, or `null` when the value is not a string or is
 * empty after trimming.
 */
function readOptionalString(value: unknown): string | null {
  const trimmed = typeof value === "string" ? value.trim() : "";
  return trimmed.length > 0 ? trimmed : null;
}
613
/**
 * Returns the trimmed string value of a mandatory field.
 *
 * @param field Field name used in the error message.
 * @throws CodexdHttpError 400 when the value is not a string or is blank.
 */
function readRequiredString(value: unknown, field: string): string {
  const text = readOptionalString(value);

  if (text != null) {
    return text;
  }

  throw new CodexdHttpError(400, `${field} must be a non-empty string.`);
}
623
/**
 * Reads the session purpose, defaulting to `"duplex"` when the value is not
 * a non-blank string.
 *
 * @throws CodexdHttpError 400 when a string is given that is not one of
 *   `duplex`, `smoke`, or `worker`.
 */
function readSessionPurpose(value: unknown): CodexdSessionPurpose {
  const supported: readonly CodexdSessionPurpose[] = ["duplex", "smoke", "worker"];
  const requested = readOptionalString(value) ?? "duplex";
  const purpose = supported.find((candidate) => candidate === requested);

  if (purpose !== undefined) {
    return purpose;
  }

  throw new CodexdHttpError(400, `Unsupported session purpose "${String(value)}".`);
}
633
/**
 * Reads an optional array of non-blank strings, trimming each entry.
 *
 * @returns `undefined` when the value is `null` or `undefined`, otherwise
 *   the trimmed entries.
 * @throws CodexdHttpError 400 when the value is not an array or any entry is
 *   not a non-blank string.
 */
function readStringArray(value: unknown): string[] | undefined {
  if (value == null) {
    return undefined;
  }

  if (!Array.isArray(value)) {
    throw new CodexdHttpError(400, "Expected an array of strings.");
  }

  // Array.from visits every index, so holes are seen as undefined and rejected.
  return Array.from(value as unknown[], (entry) => {
    const item = readOptionalString(entry);

    if (item == null) {
      throw new CodexdHttpError(400, "Expected an array of strings.");
    }

    return item;
  });
}
657
/**
 * Reads an optional object whose keys and values are all non-blank strings.
 * Keys and values are trimmed in the result.
 *
 * @returns `undefined` when the value is `null` or `undefined`, otherwise
 *   the trimmed map.
 * @throws CodexdHttpError 400 when the value is not a plain object, or any
 *   key or value is not a non-blank string.
 */
function readStringMap(value: unknown): Record<string, string> | undefined {
  if (value == null) {
    return undefined;
  }

  const source = asRecord(value);

  if (source == null) {
    throw new CodexdHttpError(400, "Expected a string map.");
  }

  const output: Record<string, string> = {};

  Object.keys(source).forEach((rawKey) => {
    const key = readOptionalString(rawKey);
    const text = readOptionalString(source[rawKey]);

    if (key == null || text == null) {
      throw new CodexdHttpError(400, "Expected a string map.");
    }

    output[key] = text;
  });

  return output;
}
684
/**
 * Validates the configured local API base URL and derives the listen address.
 *
 * The URL must be an absolute `http://` URL on a loopback host with no path,
 * query, fragment, or credentials. `localhost` is mapped to `127.0.0.1`, and
 * a missing port means 80.
 *
 * @param localApiBase Configured base URL, e.g. `http://127.0.0.1:4319`.
 * @returns Host and port suitable for `server.listen`.
 * @throws Error describing the first violated constraint.
 */
function resolveLocalListenConfig(localApiBase: string): { host: string; port: number } {
  let url: URL;

  try {
    url = new URL(localApiBase);
  } catch {
    throw new Error("codexd localApiBase must be a valid absolute http:// URL.");
  }

  if (url.protocol !== "http:") {
    throw new Error("codexd localApiBase must use the http:// scheme.");
  }

  // URL.hostname serializes IPv6 literals with brackets ("[::1]"). Strip them
  // so the loopback check and listen() both receive the bare address.
  const hostname =
    url.hostname.startsWith("[") && url.hostname.endsWith("]")
      ? url.hostname.slice(1, -1)
      : url.hostname;

  if (!isLoopbackHost(hostname)) {
    throw new Error("codexd localApiBase must use a loopback host.");
  }

  if (url.pathname !== "/" || url.search !== "" || url.hash !== "") {
    throw new Error("codexd localApiBase must not include path, query, or hash.");
  }

  if (url.username !== "" || url.password !== "") {
    throw new Error("codexd localApiBase must not include credentials.");
  }

  return {
    host: hostname === "localhost" ? "127.0.0.1" : hostname,
    port: readHttpPort(url)
  };
}
715
/** Applies the status, headers, and body of a prepared response and ends the reply. */
function writeHttpResponse(
  response: ServerResponse<IncomingMessage>,
  payload: CodexdHttpResponse
): void {
  const { body, headers, status } = payload;

  response.statusCode = status;
  Object.entries(headers).forEach(([name, value]) => {
    response.setHeader(name, value);
  });
  response.end(body);
}