codex@macbookpro
·
2026-04-01
ingest.ts
1import { createHash } from "node:crypto";
2import type {
3 ArtifactStore,
4 ConversationAutomationStatus
5} from "../../../../packages/artifact-db/dist/index.js";
6
7import type { ConductorLocalApiContext } from "../local-api.js";
8
9import {
10 BaaInstructionCenter,
11 BaaInstructionCenterError,
12 type BaaInstructionCenterOptions
13} from "./loop.js";
14import type { BaaInstructionPolicyConfig } from "./policy.js";
15import type { BaaLiveInstructionSnapshotStore } from "./store.js";
16import type {
17 BaaInstructionParseErrorStage,
18 BaaInstructionProcessResult,
19 BaaInstructionProcessStatus
20} from "./types.js";
21import { stableStringifyBaaJson } from "./types.js";
22
/**
 * Outcome label stored in `BaaLiveInstructionIngestSummary.status`.
 *
 * Most members are produced by `classifyProcessStatus` from the instruction
 * center's process status (`"no_instructions"` is reported here as
 * `"ignored_no_instructions"`). Two members originate in the ingest layer itself:
 * - `"duplicate_message"`: the message dedupe key was already recorded or in flight.
 * - `"failed"`: processing threw and a failure summary was built instead.
 */
export type BaaLiveInstructionIngestStatus =
  | "automation_busy"
  | "automation_paused"
  | "denied_only"
  | "duplicate_message"
  | "duplicate_only"
  | "executed"
  | "failed"
  | "ignored_no_instructions"
  | "parse_error_only"
  | "system_paused";
34
/** A final assistant message handed to `BaaLiveInstructionIngest.ingestAssistantFinalMessage`. */
export interface BaaLiveInstructionIngestInput {
  /** Message identifier; hashed into the dedupe key and used as the stored artifact message id. */
  assistantMessageId: string;
  /** Trimmed before use; a blank or missing value is treated as `null`. */
  conversationId?: string | null;
  /** Forwarded to the instruction center; a missing value is sent as `null`. */
  conversationAutomationStatus?: ConversationAutomationStatus | null;
  /** Forwarded to the instruction center; a missing value is sent as `null`. */
  executionGateReason?: "automation_busy" | null;
  /** Trimmed, then forwarded to the instruction center (`null` when blank or missing). */
  localConversationId?: string | null;
  /**
   * Only finite numbers are kept. Otherwise the summary records `null` and the
   * stored artifact falls back to the ingest clock.
   */
  observedAt?: number | null;
  /** Only written to the artifact store, after trimming. */
  organizationId?: string | null;
  /** Only written to the artifact store, after trimming. */
  pageTitle?: string | null;
  /** Only written to the artifact store, after trimming. */
  pageUrl?: string | null;
  /** Hashed into the dedupe key, forwarded to the instruction center, and stored with the artifact. */
  platform: string;
  /** Origin tag copied verbatim into the summary. */
  source: "browser.final_message";
  /** Raw message text; hashed into the dedupe key untrimmed and forwarded to the instruction center. */
  text: string;
}
49
/**
 * Flat record describing the outcome of one ingest call.
 *
 * All counters and tool lists are zero/empty for `"duplicate_message"` and
 * `"failed"` summaries; they are only populated from a process result.
 */
export interface BaaLiveInstructionIngestSummary {
  assistant_message_id: string;
  /** Number of blocks in the process result. */
  block_count: number;
  /** Trimmed conversation id, or `null` when blank or missing. */
  conversation_id: string | null;
  /** Number of instructions the process result reported as duplicates. */
  duplicate_instruction_count: number;
  /** `target::tool` descriptor for each duplicate instruction. */
  duplicate_tools: string[];
  /** Block index carried by an instruction-center error; `null` otherwise. */
  error_block_index: number | null;
  /** Message of the thrown error (or its string form); `null` unless the status is `"failed"`. */
  error_message: string | null;
  /** Stage of an instruction-center error, `"internal"` for any other thrown value, `null` on non-failures. */
  error_stage: string | null;
  /** `target::tool` descriptor for each execution in the process result. */
  executed_tools: string[];
  execution_count: number;
  /** Executions whose `ok` flag is exactly `false`. */
  execution_failed_count: number;
  /** Executions whose `ok` flag is exactly `true`. */
  execution_ok_count: number;
  /** Value of the ingest clock (`now()`) when the summary was started. */
  ingested_at: number;
  instruction_count: number;
  /** `target::tool` descriptor for each instruction in the process result. */
  instruction_tools: string[];
  /** `sha256:`-prefixed key built by `buildBaaLiveInstructionMessageDedupeKey`. */
  message_dedupe_key: string;
  /** The input's `observedAt` when it is a finite number; `null` otherwise. */
  observed_at: number | null;
  parse_error_count: number;
  parse_errors: BaaLiveInstructionParseErrorSummary[];
  platform: string;
  source: "browser.final_message";
  status: BaaLiveInstructionIngestStatus;
}
74
/** One parse/normalize problem, flattened into snake_case for inclusion in a summary. */
export interface BaaLiveInstructionParseErrorSummary {
  /** Index of the block the error was reported for. */
  block_index: number;
  /** Error message as reported by the instruction center. */
  message: string;
  /** Stage at which the error was reported. */
  stage: BaaInstructionParseErrorStage;
}
80
/**
 * Point-in-time view of the ingest history returned by
 * `BaaLiveInstructionIngest.getSnapshot` and loaded from the snapshot store.
 */
export interface BaaLiveInstructionIngestSnapshot {
  /** Most recent summary published to the "execute" history, if any. */
  last_execute: BaaLiveInstructionIngestSummary | null;
  /** Most recent summary published to the "ingest" history, if any. */
  last_ingest: BaaLiveInstructionIngestSummary | null;
  /** "execute" history; the ingest class keeps it newest first and capped at its history limit. */
  recent_executes: BaaLiveInstructionIngestSummary[];
  /** "ingest" history; the ingest class keeps it newest first and capped at its history limit. */
  recent_ingests: BaaLiveInstructionIngestSummary[];
}
87
/** Return value of `BaaLiveInstructionIngest.ingestAssistantFinalMessage`. */
export interface BaaLiveInstructionIngestResult {
  /** Raw result from the instruction center; `null` for duplicate messages and for failures. */
  processResult: BaaInstructionProcessResult | null;
  /** Summary that was published for this call. */
  summary: BaaLiveInstructionIngestSummary;
}
92
/**
 * Store of message dedupe keys that have already been handled.
 *
 * Both methods may be synchronous or asynchronous; the ingest class awaits them
 * either way.
 */
export interface BaaLiveInstructionMessageDeduper {
  /**
   * Records a dedupe key as handled.
   *
   * @param dedupeKey Key produced by `buildBaaLiveInstructionMessageDedupeKey`.
   * @param metadata Optional descriptive fields for the message; the ingest class
   *   passes the summary it built for the message.
   */
  add(
    dedupeKey: string,
    metadata?: Pick<
      BaaLiveInstructionIngestSummary,
      | "assistant_message_id"
      | "conversation_id"
      | "ingested_at"
      | "observed_at"
      | "platform"
    >
  ): Promise<void> | void;
  /** Reports whether the key has been recorded. */
  has(dedupeKey: string): Promise<boolean> | boolean;
}
107
/**
 * Construction options for `BaaLiveInstructionIngest`.
 *
 * At least one of `center` or `localApiContext` must be supplied; the
 * constructor throws otherwise.
 */
export interface BaaLiveInstructionIngestOptions {
  /** Instruction center to use; when omitted one is built from `localApiContext`. */
  center?: BaaInstructionCenter;
  /** Maximum entries kept per history list; non-positive or non-finite values fall back to 20. */
  historyLimit?: number;
  /** Supplies the artifact store and, when `center` is omitted, the context for a new center. */
  localApiContext?: ConductorLocalApiContext;
  /** Message-level deduper; defaults to an in-memory implementation. */
  messageDeduper?: BaaLiveInstructionMessageDeduper;
  /** Clock used for timestamps; defaults to `Date.now`. */
  now?: () => number;
  /** Policy for a newly built center; falls back to `localApiContext.instructionPolicy`. */
  policy?: BaaInstructionPolicyConfig | null;
  /** Optional persistence for summaries; when absent, history lives in memory only. */
  snapshotStore?: BaaLiveInstructionSnapshotStore | null;
}
117
/** Capacity used whenever the caller does not supply a usable `maxSize`. */
const DEFAULT_IN_MEMORY_BAA_LIVE_MESSAGE_DEDUPER_MAX_SIZE = 10_000;

/**
 * Coerces a requested deduper capacity into a positive integer.
 *
 * @param maxSize Requested capacity; may be missing, fractional, or invalid.
 * @returns The value truncated toward zero when that is a positive finite
 *   number; otherwise the default capacity.
 */
function normalizeInMemoryMessageDeduperMaxSize(maxSize: number | null | undefined): number {
  if (typeof maxSize === "number" && Number.isFinite(maxSize)) {
    const wholeSize = Math.trunc(maxSize);

    if (wholeSize > 0) {
      return wholeSize;
    }
  }

  // Missing, NaN, infinite, zero, negative, and sub-1 fractions all land here.
  return DEFAULT_IN_MEMORY_BAA_LIVE_MESSAGE_DEDUPER_MAX_SIZE;
}
128
/** Options for `InMemoryBaaLiveInstructionMessageDeduper`. */
export interface InMemoryBaaLiveInstructionMessageDeduperOptions {
  /**
   * Maximum number of keys retained. Truncated to an integer; missing,
   * non-finite, or non-positive values fall back to the default of 10,000.
   */
  maxSize?: number;
}
132
/**
 * Bounded, process-local deduper backed by a `Set`.
 *
 * Keys are kept in insertion order. Adding a key that is already present moves
 * it to the newest position; `has` does not change the order. Once the set
 * grows past `maxSize`, the oldest keys are dropped.
 */
export class InMemoryBaaLiveInstructionMessageDeduper implements BaaLiveInstructionMessageDeduper {
  private readonly keys = new Set<string>();
  private readonly maxSize: number;

  /** @param options Optional capacity override; see `maxSize`. */
  constructor(options: InMemoryBaaLiveInstructionMessageDeduperOptions = {}) {
    const { maxSize } = options;
    this.maxSize = normalizeInMemoryMessageDeduperMaxSize(maxSize);
  }

  /** Records `dedupeKey` as the newest entry, then trims the set back to capacity. */
  add(dedupeKey: string): void {
    const { keys } = this;

    // Delete-then-add re-positions an existing key at the end of the iteration order.
    keys.delete(dedupeKey);
    keys.add(dedupeKey);
    this.evictOverflow();
  }

  /** Forgets every recorded key. */
  clear(): void {
    this.keys.clear();
  }

  /** Reports whether `dedupeKey` is currently retained. */
  has(dedupeKey: string): boolean {
    return this.keys.has(dedupeKey);
  }

  /** Drops keys from the oldest end until the set fits within `maxSize`. */
  private evictOverflow(): void {
    // A Set iterates in insertion order, so the first keys visited are the oldest.
    for (const candidate of this.keys) {
      if (this.keys.size <= this.maxSize || typeof candidate !== "string") {
        return;
      }

      this.keys.delete(candidate);
    }
  }
}
167
/**
 * Builds the `target::tool` label used in summary tool lists.
 *
 * @param target Instruction target.
 * @param tool Tool name on that target.
 * @returns The two parts joined by a double colon.
 */
function buildInstructionDescriptor(target: string, tool: string): string {
  return [target, tool].join("::");
}
171
/**
 * Translates an instruction-center process status into the ingest status
 * vocabulary.
 *
 * Every status keeps its name except `"no_instructions"`, which is reported as
 * `"ignored_no_instructions"`.
 *
 * @param status Status taken from the process result.
 * @returns The matching ingest status.
 */
function classifyProcessStatus(status: BaaInstructionProcessStatus): BaaLiveInstructionIngestStatus {
  switch (status) {
    // The only status that is renamed on the way through.
    case "no_instructions":
      return "ignored_no_instructions";
    // Everything else shares its spelling with the ingest status.
    case "automation_busy":
    case "automation_paused":
    case "denied_only":
    case "duplicate_only":
    case "executed":
    case "parse_error_only":
    case "system_paused":
      return status;
  }
}
192
/**
 * Decides whether a summary also belongs in the "execute" history.
 *
 * @param status Status of the summary being published.
 * @returns `true` for every listed status; `false` for anything else, which
 *   among the declared statuses means `"duplicate_message"` and
 *   `"ignored_no_instructions"`.
 */
function shouldUpdateExecutionSummary(status: BaaLiveInstructionIngestStatus): boolean {
  switch (status) {
    case "automation_busy":
    case "automation_paused":
    case "denied_only":
    case "duplicate_only":
    case "executed":
    case "failed":
    case "parse_error_only":
    case "system_paused":
      return true;
    default:
      return false;
  }
}
205
/**
 * Trims an optional string and collapses "nothing useful" to `null`.
 *
 * @param value Candidate string; may be `null` or `undefined`.
 * @returns The trimmed text, or `null` when the value is not a string or is
 *   empty after trimming.
 */
function normalizeOptionalString(value: string | null | undefined): string | null {
  const trimmed = typeof value === "string" ? value.trim() : "";

  return trimmed.length > 0 ? trimmed : null;
}
214
/**
 * Recognizes the error raised when a message with the same id is inserted twice.
 *
 * @param error Value caught from the artifact store.
 * @returns `true` only for `Error` instances whose message contains the
 *   unique-constraint text for `messages.id`.
 */
function isDuplicateArtifactMessageError(error: unknown): boolean {
  if (!(error instanceof Error)) {
    return false;
  }

  return error.message.includes("UNIQUE constraint failed: messages.id");
}
221
/**
 * Copies a summary so the caller can mutate the result without touching the
 * original.
 *
 * The three tool lists and the `parse_errors` array (including each entry) are
 * duplicated; all other fields are copied by value.
 *
 * @param summary Summary to copy.
 * @returns An independent copy with the same field order.
 */
function cloneSummary(summary: BaaLiveInstructionIngestSummary): BaaLiveInstructionIngestSummary {
  // Start from a shallow copy, then swap each nested collection for a fresh one.
  const copy: BaaLiveInstructionIngestSummary = { ...summary };

  copy.duplicate_tools = Array.from(summary.duplicate_tools);
  copy.executed_tools = Array.from(summary.executed_tools);
  copy.instruction_tools = Array.from(summary.instruction_tools);
  copy.parse_errors = summary.parse_errors.map((entry) => Object.assign({}, entry));

  return copy;
}
233
/**
 * Copies every summary in a list.
 *
 * @param summaries Summaries to copy; the input is not modified.
 * @returns A new array holding an independent copy of each summary, in the
 *   same order.
 */
function cloneSummaryList(
  summaries: readonly BaaLiveInstructionIngestSummary[]
): BaaLiveInstructionIngestSummary[] {
  const copies: BaaLiveInstructionIngestSummary[] = [];

  for (const summary of summaries) {
    copies.push(cloneSummary(summary));
  }

  return copies;
}
239
/**
 * Coerces a requested history length into a positive integer.
 *
 * @param limit Requested maximum number of retained summaries.
 * @returns The value truncated toward zero when that is a positive finite
 *   number; otherwise 20.
 */
function normalizeHistoryLimit(limit: number | null | undefined): number {
  const fallbackLimit = 20;
  // Anything that is not a finite number is treated like 0, which then falls back.
  const wholeLimit = typeof limit === "number" && Number.isFinite(limit) ? Math.trunc(limit) : 0;

  return wholeLimit > 0 ? wholeLimit : fallbackLimit;
}
248
/**
 * Derives the message-level dedupe key for an assistant message.
 *
 * The message id, platform, raw text, and a fixed version tag are serialized
 * with `stableStringifyBaaJson` and hashed with SHA-256.
 *
 * @param input Identifying fields of the message; `text` is used untrimmed.
 * @returns The hex digest prefixed with `sha256:`.
 */
export function buildBaaLiveInstructionMessageDedupeKey(input: {
  assistantMessageId: string;
  platform: string;
  text: string;
}): string {
  const { assistantMessageId, platform, text } = input;
  const serialized = stableStringifyBaaJson({
    assistant_message_id: assistantMessageId,
    platform: platform,
    raw_text: text,
    version: "baa.live.v1"
  });
  const digest = createHash("sha256").update(serialized).digest("hex");

  return `sha256:${digest}`;
}
265
/** Summary fields that are known before processing starts and shared by every outcome. */
type BaaLiveInstructionBaseSummary = Pick<
  BaaLiveInstructionIngestSummary,
  | "assistant_message_id"
  | "conversation_id"
  | "ingested_at"
  | "message_dedupe_key"
  | "observed_at"
  | "platform"
  | "source"
>;

/**
 * Entry point for final assistant messages observed in the browser.
 *
 * For each message it stores the raw text in the artifact store (when one is
 * configured), rejects messages whose dedupe key is already recorded or in
 * flight, hands the rest to the instruction center, and publishes a summary of
 * the outcome to the in-memory history and the optional snapshot store.
 */
export class BaaLiveInstructionIngest {
  private readonly artifactStore: ArtifactStore | null;
  private readonly center: BaaInstructionCenter;
  private readonly historyLimit: number;
  private readonly messageDeduper: BaaLiveInstructionMessageDeduper;
  private readonly now: () => number;
  private readonly snapshotStore: BaaLiveInstructionSnapshotStore | null;
  private lastExecute: BaaLiveInstructionIngestSummary | null = null;
  private lastIngest: BaaLiveInstructionIngestSummary | null = null;
  private recentExecutes: BaaLiveInstructionIngestSummary[] = [];
  private recentIngests: BaaLiveInstructionIngestSummary[] = [];
  private initializedSnapshotPromise: Promise<void> | null = null;
  /** Dedupe keys of messages that are being processed right now. */
  private readonly pendingKeys = new Set<string>();

  /**
   * @param options Collaborators and tuning; see `BaaLiveInstructionIngestOptions`.
   * @throws Error when neither `center` nor `localApiContext` is supplied.
   */
  constructor(options: BaaLiveInstructionIngestOptions) {
    if (options.center == null && options.localApiContext == null) {
      throw new Error("BaaLiveInstructionIngest requires either center or localApiContext.");
    }

    this.center =
      options.center
      ?? new BaaInstructionCenter({
        localApiContext: options.localApiContext as BaaInstructionCenterOptions["localApiContext"],
        policy: options.policy ?? options.localApiContext?.instructionPolicy
      });
    this.artifactStore = options.localApiContext?.artifactStore ?? null;
    this.historyLimit = normalizeHistoryLimit(options.historyLimit);
    this.messageDeduper = options.messageDeduper ?? new InMemoryBaaLiveInstructionMessageDeduper();
    this.now = options.now ?? Date.now;
    this.snapshotStore = options.snapshotStore ?? null;
  }

  /** Returns a copy of the current history; mutating it does not affect this instance. */
  getSnapshot(): BaaLiveInstructionIngestSnapshot {
    return {
      last_execute: this.lastExecute == null ? null : cloneSummary(this.lastExecute),
      last_ingest: this.lastIngest == null ? null : cloneSummary(this.lastIngest),
      recent_executes: cloneSummaryList(this.recentExecutes),
      recent_ingests: cloneSummaryList(this.recentIngests)
    };
  }

  /**
   * Loads persisted history from the snapshot store, at most once per
   * successful load. Does nothing when no snapshot store is configured.
   *
   * Concurrent callers share one in-flight load. If the load rejects, the
   * cached promise is discarded so that a later call retries instead of
   * replaying the same rejection forever.
   */
  async initialize(): Promise<void> {
    if (this.snapshotStore == null) {
      return;
    }

    if (this.initializedSnapshotPromise == null) {
      this.initializedSnapshotPromise = this.loadPersistedSnapshot();
    }

    const pendingLoad = this.initializedSnapshotPromise;

    try {
      await pendingLoad;
    } catch (error) {
      // Only clear the slot if it still holds the load that just failed.
      if (this.initializedSnapshotPromise === pendingLoad) {
        this.initializedSnapshotPromise = null;
      }

      throw error;
    }
  }

  /**
   * Ingests one final assistant message.
   *
   * @param input The observed message and its page context.
   * @returns The published summary, plus the raw process result when the
   *   instruction center ran to completion. Errors thrown while processing are
   *   reported as a `"failed"` summary rather than rethrown.
   */
  async ingestAssistantFinalMessage(
    input: BaaLiveInstructionIngestInput
  ): Promise<BaaLiveInstructionIngestResult> {
    await this.initialize();
    await this.persistMessageArtifact(input);

    const messageDedupeKey = buildBaaLiveInstructionMessageDedupeKey({
      assistantMessageId: input.assistantMessageId,
      platform: input.platform,
      text: input.text
    });
    const baseSummary: BaaLiveInstructionBaseSummary = {
      assistant_message_id: input.assistantMessageId,
      conversation_id: normalizeOptionalString(input.conversationId),
      ingested_at: this.now(),
      message_dedupe_key: messageDedupeKey,
      observed_at: this.finiteObservedAt(input.observedAt),
      platform: input.platform,
      source: input.source
    };

    // Another call is already working on this exact message.
    if (this.pendingKeys.has(messageDedupeKey)) {
      return this.publishDuplicateMessage(baseSummary);
    }

    // Claim the key synchronously, before the first await below. Checking the
    // deduper first would let two concurrent calls both pass the check while
    // suspended and then both execute the same instructions.
    this.pendingKeys.add(messageDedupeKey);

    try {
      if (await this.messageDeduper.has(messageDedupeKey)) {
        return await this.publishDuplicateMessage(baseSummary);
      }

      return await this.processClaimedMessage(input, baseSummary);
    } finally {
      this.pendingKeys.delete(messageDedupeKey);
    }
  }

  /** Returns `value` when it is a finite number, otherwise `null`. */
  private finiteObservedAt(value: number | null | undefined): number | null {
    return typeof value === "number" && Number.isFinite(value) ? value : null;
  }

  /** Publishes and returns an all-zero `"duplicate_message"` summary for a repeated message. */
  private async publishDuplicateMessage(
    baseSummary: BaaLiveInstructionBaseSummary
  ): Promise<BaaLiveInstructionIngestResult> {
    const summary: BaaLiveInstructionIngestSummary = {
      ...baseSummary,
      block_count: 0,
      duplicate_instruction_count: 0,
      duplicate_tools: [],
      error_block_index: null,
      error_message: null,
      error_stage: null,
      executed_tools: [],
      execution_count: 0,
      execution_failed_count: 0,
      execution_ok_count: 0,
      instruction_count: 0,
      instruction_tools: [],
      parse_error_count: 0,
      parse_errors: [],
      status: "duplicate_message"
    };

    await this.publishSummary("ingest", summary);

    return {
      processResult: null,
      summary
    };
  }

  /**
   * Runs the instruction center for a message whose dedupe key this call holds,
   * then records and publishes the outcome. Any error thrown along the way is
   * converted into a `"failed"` summary.
   */
  private async processClaimedMessage(
    input: BaaLiveInstructionIngestInput,
    baseSummary: BaaLiveInstructionBaseSummary
  ): Promise<BaaLiveInstructionIngestResult> {
    const messageDedupeKey = baseSummary.message_dedupe_key;

    try {
      const processResult = await this.center.processAssistantMessage({
        assistantMessageId: input.assistantMessageId,
        conversationId: normalizeOptionalString(input.conversationId),
        platform: input.platform,
        text: input.text
      }, {
        conversationAutomationStatus: input.conversationAutomationStatus ?? null,
        executionGateReason: input.executionGateReason ?? null,
        localConversationId: normalizeOptionalString(input.localConversationId)
      });
      const summary = this.buildSuccessSummary(baseSummary, processResult);

      await this.messageDeduper.add(messageDedupeKey, summary);
      await this.publishSummary("ingest", summary);

      if (shouldUpdateExecutionSummary(summary.status)) {
        await this.publishSummary("execute", summary);
      }

      return {
        processResult,
        summary
      };
    } catch (error) {
      const summary = this.buildFailureSummary(baseSummary, error);

      // Only instruction-center errors mark the message as handled; after any
      // other error the same message can be ingested again.
      if (error instanceof BaaInstructionCenterError) {
        await this.messageDeduper.add(messageDedupeKey, summary);
      }

      await this.publishSummary("ingest", summary);
      await this.publishSummary("execute", summary);

      return {
        processResult: null,
        summary
      };
    }
  }

  /**
   * Stores the raw message in the artifact store, if one is configured.
   *
   * Best effort: a duplicate-id error is ignored silently and any other error
   * is logged to `console.error`; neither stops the ingest.
   */
  private async persistMessageArtifact(input: BaaLiveInstructionIngestInput): Promise<void> {
    if (this.artifactStore == null) {
      return;
    }

    try {
      await this.artifactStore.insertMessage({
        conversationId: normalizeOptionalString(input.conversationId),
        id: input.assistantMessageId,
        // Fall back to the ingest clock when the caller gave no usable timestamp.
        observedAt: this.finiteObservedAt(input.observedAt) ?? this.now(),
        organizationId: normalizeOptionalString(input.organizationId),
        pageTitle: normalizeOptionalString(input.pageTitle),
        pageUrl: normalizeOptionalString(input.pageUrl),
        platform: input.platform,
        rawText: input.text,
        role: "assistant"
      });
    } catch (error) {
      if (isDuplicateArtifactMessageError(error)) {
        return;
      }

      const message = error instanceof Error ? error.stack ?? error.message : String(error);
      console.error(`[artifact] failed to persist message ${input.assistantMessageId}: ${message}`);
    }
  }

  /** Replaces the in-memory history with copies of what the snapshot store returns. */
  private async loadPersistedSnapshot(): Promise<void> {
    if (this.snapshotStore == null) {
      return;
    }

    const snapshot = await this.snapshotStore.loadSnapshot(this.historyLimit);
    this.lastExecute = snapshot.last_execute == null ? null : cloneSummary(snapshot.last_execute);
    this.lastIngest = snapshot.last_ingest == null ? null : cloneSummary(snapshot.last_ingest);
    // Re-apply the cap locally in case the store returned more than requested.
    this.recentExecutes = cloneSummaryList(snapshot.recent_executes).slice(0, this.historyLimit);
    this.recentIngests = cloneSummaryList(snapshot.recent_ingests).slice(0, this.historyLimit);
  }

  /**
   * Appends a summary to the snapshot store (when configured) and then to the
   * matching in-memory history.
   *
   * @param kind Which history to update.
   * @param summary Summary to record; a copy is stored.
   */
  private async publishSummary(
    kind: "execute" | "ingest",
    summary: BaaLiveInstructionIngestSummary
  ): Promise<void> {
    if (this.snapshotStore != null) {
      await this.snapshotStore.appendSummary(kind, summary);
    }

    if (kind === "execute") {
      this.lastExecute = cloneSummary(summary);
      this.recentExecutes = this.pushRecent(this.recentExecutes, summary);
      return;
    }

    this.lastIngest = cloneSummary(summary);
    this.recentIngests = this.pushRecent(this.recentIngests, summary);
  }

  /** Returns a new list with `summary` first, followed by the existing entries, capped at the history limit. */
  private pushRecent(
    summaries: readonly BaaLiveInstructionIngestSummary[],
    summary: BaaLiveInstructionIngestSummary
  ): BaaLiveInstructionIngestSummary[] {
    return [cloneSummary(summary), ...cloneSummaryList(summaries)].slice(0, this.historyLimit);
  }

  /**
   * Builds the `"failed"` summary for a thrown value.
   *
   * Instruction-center errors contribute their block index and stage; a parse
   * error entry is added only when the error has a block index and its stage is
   * `"normalize"` or `"parse"`. Any other thrown value is reported with stage
   * `"internal"`.
   */
  private buildFailureSummary(
    baseSummary: BaaLiveInstructionBaseSummary,
    error: unknown
  ): BaaLiveInstructionIngestSummary {
    const parseErrors =
      error instanceof BaaInstructionCenterError
      && error.blockIndex != null
      && (error.stage === "normalize" || error.stage === "parse")
        ? [
            {
              block_index: error.blockIndex,
              message: error.message,
              stage: error.stage
            } satisfies BaaLiveInstructionParseErrorSummary
          ]
        : [];

    return {
      ...baseSummary,
      block_count: 0,
      duplicate_instruction_count: 0,
      duplicate_tools: [],
      error_block_index:
        error instanceof BaaInstructionCenterError ? error.blockIndex : null,
      error_message: error instanceof Error ? error.message : String(error),
      error_stage: error instanceof BaaInstructionCenterError ? error.stage : "internal",
      executed_tools: [],
      execution_count: 0,
      execution_failed_count: 0,
      execution_ok_count: 0,
      instruction_count: 0,
      instruction_tools: [],
      parse_error_count: parseErrors.length,
      parse_errors: parseErrors,
      status: "failed"
    };
  }

  /** Flattens a process result into a summary; the error fields are always `null` here. */
  private buildSuccessSummary(
    baseSummary: BaaLiveInstructionBaseSummary,
    processResult: BaaInstructionProcessResult
  ): BaaLiveInstructionIngestSummary {
    const parseErrors = processResult.parseErrors.map((entry) => ({
      block_index: entry.blockIndex,
      message: entry.message,
      stage: entry.stage
    }));

    return {
      ...baseSummary,
      block_count: processResult.blocks.length,
      duplicate_instruction_count: processResult.duplicates.length,
      duplicate_tools: processResult.duplicates.map((instruction) =>
        buildInstructionDescriptor(instruction.target, instruction.tool)
      ),
      error_block_index: null,
      error_message: null,
      error_stage: null,
      executed_tools: processResult.executions.map((execution) =>
        buildInstructionDescriptor(execution.target, execution.tool)
      ),
      execution_count: processResult.executions.length,
      execution_failed_count: processResult.executions.filter((execution) => execution.ok === false).length,
      execution_ok_count: processResult.executions.filter((execution) => execution.ok === true).length,
      instruction_count: processResult.instructions.length,
      instruction_tools: processResult.instructions.map((instruction) =>
        buildInstructionDescriptor(instruction.target, instruction.tool)
      ),
      parse_error_count: parseErrors.length,
      parse_errors: parseErrors,
      status: classifyProcessStatus(processResult.status)
    };
  }
}