baa-conductor


baa-conductor / apps / conductor-daemon / src / instructions
codex@macbookpro  ·  2026-04-01

loop.ts

  1import type { ConversationAutomationStatus } from "../../../../packages/artifact-db/dist/index.js";
  2import { DEFAULT_AUTOMATION_MODE } from "../../../../packages/db/dist/index.js";
  3
  4import type { ConductorLocalApiContext } from "../local-api.js";
  5import {
  6  recordAutomationFailureSignal,
  7  recordAutomationSuccessSignal
  8} from "../renewal/automation.js";
  9
 10import { InMemoryBaaInstructionDeduper, type BaaInstructionDeduper } from "./dedupe.js";
 11import { executeBaaInstruction } from "./executor.js";
 12import { extractBaaInstructionBlocks } from "./extract.js";
 13import {
 14  normalizeBaaInstruction,
 15  normalizeBaaInstructionSourceMessage
 16} from "./normalize.js";
 17import { parseBaaInstructionBlock } from "./parse.js";
 18import {
 19  evaluateBaaInstructionPolicy,
 20  resolveBaaInstructionPolicy,
 21  type BaaInstructionPolicy,
 22  type BaaInstructionPolicyConfig
 23} from "./policy.js";
 24import { isAutomationControlInstruction, routeBaaInstruction } from "./router.js";
 25import type {
 26  BaaAssistantMessageInput,
 27  BaaInstructionDeniedResult,
 28  BaaInstructionEnvelope,
 29  BaaInstructionParseErrorRecord,
 30  BaaInstructionProcessResult,
 31  BaaInstructionRoute
 32} from "./types.js";
 33
 34type BaaInstructionStage = "extract" | "normalize" | "parse" | "policy" | "route";
 35
 36export class BaaInstructionCenterError extends Error {
 37  readonly blockIndex: number | null;
 38  readonly stage: BaaInstructionStage;
 39
 40  constructor(stage: BaaInstructionStage, message: string, blockIndex: number | null = null) {
 41    super(message);
 42    this.blockIndex = blockIndex;
 43    this.stage = stage;
 44  }
 45}
 46
 47export interface BaaInstructionCenterOptions {
 48  deduper?: BaaInstructionDeduper;
 49  localApiContext: ConductorLocalApiContext;
 50  policy?: BaaInstructionPolicyConfig | null;
 51}
 52
 53export interface BaaInstructionProcessOptions {
 54  conversationAutomationStatus?: ConversationAutomationStatus | null;
 55  executionGateReason?: "automation_busy" | null;
 56  localConversationId?: string | null;
 57}
 58
 59export class BaaInstructionCenter {
 60  private readonly deduper: BaaInstructionDeduper;
 61  private readonly localApiContext: ConductorLocalApiContext;
 62  private readonly policy: BaaInstructionPolicy;
 63
 64  constructor(options: BaaInstructionCenterOptions) {
 65    this.deduper = options.deduper ?? new InMemoryBaaInstructionDeduper();
 66    this.localApiContext = options.localApiContext;
 67    this.policy = resolveBaaInstructionPolicy(options.policy ?? options.localApiContext.instructionPolicy);
 68  }
 69
 70  async processAssistantMessage(
 71    input: BaaAssistantMessageInput,
 72    options: BaaInstructionProcessOptions = {}
 73  ): Promise<BaaInstructionProcessResult> {
 74    const blocks = this.extract(input.text);
 75
 76    if (blocks.length === 0) {
 77      return {
 78        blocks,
 79        denied: [],
 80        duplicates: [],
 81        executions: [],
 82        instructions: [],
 83        parseErrors: [],
 84        status: "no_instructions"
 85      };
 86    }
 87
 88    const {
 89      instructions,
 90      parseErrors
 91    } = this.normalize(input, blocks);
 92    const controlInstructions = instructions.filter((instruction) => isAutomationControlInstruction(instruction));
 93    const selectedInstructions = controlInstructions.length > 0 ? controlInstructions : instructions;
 94
 95    if (instructions.length === 0) {
 96      return {
 97        blocks,
 98        denied: [],
 99        duplicates: [],
100        executions: [],
101        instructions,
102        parseErrors,
103        status: "parse_error_only"
104      };
105    }
106
107    const systemAutomationMode = await this.loadSystemAutomationMode();
108
109    if (systemAutomationMode === "paused" && controlInstructions.length === 0) {
110      return {
111        blocks,
112        denied: [],
113        duplicates: [],
114        executions: [],
115        instructions: selectedInstructions,
116        parseErrors,
117        status: "system_paused"
118      };
119    }
120
121    if ((options.conversationAutomationStatus ?? null) === "paused" && controlInstructions.length === 0) {
122      return {
123        blocks,
124        denied: [],
125        duplicates: [],
126        executions: [],
127        instructions: selectedInstructions,
128        parseErrors,
129        status: "automation_paused"
130      };
131    }
132
133    if (options.executionGateReason != null) {
134      return {
135        blocks,
136        denied: [],
137        duplicates: [],
138        executions: [],
139        instructions: selectedInstructions,
140        parseErrors,
141        status: options.executionGateReason
142      };
143    }
144
145    const duplicates: BaaInstructionEnvelope[] = [];
146    const pending: BaaInstructionEnvelope[] = [];
147
148    for (const instruction of selectedInstructions) {
149      if (await this.deduper.has(instruction.dedupeKey)) {
150        duplicates.push(instruction);
151        continue;
152      }
153
154      pending.push(instruction);
155    }
156
157    if (pending.length === 0) {
158      return {
159        blocks,
160        denied: [],
161        duplicates,
162        executions: [],
163        instructions: selectedInstructions,
164        parseErrors,
165        status: "duplicate_only"
166      };
167    }
168
169    const { denied, routed } = this.preflight(pending);
170
171    for (const instruction of pending) {
172      await this.deduper.add(instruction);
173    }
174
175    const executions: BaaInstructionProcessResult["executions"] = [];
176
177    for (const { instruction, route } of routed) {
178      executions.push(await executeBaaInstruction(instruction, route, this.localApiContext));
179    }
180
181    await this.recordAutomationOutcome(options.localConversationId ?? null, executions);
182
183    return {
184      blocks,
185      denied,
186      duplicates,
187      executions,
188      instructions: selectedInstructions,
189      parseErrors,
190      status: executions.length === 0 && denied.length > 0 ? "denied_only" : "executed"
191    };
192  }
193
194  private async recordAutomationOutcome(
195    localConversationId: string | null,
196    executions: BaaInstructionProcessResult["executions"]
197  ): Promise<void> {
198    if (localConversationId == null || executions.length === 0 || this.localApiContext.artifactStore == null) {
199      return;
200    }
201
202    const conversation = await this.localApiContext.artifactStore.getLocalConversation(localConversationId);
203
204    if (conversation == null) {
205      return;
206    }
207
208    const failedExecution = executions.find((execution) => execution.ok === false);
209    const observedAt = this.localApiContext.now?.() ?? Date.now();
210
211    if (failedExecution != null) {
212      await recordAutomationFailureSignal({
213        conversation,
214        errorMessage: failedExecution.error ?? failedExecution.message ?? "instruction_execution_failed",
215        observedAt,
216        store: this.localApiContext.artifactStore
217      });
218      return;
219    }
220
221    await recordAutomationSuccessSignal({
222      conversation,
223      observedAt,
224      store: this.localApiContext.artifactStore
225    });
226  }
227
228  private async loadSystemAutomationMode(): Promise<"draining" | "paused" | "running"> {
229    const repository = this.localApiContext.repository;
230
231    if (repository == null) {
232      return DEFAULT_AUTOMATION_MODE;
233    }
234
235    const automationState = await repository.getAutomationState();
236    return automationState?.mode ?? DEFAULT_AUTOMATION_MODE;
237  }
238
239  private extract(text: string) {
240    try {
241      return extractBaaInstructionBlocks(text);
242    } catch (error) {
243      const message = error instanceof Error ? error.message : String(error);
244      throw new BaaInstructionCenterError("extract", message);
245    }
246  }
247
248  private normalize(
249    input: BaaAssistantMessageInput,
250    blocks: BaaInstructionProcessResult["blocks"]
251  ): {
252    instructions: BaaInstructionEnvelope[];
253    parseErrors: BaaInstructionParseErrorRecord[];
254  } {
255    try {
256      const source = normalizeBaaInstructionSourceMessage({
257        assistantMessageId: input.assistantMessageId,
258        conversationId: input.conversationId,
259        platform: input.platform
260      });
261      const instructions: BaaInstructionEnvelope[] = [];
262      const parseErrors: BaaInstructionParseErrorRecord[] = [];
263
264      for (const block of blocks) {
265        try {
266          instructions.push(normalizeBaaInstruction(source, parseBaaInstructionBlock(block)));
267        } catch (error) {
268          const message = error instanceof Error ? error.message : String(error);
269          const blockIndex =
270            error != null &&
271            typeof error === "object" &&
272            "blockIndex" in error &&
273            typeof (error as { blockIndex?: unknown }).blockIndex === "number"
274              ? (error as { blockIndex: number }).blockIndex
275              : block.blockIndex;
276          const stage =
277            error != null &&
278            typeof error === "object" &&
279            "stage" in error &&
280            (error as { stage?: unknown }).stage === "parse"
281              ? "parse"
282              : "normalize";
283
284          parseErrors.push({
285            blockIndex,
286            message,
287            stage
288          });
289        }
290      }
291
292      return {
293        instructions,
294        parseErrors
295      };
296    } catch (error) {
297      if (error instanceof BaaInstructionCenterError) {
298        throw error;
299      }
300
301      const message = error instanceof Error ? error.message : String(error);
302      const blockIndex =
303        error != null &&
304        typeof error === "object" &&
305        "blockIndex" in error &&
306        typeof (error as { blockIndex?: unknown }).blockIndex === "number"
307          ? (error as { blockIndex: number }).blockIndex
308          : null;
309      const stage =
310        error != null &&
311        typeof error === "object" &&
312        "stage" in error &&
313        (error as { stage?: unknown }).stage === "parse"
314          ? "parse"
315          : "normalize";
316
317      throw new BaaInstructionCenterError(stage, message, blockIndex);
318    }
319  }
320
321  private preflight(
322    instructions: BaaInstructionEnvelope[]
323  ): {
324    denied: BaaInstructionDeniedResult[];
325    routed: Array<{ instruction: BaaInstructionEnvelope; route: BaaInstructionRoute }>;
326  } {
327    const denied: BaaInstructionDeniedResult[] = [];
328    const routed: Array<{ instruction: BaaInstructionEnvelope; route: BaaInstructionRoute }> = [];
329
330    for (const instruction of instructions) {
331      const decision = evaluateBaaInstructionPolicy(instruction, this.policy);
332
333      if (!decision.ok) {
334        denied.push({
335          blockIndex: instruction.blockIndex,
336          code: decision.code,
337          instruction,
338          reason: decision.message ?? "BAA instruction was denied by policy.",
339          stage: "policy"
340        });
341        continue;
342      }
343
344      try {
345        routed.push({
346          instruction,
347          route: routeBaaInstruction(instruction)
348        });
349      } catch (error) {
350        const message = error instanceof Error ? error.message : String(error);
351        denied.push({
352          blockIndex: instruction.blockIndex,
353          code: null,
354          instruction,
355          reason: message,
356          stage: "route"
357        });
358      }
359    }
360
361    return {
362      denied,
363      routed
364    };
365  }
366}