baa-conductor


baa-conductor / apps / conductor-daemon / src / instructions
codex@macbookpro  ·  2026-04-01

ingest.ts

  1import { createHash } from "node:crypto";
  2import type {
  3  ArtifactStore,
  4  ConversationAutomationStatus
  5} from "../../../../packages/artifact-db/dist/index.js";
  6
  7import type { ConductorLocalApiContext } from "../local-api.js";
  8
  9import {
 10  BaaInstructionCenter,
 11  BaaInstructionCenterError,
 12  type BaaInstructionCenterOptions
 13} from "./loop.js";
 14import type { BaaInstructionPolicyConfig } from "./policy.js";
 15import type { BaaLiveInstructionSnapshotStore } from "./store.js";
 16import type {
 17  BaaInstructionParseErrorStage,
 18  BaaInstructionProcessResult,
 19  BaaInstructionProcessStatus
 20} from "./types.js";
 21import { stableStringifyBaaJson } from "./types.js";
 22
 23export type BaaLiveInstructionIngestStatus =
 24  | "automation_busy"
 25  | "automation_paused"
 26  | "denied_only"
 27  | "duplicate_message"
 28  | "duplicate_only"
 29  | "executed"
 30  | "failed"
 31  | "ignored_no_instructions"
 32  | "parse_error_only"
 33  | "system_paused";
 34
 35export interface BaaLiveInstructionIngestInput {
 36  assistantMessageId: string;
 37  conversationId?: string | null;
 38  conversationAutomationStatus?: ConversationAutomationStatus | null;
 39  executionGateReason?: "automation_busy" | null;
 40  localConversationId?: string | null;
 41  observedAt?: number | null;
 42  organizationId?: string | null;
 43  pageTitle?: string | null;
 44  pageUrl?: string | null;
 45  platform: string;
 46  source: "browser.final_message";
 47  text: string;
 48}
 49
 50export interface BaaLiveInstructionIngestSummary {
 51  assistant_message_id: string;
 52  block_count: number;
 53  conversation_id: string | null;
 54  duplicate_instruction_count: number;
 55  duplicate_tools: string[];
 56  error_block_index: number | null;
 57  error_message: string | null;
 58  error_stage: string | null;
 59  executed_tools: string[];
 60  execution_count: number;
 61  execution_failed_count: number;
 62  execution_ok_count: number;
 63  ingested_at: number;
 64  instruction_count: number;
 65  instruction_tools: string[];
 66  message_dedupe_key: string;
 67  observed_at: number | null;
 68  parse_error_count: number;
 69  parse_errors: BaaLiveInstructionParseErrorSummary[];
 70  platform: string;
 71  source: "browser.final_message";
 72  status: BaaLiveInstructionIngestStatus;
 73}
 74
 75export interface BaaLiveInstructionParseErrorSummary {
 76  block_index: number;
 77  message: string;
 78  stage: BaaInstructionParseErrorStage;
 79}
 80
 81export interface BaaLiveInstructionIngestSnapshot {
 82  last_execute: BaaLiveInstructionIngestSummary | null;
 83  last_ingest: BaaLiveInstructionIngestSummary | null;
 84  recent_executes: BaaLiveInstructionIngestSummary[];
 85  recent_ingests: BaaLiveInstructionIngestSummary[];
 86}
 87
 88export interface BaaLiveInstructionIngestResult {
 89  processResult: BaaInstructionProcessResult | null;
 90  summary: BaaLiveInstructionIngestSummary;
 91}
 92
 93export interface BaaLiveInstructionMessageDeduper {
 94  add(
 95    dedupeKey: string,
 96    metadata?: Pick<
 97      BaaLiveInstructionIngestSummary,
 98      | "assistant_message_id"
 99      | "conversation_id"
100      | "ingested_at"
101      | "observed_at"
102      | "platform"
103    >
104  ): Promise<void> | void;
105  has(dedupeKey: string): Promise<boolean> | boolean;
106}
107
/**
 * Construction options for `BaaLiveInstructionIngest`.
 *
 * At least one of `center` and `localApiContext` must be supplied.
 */
export interface BaaLiveInstructionIngestOptions {
  /** Instruction center to use; when omitted one is built from `localApiContext`. */
  center?: BaaInstructionCenter;
  /** Maximum length of the recent-summary lists; invalid values fall back to 20. */
  historyLimit?: number;
  /** Supplies the artifact store, and the default instruction center and policy. */
  localApiContext?: ConductorLocalApiContext;
  /** Message-level deduper; defaults to a bounded in-memory implementation. */
  messageDeduper?: BaaLiveInstructionMessageDeduper;
  /** Clock used for timestamps; defaults to `Date.now`. */
  now?: () => number;
  /** Policy for the default instruction center; falls back to the context's policy. */
  policy?: BaaInstructionPolicyConfig | null;
  /** Optional persistence for published summaries. */
  snapshotStore?: BaaLiveInstructionSnapshotStore | null;
}
117
/** Capacity used by the in-memory deduper when no valid `maxSize` is supplied. */
const DEFAULT_IN_MEMORY_BAA_LIVE_MESSAGE_DEDUPER_MAX_SIZE = 10_000;

/**
 * Coerces a requested deduper capacity into a positive integer.
 *
 * @param maxSize Requested capacity; may be missing or invalid.
 * @returns `maxSize` truncated toward zero when that is a positive number,
 *   otherwise the default capacity.
 */
function normalizeInMemoryMessageDeduperMaxSize(maxSize: number | null | undefined): number {
  // Anything that is not a finite number collapses to 0, which the positivity
  // check below turns into the default.
  const truncated =
    typeof maxSize === "number" && Number.isFinite(maxSize) ? Math.trunc(maxSize) : 0;

  if (truncated > 0) {
    return truncated;
  }

  return DEFAULT_IN_MEMORY_BAA_LIVE_MESSAGE_DEDUPER_MAX_SIZE;
}
128
/** Options accepted by `InMemoryBaaLiveInstructionMessageDeduper`. */
export interface InMemoryBaaLiveInstructionMessageDeduperOptions {
  /** Maximum number of keys to retain; invalid or non-positive values fall back to the default. */
  maxSize?: number;
}
132
/**
 * Bounded, process-local message deduper backed by a `Set`.
 *
 * Keys are kept in insertion order; once the set grows beyond `maxSize` the
 * oldest keys are dropped. Re-adding a key makes it the newest entry.
 */
export class InMemoryBaaLiveInstructionMessageDeduper implements BaaLiveInstructionMessageDeduper {
  private readonly keys = new Set<string>();
  private readonly maxSize: number;

  /**
   * @param options Optional capacity override; see
   *   `InMemoryBaaLiveInstructionMessageDeduperOptions`.
   */
  constructor(options: InMemoryBaaLiveInstructionMessageDeduperOptions = {}) {
    const { maxSize } = options;
    this.maxSize = normalizeInMemoryMessageDeduperMaxSize(maxSize);
  }

  /** Records `dedupeKey`, moving it to the newest position if it was already present. */
  add(dedupeKey: string): void {
    // A Set keeps first-insertion order, so the key has to be removed before it
    // is added again for it to move to the end.
    this.keys.delete(dedupeKey);
    this.keys.add(dedupeKey);
    this.evictOverflow();
  }

  /** Forgets every recorded key. */
  clear(): void {
    this.keys.clear();
  }

  /** Reports whether `dedupeKey` is currently retained. */
  has(dedupeKey: string): boolean {
    return this.keys.has(dedupeKey);
  }

  /** Drops the oldest keys until the set is back within `maxSize`. */
  private evictOverflow(): void {
    // Set iteration follows insertion order, so the first entry visited is
    // always the oldest one still present.
    for (const oldestKey of this.keys) {
      if (this.keys.size <= this.maxSize || typeof oldestKey !== "string") {
        return;
      }

      this.keys.delete(oldestKey);
    }
  }
}
167
/**
 * Formats an instruction as `target::tool` for use in summary tool lists.
 *
 * @param target Instruction target.
 * @param tool Tool name.
 */
function buildInstructionDescriptor(target: string, tool: string): string {
  const separator = "::";
  return `${target}${separator}${tool}`;
}
171
/**
 * Translates a processor status into the status recorded on an ingest summary.
 *
 * @param status Status reported by the instruction processor.
 * @returns `"ignored_no_instructions"` for `"no_instructions"`; every other
 *   listed status is returned unchanged.
 */
function classifyProcessStatus(status: BaaInstructionProcessStatus): BaaLiveInstructionIngestStatus {
  switch (status) {
    // The only status that is renamed on the ingest side.
    case "no_instructions":
      return "ignored_no_instructions";
    // These share their spelling with the ingest status and pass straight through.
    case "automation_busy":
    case "automation_paused":
    case "denied_only":
    case "duplicate_only":
    case "executed":
    case "parse_error_only":
    case "system_paused":
      return status;
  }
}
192
/**
 * Decides whether a summary should also be published on the "execute" channel.
 *
 * @param status Status of the summary that was just produced.
 * @returns `true` for every listed status; `false` for anything else, which
 *   covers `duplicate_message` and `ignored_no_instructions`.
 */
function shouldUpdateExecutionSummary(status: BaaLiveInstructionIngestStatus): boolean {
  switch (status) {
    case "automation_busy":
    case "automation_paused":
    case "denied_only":
    case "duplicate_only":
    case "executed":
    case "failed":
    case "parse_error_only":
    case "system_paused":
      return true;
    default:
      return false;
  }
}
205
/**
 * Trims an optional string and collapses "nothing useful" to `null`.
 *
 * @param value Candidate string; may be missing.
 * @returns The trimmed string, or `null` when `value` is not a string or is
 *   empty after trimming.
 */
function normalizeOptionalString(value: string | null | undefined): string | null {
  const trimmed = typeof value === "string" ? value.trim() : "";
  return trimmed.length > 0 ? trimmed : null;
}
214
/**
 * Recognises the error raised when a message id is inserted a second time.
 *
 * @param error Value caught from a failed message insert.
 * @returns `true` only for `Error` instances whose message mentions the
 *   unique-constraint violation on `messages.id`.
 */
function isDuplicateArtifactMessageError(error: unknown): boolean {
  if (!(error instanceof Error)) {
    return false;
  }

  return error.message.includes("UNIQUE constraint failed: messages.id");
}
221
/**
 * Copies a summary so the caller can mutate the result freely.
 *
 * Scalar fields are copied by value; the three tool lists and the parse-error
 * list (including each entry) are re-created so no array or entry object is
 * shared with the input.
 *
 * @param summary Summary to copy; it is not modified.
 */
function cloneSummary(summary: BaaLiveInstructionIngestSummary): BaaLiveInstructionIngestSummary {
  // Start from a shallow copy so every key keeps its original position, then
  // replace the reference-typed fields with fresh copies.
  const copy: BaaLiveInstructionIngestSummary = { ...summary };

  copy.duplicate_tools = Array.from(summary.duplicate_tools);
  copy.executed_tools = Array.from(summary.executed_tools);
  copy.instruction_tools = Array.from(summary.instruction_tools);
  copy.parse_errors = summary.parse_errors.map((parseError) => ({ ...parseError }));

  return copy;
}
233
/**
 * Copies a list of summaries, copying every element with `cloneSummary`.
 *
 * @param summaries Summaries to copy; the input list is not modified.
 * @returns A new array holding an independent copy of each summary, in the
 *   same order.
 */
function cloneSummaryList(
  summaries: readonly BaaLiveInstructionIngestSummary[]
): BaaLiveInstructionIngestSummary[] {
  const copies = summaries.map((entry) => {
    return cloneSummary(entry);
  });

  return copies;
}
239
/**
 * Coerces a requested history length into a positive integer.
 *
 * @param limit Requested number of summaries to retain; may be missing or invalid.
 * @returns `limit` truncated toward zero when that is a positive number,
 *   otherwise 20.
 */
function normalizeHistoryLimit(limit: number | null | undefined): number {
  const fallbackLimit = 20;
  const usable = typeof limit === "number" && Number.isFinite(limit);

  if (!usable) {
    return fallbackLimit;
  }

  const truncated = Math.trunc(limit);

  if (truncated > 0) {
    return truncated;
  }

  return fallbackLimit;
}
248
/**
 * Derives the message-level dedupe key for an assistant message.
 *
 * The key is the SHA-256 hex digest of a stable JSON rendering of the message
 * id, platform, raw text and a fixed version tag, prefixed with `sha256:`.
 *
 * @param input Message id, platform and raw text to fingerprint.
 * @returns The key in the form `sha256:<hex digest>`.
 */
export function buildBaaLiveInstructionMessageDedupeKey(input: {
  assistantMessageId: string;
  platform: string;
  text: string;
}): string {
  const canonicalPayload = stableStringifyBaaJson({
    assistant_message_id: input.assistantMessageId,
    platform: input.platform,
    raw_text: input.text,
    version: "baa.live.v1"
  });
  const digest = createHash("sha256").update(canonicalPayload).digest("hex");

  return `sha256:${digest}`;
}
265
/**
 * Ingests final assistant messages observed in the browser.
 *
 * For each message the ingest persists a message artifact (best effort),
 * rejects messages whose dedupe key was already handled or is currently being
 * handled, hands the text to the instruction center, and publishes a summary of
 * the outcome to an in-memory history and, when configured, a snapshot store.
 */
export class BaaLiveInstructionIngest {
  private readonly artifactStore: ArtifactStore | null;
  private readonly center: BaaInstructionCenter;
  private readonly historyLimit: number;
  private readonly messageDeduper: BaaLiveInstructionMessageDeduper;
  private readonly now: () => number;
  private readonly snapshotStore: BaaLiveInstructionSnapshotStore | null;
  private lastExecute: BaaLiveInstructionIngestSummary | null = null;
  private lastIngest: BaaLiveInstructionIngestSummary | null = null;
  private recentExecutes: BaaLiveInstructionIngestSummary[] = [];
  private recentIngests: BaaLiveInstructionIngestSummary[] = [];
  // In-flight (or completed) load of the persisted snapshot; null until first requested.
  private initializedSnapshotPromise: Promise<void> | null = null;
  // Dedupe keys of messages that are being ingested right now.
  private readonly pendingKeys = new Set<string>();

  /**
   * @param options Collaborators and tuning knobs; see `BaaLiveInstructionIngestOptions`.
   * @throws Error when neither `center` nor `localApiContext` is provided.
   */
  constructor(options: BaaLiveInstructionIngestOptions) {
    if (options.center == null && options.localApiContext == null) {
      throw new Error("BaaLiveInstructionIngest requires either center or localApiContext.");
    }

    this.center =
      options.center
      ?? new BaaInstructionCenter({
        localApiContext: options.localApiContext as BaaInstructionCenterOptions["localApiContext"],
        policy: options.policy ?? options.localApiContext?.instructionPolicy
      });
    this.artifactStore = options.localApiContext?.artifactStore ?? null;
    this.historyLimit = normalizeHistoryLimit(options.historyLimit);
    this.messageDeduper = options.messageDeduper ?? new InMemoryBaaLiveInstructionMessageDeduper();
    this.now = options.now ?? Date.now;
    this.snapshotStore = options.snapshotStore ?? null;
  }

  /** Returns an independent copy of the current in-memory history. */
  getSnapshot(): BaaLiveInstructionIngestSnapshot {
    return {
      last_execute: this.lastExecute == null ? null : cloneSummary(this.lastExecute),
      last_ingest: this.lastIngest == null ? null : cloneSummary(this.lastIngest),
      recent_executes: cloneSummaryList(this.recentExecutes),
      recent_ingests: cloneSummaryList(this.recentIngests)
    };
  }

  /**
   * Loads the persisted history from the snapshot store, once.
   *
   * Concurrent callers share a single load. A failed load is not cached: the
   * rejection is propagated to everyone waiting on it, and the next call starts
   * a fresh attempt instead of replaying the old failure forever.
   *
   * Does nothing when no snapshot store is configured.
   */
  async initialize(): Promise<void> {
    if (this.snapshotStore == null) {
      return;
    }

    let loading = this.initializedSnapshotPromise;

    if (loading == null) {
      loading = this.loadPersistedSnapshot();
      this.initializedSnapshotPromise = loading;
    }

    try {
      await loading;
    } catch (error) {
      // Only clear the slot if it still holds the attempt that failed; a newer
      // attempt started by another caller must not be discarded.
      if (this.initializedSnapshotPromise === loading) {
        this.initializedSnapshotPromise = null;
      }

      throw error;
    }
  }

  /**
   * Ingests one final assistant message.
   *
   * @param input The observed message.
   * @returns The published summary, plus the processor result when the message
   *   was processed successfully. Processing errors are reported as a `failed`
   *   summary rather than thrown.
   */
  async ingestAssistantFinalMessage(
    input: BaaLiveInstructionIngestInput
  ): Promise<BaaLiveInstructionIngestResult> {
    await this.initialize();
    await this.persistMessageArtifact(input);

    const messageDedupeKey = buildBaaLiveInstructionMessageDedupeKey({
      assistantMessageId: input.assistantMessageId,
      platform: input.platform,
      text: input.text
    });
    const baseSummary = {
      assistant_message_id: input.assistantMessageId,
      conversation_id: normalizeOptionalString(input.conversationId),
      ingested_at: this.now(),
      message_dedupe_key: messageDedupeKey,
      observed_at: typeof input.observedAt === "number" && Number.isFinite(input.observedAt) ? input.observedAt : null,
      platform: input.platform,
      source: input.source
    } as const;

    // Another ingest of the same message is in flight: report a duplicate and
    // leave the claim with its owner.
    if (this.pendingKeys.has(messageDedupeKey)) {
      return this.publishDuplicateMessage(baseSummary);
    }

    // Claim the key synchronously, before the first await below. `await` always
    // yields, even when the deduper answers synchronously, so checking first and
    // claiming afterwards would let two concurrent ingests of the same message
    // both pass the check and both execute its instructions.
    this.pendingKeys.add(messageDedupeKey);

    try {
      if (await this.messageDeduper.has(messageDedupeKey)) {
        return await this.publishDuplicateMessage(baseSummary);
      }

      return await this.processClaimedMessage(input, baseSummary);
    } finally {
      this.pendingKeys.delete(messageDedupeKey);
    }
  }

  /** Publishes and returns the `duplicate_message` outcome for a message. */
  private async publishDuplicateMessage(
    baseSummary: Pick<
      BaaLiveInstructionIngestSummary,
      | "assistant_message_id"
      | "conversation_id"
      | "ingested_at"
      | "message_dedupe_key"
      | "observed_at"
      | "platform"
      | "source"
    >
  ): Promise<BaaLiveInstructionIngestResult> {
    const summary: BaaLiveInstructionIngestSummary = {
      ...baseSummary,
      block_count: 0,
      duplicate_instruction_count: 0,
      duplicate_tools: [],
      error_block_index: null,
      error_message: null,
      error_stage: null,
      executed_tools: [],
      execution_count: 0,
      execution_failed_count: 0,
      execution_ok_count: 0,
      instruction_count: 0,
      instruction_tools: [],
      parse_error_count: 0,
      parse_errors: [],
      status: "duplicate_message"
    };
    await this.publishSummary("ingest", summary);
    return {
      processResult: null,
      summary
    };
  }

  /**
   * Runs the instruction center for a message whose dedupe key this call has
   * claimed, then records and publishes the outcome.
   */
  private async processClaimedMessage(
    input: BaaLiveInstructionIngestInput,
    baseSummary: Pick<
      BaaLiveInstructionIngestSummary,
      | "assistant_message_id"
      | "conversation_id"
      | "ingested_at"
      | "message_dedupe_key"
      | "observed_at"
      | "platform"
      | "source"
    >
  ): Promise<BaaLiveInstructionIngestResult> {
    const messageDedupeKey = baseSummary.message_dedupe_key;

    try {
      const processResult = await this.center.processAssistantMessage({
        assistantMessageId: input.assistantMessageId,
        conversationId: normalizeOptionalString(input.conversationId),
        platform: input.platform,
        text: input.text
      }, {
        conversationAutomationStatus: input.conversationAutomationStatus ?? null,
        executionGateReason: input.executionGateReason ?? null,
        localConversationId: normalizeOptionalString(input.localConversationId)
      });
      const summary = this.buildSuccessSummary(baseSummary, processResult);

      await this.messageDeduper.add(messageDedupeKey, summary);
      await this.publishSummary("ingest", summary);

      if (shouldUpdateExecutionSummary(summary.status)) {
        await this.publishSummary("execute", summary);
      }

      return {
        processResult,
        summary
      };
    } catch (error) {
      const summary = this.buildFailureSummary(baseSummary, error);

      // Only instruction-center errors mark the message as handled; after any
      // other error the key is left unrecorded, so the same message can be
      // ingested again.
      if (error instanceof BaaInstructionCenterError) {
        await this.messageDeduper.add(messageDedupeKey, summary);
      }

      await this.publishSummary("ingest", summary);
      await this.publishSummary("execute", summary);

      return {
        processResult: null,
        summary
      };
    }
  }

  /**
   * Stores the message in the artifact store, if one is configured.
   *
   * Best effort: a duplicate-id error is ignored and any other failure is
   * logged, so artifact persistence never blocks instruction processing.
   */
  private async persistMessageArtifact(input: BaaLiveInstructionIngestInput): Promise<void> {
    if (this.artifactStore == null) {
      return;
    }

    try {
      await this.artifactStore.insertMessage({
        conversationId: normalizeOptionalString(input.conversationId),
        id: input.assistantMessageId,
        observedAt:
          typeof input.observedAt === "number" && Number.isFinite(input.observedAt)
            ? input.observedAt
            : this.now(),
        organizationId: normalizeOptionalString(input.organizationId),
        pageTitle: normalizeOptionalString(input.pageTitle),
        pageUrl: normalizeOptionalString(input.pageUrl),
        platform: input.platform,
        rawText: input.text,
        role: "assistant"
      });
    } catch (error) {
      if (isDuplicateArtifactMessageError(error)) {
        return;
      }

      const message = error instanceof Error ? error.stack ?? error.message : String(error);
      console.error(`[artifact] failed to persist message ${input.assistantMessageId}: ${message}`);
    }
  }

  /** Replaces the in-memory history with the snapshot store's contents. */
  private async loadPersistedSnapshot(): Promise<void> {
    if (this.snapshotStore == null) {
      return;
    }

    const snapshot = await this.snapshotStore.loadSnapshot(this.historyLimit);
    this.lastExecute = snapshot.last_execute == null ? null : cloneSummary(snapshot.last_execute);
    this.lastIngest = snapshot.last_ingest == null ? null : cloneSummary(snapshot.last_ingest);
    this.recentExecutes = cloneSummaryList(snapshot.recent_executes).slice(0, this.historyLimit);
    this.recentIngests = cloneSummaryList(snapshot.recent_ingests).slice(0, this.historyLimit);
  }

  /**
   * Appends a summary to the snapshot store (when configured) and then to the
   * matching in-memory history.
   *
   * @param kind Which history the summary belongs to.
   * @param summary Summary to publish; it is copied before being stored in memory.
   */
  private async publishSummary(
    kind: "execute" | "ingest",
    summary: BaaLiveInstructionIngestSummary
  ): Promise<void> {
    if (this.snapshotStore != null) {
      await this.snapshotStore.appendSummary(kind, summary);
    }

    if (kind === "execute") {
      this.lastExecute = cloneSummary(summary);
      this.recentExecutes = this.pushRecent(this.recentExecutes, summary);
      return;
    }

    this.lastIngest = cloneSummary(summary);
    this.recentIngests = this.pushRecent(this.recentIngests, summary);
  }

  /** Returns a new newest-first list with `summary` in front, capped at the history limit. */
  private pushRecent(
    summaries: readonly BaaLiveInstructionIngestSummary[],
    summary: BaaLiveInstructionIngestSummary
  ): BaaLiveInstructionIngestSummary[] {
    return [cloneSummary(summary), ...cloneSummaryList(summaries)].slice(0, this.historyLimit);
  }

  /**
   * Builds the `failed` summary for an error thrown while processing.
   *
   * Instruction-center errors contribute their stage and block index; a parse
   * error entry is recorded only when such an error has a block index and its
   * stage is `normalize` or `parse`. Any other error is reported with stage
   * `"internal"`.
   */
  private buildFailureSummary(
    baseSummary: Pick<
      BaaLiveInstructionIngestSummary,
      | "assistant_message_id"
      | "conversation_id"
      | "ingested_at"
      | "message_dedupe_key"
      | "observed_at"
      | "platform"
      | "source"
    >,
    error: unknown
  ): BaaLiveInstructionIngestSummary {
    const parseErrors =
      error instanceof BaaInstructionCenterError
      && error.blockIndex != null
      && (error.stage === "normalize" || error.stage === "parse")
        ? [
          {
            block_index: error.blockIndex,
            message: error.message,
            stage: error.stage
          } satisfies BaaLiveInstructionParseErrorSummary
        ]
        : [];

    return {
      ...baseSummary,
      block_count: 0,
      duplicate_instruction_count: 0,
      duplicate_tools: [],
      error_block_index:
        error instanceof BaaInstructionCenterError ? error.blockIndex : null,
      error_message: error instanceof Error ? error.message : String(error),
      error_stage: error instanceof BaaInstructionCenterError ? error.stage : "internal",
      executed_tools: [],
      execution_count: 0,
      execution_failed_count: 0,
      execution_ok_count: 0,
      instruction_count: 0,
      instruction_tools: [],
      parse_error_count: parseErrors.length,
      parse_errors: parseErrors,
      status: "failed"
    };
  }

  /** Flattens a processor result into the summary that gets published. */
  private buildSuccessSummary(
    baseSummary: Pick<
      BaaLiveInstructionIngestSummary,
      | "assistant_message_id"
      | "conversation_id"
      | "ingested_at"
      | "message_dedupe_key"
      | "observed_at"
      | "platform"
      | "source"
    >,
    processResult: BaaInstructionProcessResult
  ): BaaLiveInstructionIngestSummary {
    const parseErrors = processResult.parseErrors.map((entry) => ({
      block_index: entry.blockIndex,
      message: entry.message,
      stage: entry.stage
    }));

    return {
      ...baseSummary,
      block_count: processResult.blocks.length,
      duplicate_instruction_count: processResult.duplicates.length,
      duplicate_tools: processResult.duplicates.map((instruction) =>
        buildInstructionDescriptor(instruction.target, instruction.tool)
      ),
      error_block_index: null,
      error_message: null,
      error_stage: null,
      executed_tools: processResult.executions.map((execution) =>
        buildInstructionDescriptor(execution.target, execution.tool)
      ),
      execution_count: processResult.executions.length,
      // Counted with strict comparisons, so an execution whose `ok` is neither
      // true nor false is included in the total but in neither bucket.
      execution_failed_count: processResult.executions.filter((execution) => execution.ok === false).length,
      execution_ok_count: processResult.executions.filter((execution) => execution.ok === true).length,
      instruction_count: processResult.instructions.length,
      instruction_tools: processResult.instructions.map((instruction) =>
        buildInstructionDescriptor(instruction.target, instruction.tool)
      ),
      parse_error_count: parseErrors.length,
      parse_errors: parseErrors,
      status: classifyProcessStatus(processResult.status)
    };
  }
}
569}