codex@macbookpro
·
2026-04-01
store.ts
1import {
2 DEFAULT_BAA_EXECUTION_JOURNAL_LIMIT,
3 type ControlPlaneRepository
4} from "../../../../packages/db/dist/index.js";
5
6import type { BaaInstructionDeduper } from "./dedupe.js";
7import type {
8 BaaLiveInstructionIngestSnapshot,
9 BaaLiveInstructionParseErrorSummary,
10 BaaLiveInstructionIngestSummary,
11 BaaLiveInstructionMessageDeduper
12} from "./ingest.js";
13import type { BaaInstructionEnvelope } from "./types.js";
14
/**
 * Produces an independent copy of an ingest summary.
 *
 * Top-level fields are copied by object spread. The three tool lists are
 * copied into new arrays and every parse-error entry is shallow-copied into a
 * new object, so changes made to those containers on the copy do not reach
 * the input.
 *
 * @param summary - Summary to copy; it is not modified.
 * @returns A new summary object with freshly allocated lists.
 */
function cloneSummary(summary: BaaLiveInstructionIngestSummary): BaaLiveInstructionIngestSummary {
  const duplicateTools = [...summary.duplicate_tools];
  const executedTools = [...summary.executed_tools];
  const instructionTools = [...summary.instruction_tools];
  const parseErrors = summary.parse_errors.map((parseError) => {
    return { ...parseError };
  });

  return {
    ...summary,
    duplicate_tools: duplicateTools,
    executed_tools: executedTools,
    instruction_tools: instructionTools,
    parse_errors: parseErrors
  };
}
26
/**
 * Type guard: reports whether `value` is an array containing only strings.
 *
 * @param value - Any value.
 * @returns `true` for an array in which no visited element is a non-string
 *   (an empty array qualifies); `false` for anything that is not an array.
 */
function isStringArray(value: unknown): value is string[] {
  if (!Array.isArray(value)) {
    return false;
  }

  const containsNonString = value.some((item) => typeof item !== "string");
  return !containsNonString;
}
30
/**
 * Type guard for a single parse-error entry.
 *
 * An entry qualifies when it is a non-null, non-array object with a numeric
 * `block_index`, a string `message`, and a `stage` equal to `"normalize"` or
 * `"parse"`.
 *
 * @param value - Any value.
 * @returns `true` when `value` has the shape described above.
 */
function isBaaLiveInstructionParseErrorSummary(
  value: unknown
): value is BaaLiveInstructionParseErrorSummary {
  if (value == null || typeof value !== "object" || Array.isArray(value)) {
    return false;
  }

  const candidate = value as { block_index?: unknown; message?: unknown; stage?: unknown };

  if (typeof candidate.block_index !== "number") {
    return false;
  }

  if (typeof candidate.message !== "string") {
    return false;
  }

  return candidate.stage === "normalize" || candidate.stage === "parse";
}
46
/**
 * Validates an untrusted value and rebuilds it as an ingest summary.
 *
 * The value must be a non-array object whose required fields have the
 * expected primitive types and whose `source` is exactly
 * `"browser.final_message"`. The nullable fields (`conversation_id`,
 * `error_block_index`, `error_message`, `error_stage`, `observed_at`) may be
 * absent or `null`; in both cases they are emitted as `null`, never as
 * `undefined`. `parse_errors` and `parse_error_count` are optional: a missing
 * list becomes `[]` and a missing count defaults to the list length.
 *
 * Note that `status` is only checked to be a string; it is not compared
 * against the allowed status values.
 *
 * @param value - Candidate summary, e.g. the result of `JSON.parse`.
 * @returns A newly built summary, or `null` when any check fails.
 */
function normalizeSummary(value: unknown): BaaLiveInstructionIngestSummary | null {
  if (value == null || typeof value !== "object" || Array.isArray(value)) {
    return null;
  }

  const summary = value as Record<string, unknown>;

  if (
    typeof summary.assistant_message_id !== "string"
    || typeof summary.block_count !== "number"
    || (summary.conversation_id != null && typeof summary.conversation_id !== "string")
    || typeof summary.duplicate_instruction_count !== "number"
    || !isStringArray(summary.duplicate_tools)
    || (summary.error_block_index != null && typeof summary.error_block_index !== "number")
    || (summary.error_message != null && typeof summary.error_message !== "string")
    || (summary.error_stage != null && typeof summary.error_stage !== "string")
    || !isStringArray(summary.executed_tools)
    || typeof summary.execution_count !== "number"
    || typeof summary.execution_failed_count !== "number"
    || typeof summary.execution_ok_count !== "number"
    || typeof summary.ingested_at !== "number"
    || typeof summary.instruction_count !== "number"
    || !isStringArray(summary.instruction_tools)
    || typeof summary.message_dedupe_key !== "string"
    || (summary.observed_at != null && typeof summary.observed_at !== "number")
    || typeof summary.platform !== "string"
    || summary.source !== "browser.final_message"
    || typeof summary.status !== "string"
  ) {
    return null;
  }

  // The count is optional, but when present it must be a finite number
  // (JSON such as `1e999` parses to Infinity).
  if (
    summary.parse_error_count != null
    && (typeof summary.parse_error_count !== "number" || !Number.isFinite(summary.parse_error_count))
  ) {
    return null;
  }

  // The list is optional, but when present every entry must be well-formed.
  if (
    summary.parse_errors != null
    && (
      !Array.isArray(summary.parse_errors)
      || !summary.parse_errors.every((entry) => isBaaLiveInstructionParseErrorSummary(entry))
    )
  ) {
    return null;
  }

  // Rebuild each entry from its known fields so unknown extras are dropped.
  const parseErrors = Array.isArray(summary.parse_errors)
    ? (summary.parse_errors as BaaLiveInstructionParseErrorSummary[]).map((entry) => ({
        block_index: entry.block_index,
        message: entry.message,
        stage: entry.stage
      }))
    : [];

  return {
    assistant_message_id: summary.assistant_message_id,
    block_count: summary.block_count,
    // A missing conversation_id passes validation above; coalesce it to null
    // like the other nullable fields instead of leaking `undefined`.
    conversation_id: (summary.conversation_id as string | null | undefined) ?? null,
    duplicate_instruction_count: summary.duplicate_instruction_count,
    duplicate_tools: [...summary.duplicate_tools],
    error_block_index: (summary.error_block_index as number | null | undefined) ?? null,
    error_message: (summary.error_message as string | null | undefined) ?? null,
    error_stage: (summary.error_stage as string | null | undefined) ?? null,
    executed_tools: [...summary.executed_tools],
    execution_count: summary.execution_count,
    execution_failed_count: summary.execution_failed_count,
    execution_ok_count: summary.execution_ok_count,
    ingested_at: summary.ingested_at,
    instruction_count: summary.instruction_count,
    instruction_tools: [...summary.instruction_tools],
    message_dedupe_key: summary.message_dedupe_key,
    observed_at: (summary.observed_at as number | null | undefined) ?? null,
    // A supplied count is clamped to a non-negative integer.
    parse_error_count:
      typeof summary.parse_error_count === "number"
        ? Math.max(0, Math.trunc(summary.parse_error_count))
        : parseErrors.length,
    parse_errors: parseErrors,
    platform: summary.platform,
    source: summary.source,
    status: summary.status as BaaLiveInstructionIngestSummary["status"]
  };
}
132
/**
 * Decodes a stored JSON string into a validated ingest summary.
 *
 * The text is parsed, passed through `normalizeSummary`, and the accepted
 * result is copied with `cloneSummary`. Any exception raised along the way
 * (for example malformed JSON) is swallowed and reported as `null`.
 *
 * @param summaryJson - JSON text of a summary.
 * @returns The decoded summary, or `null` when parsing or validation fails.
 */
function parseSummary(summaryJson: string): BaaLiveInstructionIngestSummary | null {
  try {
    const candidate = normalizeSummary(JSON.parse(summaryJson) as unknown);

    if (candidate == null) {
      return null;
    }

    return cloneSummary(candidate);
  } catch {
    return null;
  }
}
142
/**
 * Contract for a store that records ingest summaries and returns them as a
 * snapshot. Implemented in this file by
 * `PersistentBaaLiveInstructionSnapshotStore`.
 */
export interface BaaLiveInstructionSnapshotStore {
  /**
   * Records `summary` under the given `kind` (`"execute"` or `"ingest"`).
   * Resolves once the store has accepted the record.
   */
  appendSummary(kind: "execute" | "ingest", summary: BaaLiveInstructionIngestSummary): Promise<void>;
  /**
   * Loads a snapshot of recorded summaries.
   * `limit` is optional; how it bounds the result is up to the implementation.
   */
  loadSnapshot(limit?: number): Promise<BaaLiveInstructionIngestSnapshot>;
}
147
/**
 * Instruction deduper that delegates storage and lookup of dedupe keys to a
 * `ControlPlaneRepository`.
 */
export class PersistentBaaInstructionDeduper implements BaaInstructionDeduper {
  /**
   * @param repository - Repository that stores and looks up dedupe records.
   * @param now - Clock used for the `createdAt` field; defaults to `Date.now`.
   */
  constructor(
    private readonly repository: ControlPlaneRepository,
    private readonly now: () => number = Date.now
  ) {}

  /**
   * Stores a dedupe record built from the instruction's identifying fields,
   * stamped with the current clock value.
   */
  async add(instruction: BaaInstructionEnvelope): Promise<void> {
    const {
      assistantMessageId,
      conversationId,
      dedupeKey,
      instructionId,
      platform,
      target,
      tool
    } = instruction;

    await this.repository.putBaaInstructionDedupe({
      assistantMessageId,
      conversationId,
      createdAt: this.now(),
      dedupeKey,
      instructionId,
      platform,
      target,
      tool
    });
  }

  /** Returns the repository's answer for whether `dedupeKey` is recorded. */
  async has(dedupeKey: string): Promise<boolean> {
    const alreadyRecorded = await this.repository.hasBaaInstructionDedupe(dedupeKey);
    return alreadyRecorded;
  }
}
171
/**
 * Message-level deduper that delegates storage and lookup of message dedupe
 * keys to a `ControlPlaneRepository`.
 */
export class PersistentBaaLiveInstructionMessageDeduper
  implements BaaLiveInstructionMessageDeduper
{
  /**
   * @param repository - Repository that stores and looks up dedupe records.
   * @param now - Clock used when no ingest timestamp is supplied; defaults to
   *   `Date.now`.
   */
  constructor(
    private readonly repository: ControlPlaneRepository,
    private readonly now: () => number = Date.now
  ) {}

  /**
   * Stores a dedupe record for `dedupeKey`.
   *
   * Missing metadata falls back to: the dedupe key as the assistant message
   * id, `null` for the conversation id and observation time, the clock value
   * for the creation time, and `"unknown"` for the platform.
   */
  async add(
    dedupeKey: string,
    metadata?: Pick<
      BaaLiveInstructionIngestSummary,
      | "assistant_message_id"
      | "conversation_id"
      | "ingested_at"
      | "observed_at"
      | "platform"
    >
  ): Promise<void> {
    const assistantMessageId = metadata?.assistant_message_id ?? dedupeKey;
    const conversationId = metadata?.conversation_id ?? null;
    // The clock is consulted only when no ingest timestamp was supplied.
    const createdAt = metadata?.ingested_at ?? this.now();
    const observedAt = metadata?.observed_at ?? null;
    const platform = metadata?.platform ?? "unknown";

    await this.repository.putBaaMessageDedupe({
      assistantMessageId,
      conversationId,
      createdAt,
      dedupeKey,
      observedAt,
      platform
    });
  }

  /** Returns the repository's answer for whether `dedupeKey` is recorded. */
  async has(dedupeKey: string): Promise<boolean> {
    const alreadyRecorded = await this.repository.hasBaaMessageDedupe(dedupeKey);
    return alreadyRecorded;
  }
}
205
/**
 * Snapshot store backed by the execution journal of a
 * `ControlPlaneRepository`.
 *
 * Summaries are appended as journal records carrying the JSON-serialized
 * summary, and are read back by listing `"ingest"` and `"execute"` records and
 * decoding each one with `parseSummary`.
 */
export class PersistentBaaLiveInstructionSnapshotStore implements BaaLiveInstructionSnapshotStore {
  private readonly historyLimit: number;

  /**
   * @param repository - Repository that owns the execution journal.
   * @param historyLimit - Default number of records requested per kind. Values
   *   that are not finite, or that truncate to less than 1, fall back to
   *   `DEFAULT_BAA_EXECUTION_JOURNAL_LIMIT`.
   */
  constructor(
    private readonly repository: ControlPlaneRepository,
    historyLimit: number = DEFAULT_BAA_EXECUTION_JOURNAL_LIMIT
  ) {
    this.historyLimit = PersistentBaaLiveInstructionSnapshotStore.resolveLimit(
      historyLimit,
      DEFAULT_BAA_EXECUTION_JOURNAL_LIMIT
    );
  }

  /**
   * Truncates `requested` to an integer and returns it when it is at least 1;
   * otherwise returns `fallback`.
   *
   * Truncation happens before the positivity check so that fractions in
   * (0, 1) do not collapse to a limit of 0.
   */
  private static resolveLimit(requested: number, fallback: number): number {
    if (!Number.isFinite(requested)) {
      return fallback;
    }

    const whole = Math.trunc(requested);
    return whole > 0 ? whole : fallback;
  }

  /**
   * Appends one journal record for `summary` under the given `kind`. The
   * record carries the summary's identifying fields plus a JSON copy of the
   * whole summary.
   */
  async appendSummary(
    kind: "execute" | "ingest",
    summary: BaaLiveInstructionIngestSummary
  ): Promise<void> {
    await this.repository.appendBaaExecutionJournal({
      assistantMessageId: summary.assistant_message_id,
      conversationId: summary.conversation_id,
      ingestedAt: summary.ingested_at,
      kind,
      messageDedupeKey: summary.message_dedupe_key,
      observedAt: summary.observed_at,
      platform: summary.platform,
      source: summary.source,
      status: summary.status,
      summaryJson: JSON.stringify(cloneSummary(summary))
    });
  }

  /**
   * Loads up to `limit` journal records of each kind and decodes them.
   *
   * Records whose JSON cannot be decoded are skipped. `last_ingest` and
   * `last_execute` are the first decoded entry of each list, or `null` when
   * the list is empty.
   *
   * @param limit - Records requested per kind; invalid values (not finite, or
   *   truncating to less than 1) fall back to the configured history limit.
   */
  async loadSnapshot(limit: number = this.historyLimit): Promise<BaaLiveInstructionIngestSnapshot> {
    const normalizedLimit = PersistentBaaLiveInstructionSnapshotStore.resolveLimit(
      limit,
      this.historyLimit
    );
    // The two listings are independent, so they are issued together.
    const [ingestRows, executeRows] = await Promise.all([
      this.repository.listBaaExecutionJournal({
        kind: "ingest",
        limit: normalizedLimit
      }),
      this.repository.listBaaExecutionJournal({
        kind: "execute",
        limit: normalizedLimit
      })
    ]);
    const decodeRows = (rows: typeof ingestRows): BaaLiveInstructionIngestSummary[] =>
      rows
        .map((row) => parseSummary(row.summaryJson))
        .filter((summary): summary is BaaLiveInstructionIngestSummary => summary != null);
    const recentIngests = decodeRows(ingestRows);
    const recentExecutes = decodeRows(executeRows);

    return {
      last_execute: recentExecutes[0] ?? null,
      last_ingest: recentIngests[0] ?? null,
      recent_executes: recentExecutes,
      recent_ingests: recentIngests
    };
  }
}