baa-conductor


baa-conductor / packages / codex-app-server / src
im_wower  ·  2026-03-22

events.ts

 1import type { CodexAppServerEvent } from "./contracts.js";
 2
/**
 * Handle for a registered event listener.
 */
export interface CodexAppServerEventSubscription {
  /** Cancels the registration this handle was returned for. */
  unsubscribe(): void;
}
 6
/** Callback that receives a single `CodexAppServerEvent`; its return value is ignored. */
export type CodexAppServerEventListener = (event: CodexAppServerEvent) => void;
 8
/**
 * A parked async-iterator `next()` call that is waiting for a result.
 */
interface PendingIterator {
  /** Settles the parked `next()` promise with the given iterator result. */
  resolve(result: IteratorResult<CodexAppServerEvent>): void;
}
12
/**
 * In-memory event stream with two consumption styles:
 *
 * - **Push**: listeners registered through `subscribe` are called synchronously
 *   from `emit`.
 * - **Pull**: `for await (const event of stream)` receives events through the
 *   async-iterator protocol. An event is handed to the oldest parked `next()`
 *   call or, when none is parked, appended to an internal buffer.
 *
 * Notes for maintainers:
 * - Every iterator created from one stream shares the same buffer and waiter
 *   list, so each event is delivered to exactly one `next()` call overall.
 * - Events are buffered even if nothing ever iterates the stream, so the
 *   buffer grows without bound when the stream is consumed only through
 *   `subscribe`.
 */
export class CodexAppServerEventStream implements AsyncIterable<CodexAppServerEvent> {
  private readonly listeners = new Set<CodexAppServerEventListener>();
  private readonly queue: CodexAppServerEvent[] = [];
  private readonly waiters: PendingIterator[] = [];
  private closed = false;

  /**
   * Publishes `event` to the iterator side and then to every listener.
   *
   * Does nothing once the stream has been closed. Listener exceptions are not
   * caught: the first one propagates to the caller and later listeners are
   * skipped, although the event has already been handed to the iterator side.
   *
   * @param event - The event to deliver.
   */
  emit(event: CodexAppServerEvent): void {
    if (this.closed) {
      return;
    }

    const waiter = this.waiters.shift();

    if (waiter !== undefined) {
      waiter.resolve({
        done: false,
        value: event
      });
    } else {
      this.queue.push(event);
    }

    // Dispatch over a snapshot: iterating the live Set would also visit
    // listeners added during dispatch, and a listener that unsubscribes and
    // re-subscribes itself would be revisited indefinitely.
    const recipients = Array.from(this.listeners);

    for (const listener of recipients) {
      // Skip listeners that were unsubscribed by an earlier listener during
      // this same dispatch.
      if (this.listeners.has(listener)) {
        listener(event);
      }
    }
  }

  /**
   * Closes the stream. Idempotent.
   *
   * Parked `next()` calls are resolved with `done: true`. Events that are
   * already buffered stay available and are still returned by later `next()`
   * calls before the iterator reports completion. Subsequent `emit` calls are
   * ignored.
   */
  close(): void {
    if (this.closed) {
      return;
    }

    this.closed = true;

    // Drain the waiter list in FIFO order, leaving it empty.
    for (const waiter of this.waiters.splice(0)) {
      waiter.resolve({
        done: true,
        value: undefined
      });
    }
  }

  /**
   * Registers `listener` to be called synchronously for each emitted event.
   *
   * Listeners are held in a Set, so registering the same function twice
   * results in a single registration.
   *
   * @param listener - Callback to invoke for each event.
   * @returns A handle whose `unsubscribe` removes `listener` again.
   */
  subscribe(listener: CodexAppServerEventListener): CodexAppServerEventSubscription {
    this.listeners.add(listener);

    return {
      unsubscribe: () => {
        this.listeners.delete(listener);
      }
    };
  }

  /**
   * Creates an async iterator over the stream.
   *
   * `next()` returns a buffered event if one exists, otherwise reports
   * completion when the stream is closed, otherwise parks until the next
   * `emit` or `close`.
   *
   * @returns An iterator backed by this stream's shared buffer and waiter list.
   */
  [Symbol.asyncIterator](): AsyncIterator<CodexAppServerEvent> {
    return {
      next: async (): Promise<IteratorResult<CodexAppServerEvent>> => {
        const buffered = this.queue.shift();

        if (buffered !== undefined) {
          return {
            done: false,
            value: buffered
          };
        }

        if (this.closed) {
          return {
            done: true,
            value: undefined
          };
        }

        // Nothing buffered and still open: park until emit() or close()
        // resolves this waiter.
        return await new Promise<IteratorResult<CodexAppServerEvent>>((resolve) => {
          this.waiters.push({ resolve });
        });
      }
    };
  }
}