baa-conductor


baa-conductor / apps / conductor-daemon / src / instructions
codex@macbookpro  ·  2026-04-01

store.ts

  1import {
  2  DEFAULT_BAA_EXECUTION_JOURNAL_LIMIT,
  3  type ControlPlaneRepository
  4} from "../../../../packages/db/dist/index.js";
  5
  6import type { BaaInstructionDeduper } from "./dedupe.js";
  7import type {
  8  BaaLiveInstructionIngestSnapshot,
  9  BaaLiveInstructionParseErrorSummary,
 10  BaaLiveInstructionIngestSummary,
 11  BaaLiveInstructionMessageDeduper
 12} from "./ingest.js";
 13import type { BaaInstructionEnvelope } from "./types.js";
 14
 15function cloneSummary(summary: BaaLiveInstructionIngestSummary): BaaLiveInstructionIngestSummary {
 16  return {
 17    ...summary,
 18    duplicate_tools: [...summary.duplicate_tools],
 19    executed_tools: [...summary.executed_tools],
 20    instruction_tools: [...summary.instruction_tools],
 21    parse_errors: summary.parse_errors.map((entry) => ({
 22      ...entry
 23    }))
 24  };
 25}
 26
 27function isStringArray(value: unknown): value is string[] {
 28  return Array.isArray(value) && value.every((entry) => typeof entry === "string");
 29}
 30
 31function isBaaLiveInstructionParseErrorSummary(
 32  value: unknown
 33): value is BaaLiveInstructionParseErrorSummary {
 34  return (
 35    value != null
 36    && typeof value === "object"
 37    && !Array.isArray(value)
 38    && typeof (value as { block_index?: unknown }).block_index === "number"
 39    && typeof (value as { message?: unknown }).message === "string"
 40    && (
 41      (value as { stage?: unknown }).stage === "normalize"
 42      || (value as { stage?: unknown }).stage === "parse"
 43    )
 44  );
 45}
 46
 47function normalizeSummary(value: unknown): BaaLiveInstructionIngestSummary | null {
 48  if (value == null || typeof value !== "object" || Array.isArray(value)) {
 49    return null;
 50  }
 51
 52  const summary = value as Record<string, unknown>;
 53
 54  if (
 55    typeof summary.assistant_message_id !== "string"
 56    || typeof summary.block_count !== "number"
 57    || (summary.conversation_id != null && typeof summary.conversation_id !== "string")
 58    || typeof summary.duplicate_instruction_count !== "number"
 59    || !isStringArray(summary.duplicate_tools)
 60    || (summary.error_block_index != null && typeof summary.error_block_index !== "number")
 61    || (summary.error_message != null && typeof summary.error_message !== "string")
 62    || (summary.error_stage != null && typeof summary.error_stage !== "string")
 63    || !isStringArray(summary.executed_tools)
 64    || typeof summary.execution_count !== "number"
 65    || typeof summary.execution_failed_count !== "number"
 66    || typeof summary.execution_ok_count !== "number"
 67    || typeof summary.ingested_at !== "number"
 68    || typeof summary.instruction_count !== "number"
 69    || !isStringArray(summary.instruction_tools)
 70    || typeof summary.message_dedupe_key !== "string"
 71    || (summary.observed_at != null && typeof summary.observed_at !== "number")
 72    || typeof summary.platform !== "string"
 73    || summary.source !== "browser.final_message"
 74    || typeof summary.status !== "string"
 75  ) {
 76    return null;
 77  }
 78
 79  if (
 80    summary.parse_error_count != null
 81    && (typeof summary.parse_error_count !== "number" || !Number.isFinite(summary.parse_error_count))
 82  ) {
 83    return null;
 84  }
 85
 86  if (
 87    summary.parse_errors != null
 88    && (
 89      !Array.isArray(summary.parse_errors)
 90      || !summary.parse_errors.every((entry) => isBaaLiveInstructionParseErrorSummary(entry))
 91    )
 92  ) {
 93    return null;
 94  }
 95
 96  const parseErrors = Array.isArray(summary.parse_errors)
 97    ? (summary.parse_errors as BaaLiveInstructionParseErrorSummary[]).map((entry) => ({
 98      block_index: entry.block_index,
 99      message: entry.message,
100      stage: entry.stage
101    }))
102    : [];
103
104  return {
105    assistant_message_id: summary.assistant_message_id,
106    block_count: summary.block_count,
107    conversation_id: summary.conversation_id as string | null,
108    duplicate_instruction_count: summary.duplicate_instruction_count,
109    duplicate_tools: [...summary.duplicate_tools],
110    error_block_index: (summary.error_block_index as number | null | undefined) ?? null,
111    error_message: (summary.error_message as string | null | undefined) ?? null,
112    error_stage: (summary.error_stage as string | null | undefined) ?? null,
113    executed_tools: [...summary.executed_tools],
114    execution_count: summary.execution_count,
115    execution_failed_count: summary.execution_failed_count,
116    execution_ok_count: summary.execution_ok_count,
117    ingested_at: summary.ingested_at,
118    instruction_count: summary.instruction_count,
119    instruction_tools: [...summary.instruction_tools],
120    message_dedupe_key: summary.message_dedupe_key,
121    observed_at: (summary.observed_at as number | null | undefined) ?? null,
122    parse_error_count:
123      typeof summary.parse_error_count === "number"
124        ? Math.max(0, Math.trunc(summary.parse_error_count))
125        : parseErrors.length,
126    parse_errors: parseErrors,
127    platform: summary.platform,
128    source: summary.source,
129    status: summary.status as BaaLiveInstructionIngestSummary["status"]
130  };
131}
132
133function parseSummary(summaryJson: string): BaaLiveInstructionIngestSummary | null {
134  try {
135    const parsed = JSON.parse(summaryJson) as unknown;
136    const normalized = normalizeSummary(parsed);
137    return normalized == null ? null : cloneSummary(normalized);
138  } catch {
139    return null;
140  }
141}
142
143export interface BaaLiveInstructionSnapshotStore {
144  appendSummary(kind: "execute" | "ingest", summary: BaaLiveInstructionIngestSummary): Promise<void>;
145  loadSnapshot(limit?: number): Promise<BaaLiveInstructionIngestSnapshot>;
146}
147
148export class PersistentBaaInstructionDeduper implements BaaInstructionDeduper {
149  constructor(
150    private readonly repository: ControlPlaneRepository,
151    private readonly now: () => number = Date.now
152  ) {}
153
154  async add(instruction: BaaInstructionEnvelope): Promise<void> {
155    await this.repository.putBaaInstructionDedupe({
156      assistantMessageId: instruction.assistantMessageId,
157      conversationId: instruction.conversationId,
158      createdAt: this.now(),
159      dedupeKey: instruction.dedupeKey,
160      instructionId: instruction.instructionId,
161      platform: instruction.platform,
162      target: instruction.target,
163      tool: instruction.tool
164    });
165  }
166
167  async has(dedupeKey: string): Promise<boolean> {
168    return this.repository.hasBaaInstructionDedupe(dedupeKey);
169  }
170}
171
172export class PersistentBaaLiveInstructionMessageDeduper
173  implements BaaLiveInstructionMessageDeduper
174{
175  constructor(
176    private readonly repository: ControlPlaneRepository,
177    private readonly now: () => number = Date.now
178  ) {}
179
180  async add(
181    dedupeKey: string,
182    metadata?: Pick<
183      BaaLiveInstructionIngestSummary,
184      | "assistant_message_id"
185      | "conversation_id"
186      | "ingested_at"
187      | "observed_at"
188      | "platform"
189    >
190  ): Promise<void> {
191    await this.repository.putBaaMessageDedupe({
192      assistantMessageId: metadata?.assistant_message_id ?? dedupeKey,
193      conversationId: metadata?.conversation_id ?? null,
194      createdAt: metadata?.ingested_at ?? this.now(),
195      dedupeKey,
196      observedAt: metadata?.observed_at ?? null,
197      platform: metadata?.platform ?? "unknown"
198    });
199  }
200
201  async has(dedupeKey: string): Promise<boolean> {
202    return this.repository.hasBaaMessageDedupe(dedupeKey);
203  }
204}
205
206export class PersistentBaaLiveInstructionSnapshotStore implements BaaLiveInstructionSnapshotStore {
207  private readonly historyLimit: number;
208
209  constructor(
210    private readonly repository: ControlPlaneRepository,
211    historyLimit: number = DEFAULT_BAA_EXECUTION_JOURNAL_LIMIT
212  ) {
213    this.historyLimit = Number.isFinite(historyLimit) && historyLimit > 0
214      ? Math.trunc(historyLimit)
215      : DEFAULT_BAA_EXECUTION_JOURNAL_LIMIT;
216  }
217
218  async appendSummary(
219    kind: "execute" | "ingest",
220    summary: BaaLiveInstructionIngestSummary
221  ): Promise<void> {
222    await this.repository.appendBaaExecutionJournal({
223      assistantMessageId: summary.assistant_message_id,
224      conversationId: summary.conversation_id,
225      ingestedAt: summary.ingested_at,
226      kind,
227      messageDedupeKey: summary.message_dedupe_key,
228      observedAt: summary.observed_at,
229      platform: summary.platform,
230      source: summary.source,
231      status: summary.status,
232      summaryJson: JSON.stringify(cloneSummary(summary))
233    });
234  }
235
236  async loadSnapshot(limit: number = this.historyLimit): Promise<BaaLiveInstructionIngestSnapshot> {
237    const normalizedLimit =
238      Number.isFinite(limit) && limit > 0 ? Math.trunc(limit) : this.historyLimit;
239    const [ingestRows, executeRows] = await Promise.all([
240      this.repository.listBaaExecutionJournal({
241        kind: "ingest",
242        limit: normalizedLimit
243      }),
244      this.repository.listBaaExecutionJournal({
245        kind: "execute",
246        limit: normalizedLimit
247      })
248    ]);
249    const recentIngests = ingestRows
250      .map((row) => parseSummary(row.summaryJson))
251      .filter((summary): summary is BaaLiveInstructionIngestSummary => summary != null);
252    const recentExecutes = executeRows
253      .map((row) => parseSummary(row.summaryJson))
254      .filter((summary): summary is BaaLiveInstructionIngestSummary => summary != null);
255
256    return {
257      last_execute: recentExecutes[0] ?? null,
258      last_ingest: recentIngests[0] ?? null,
259      recent_executes: recentExecutes,
260      recent_ingests: recentIngests
261    };
262  }
263}