baa-conductor


baa-conductor / apps / conductor-daemon / src / renewal
im_wower  ·  2026-04-03

projector.ts

  1import { join } from "node:path";
  2
  3import {
  4  buildArtifactPublicUrl,
  5  type ArtifactStore,
  6  type ConversationLinkRecord,
  7  type LocalConversationRecord,
  8  type MessageRecord,
  9  type MessageScanCursor
 10} from "../../../../packages/artifact-db/dist/index.js";
 11import type { ControlPlaneRepository } from "../../../../packages/db/dist/index.js";
 12
 13import type { TimedJobRunner, TimedJobRunnerResult, TimedJobTickContext } from "../timed-jobs/index.js";
 14import { extractBaaInstructionBlocks } from "../instructions/extract.js";
 15
 16import {
 17  recordAutomationFailureSignal,
 18  recordRenewalPayloadSignal
 19} from "./automation.js";
 20import {
 21  isPlainRecord,
 22  normalizeOptionalString,
 23  normalizeRequiredString,
 24  parseJsonValue
 25} from "./utils.js";
 26
 27const DEFAULT_CURSOR_STATE_KEY = "renewal.projector.cursor";
 28const DEFAULT_MESSAGE_ROLE = "assistant";
 29const DEFAULT_PAYLOAD_TEMPLATE = "summary_with_link";
 30const DEFAULT_SUMMARY_LIMIT = 200;
 31const RUNNER_NAME = "renewal.projector";
 32const TAB_TARGET_ID_PATTERN = /^tab:(\d+)$/u;
 33
 34export type RenewalProjectorSkipReason =
 35  | "automation_manual"
 36  | "automation_paused"
 37  | "cooldown_active"
 38  | "duplicate_job"
 39  | "instruction_message"
 40  | "missing_active_link"
 41  | "missing_local_conversation"
 42  | "missing_remote_conversation_id"
 43  | "repeated_renewal"
 44  | "route_unavailable";
 45
 46export type RenewalRouteUnavailableReason =
 47  | "inactive_link"
 48  | "missing_delivery_context"
 49  | "shell_page"
 50  | "target_kind_not_proxy_delivery";
 51
 52export interface RenewalProjectorCursor extends MessageScanCursor {}
 53
 54export interface RenewalProjectorPayload {
 55  kind: "renewal.message";
 56  linkUrl: string | null;
 57  sourceMessage: {
 58    artifactUrl: string | null;
 59    conversationId: string | null;
 60    id: string;
 61    observedAt: number;
 62    platform: string;
 63    summary: string;
 64  };
 65  summary: string;
 66  template: string;
 67  text: string;
 68  version: 1;
 69}
 70
 71export interface RenewalProjectorTargetSnapshot {
 72  clientId: string | null;
 73  linkId: string;
 74  pageTitle: string | null;
 75  pageUrl: string | null;
 76  platform: string;
 77  route: {
 78    params: Record<string, unknown> | null;
 79    path: string | null;
 80    pattern: string | null;
 81  };
 82  target: {
 83    id: string | null;
 84    kind: string;
 85    payload: Record<string, unknown> | null;
 86  };
 87}
 88
 89export interface RenewalShouldRenewDecision {
 90  eligible: boolean;
 91  existingJobId?: string | null;
 92  reason: "eligible" | RenewalProjectorSkipReason;
 93  routeUnavailableReason?: RenewalRouteUnavailableReason;
 94}
 95
 96export interface RenewalProjectorRunnerOptions {
 97  cursorStateKey?: string;
 98  now?: () => number;
 99  repository: Pick<ControlPlaneRepository, "getSystemState" | "putSystemState">;
100}
101
102interface RenewalCandidate {
103  conversation: LocalConversationRecord;
104  link: ConversationLinkRecord;
105  message: MessageRecord;
106}
107
108interface RenewalCandidateResolution {
109  candidate: RenewalCandidate | null;
110  reason: RenewalProjectorSkipReason | null;
111}
112
113interface CursorStateValue {
114  message_id: string;
115  observed_at: number;
116}
117
118interface RenewalRouteAvailabilityResult {
119  available: boolean;
120  reason: RenewalRouteUnavailableReason | null;
121}
122
123export function createRenewalProjectorRunner(
124  options: RenewalProjectorRunnerOptions
125): TimedJobRunner {
126  return {
127    name: RUNNER_NAME,
128    async run(context) {
129      return runRenewalProjector(context, options);
130    }
131  };
132}
133
134export async function runRenewalProjector(
135  context: TimedJobTickContext,
136  options: RenewalProjectorRunnerOptions
137): Promise<TimedJobRunnerResult> {
138  const artifactStore = context.artifactStore;
139  const now = options.now ?? (() => Date.now());
140
141  if (artifactStore == null) {
142    context.log({
143      stage: "scan_skipped",
144      result: "no_artifact_store"
145    });
146    return {
147      details: {
148        cursor_after: null,
149        cursor_before: null,
150        projected_jobs: 0,
151        scanned_messages: 0,
152        skipped_messages: 0
153      },
154      result: "no_artifact_store"
155    };
156  }
157
158  const nowMs = now();
159  const settleCutoff = Math.max(0, nowMs - context.settleDelayMs);
160  const cursorStateKey = normalizeCursorStateKey(options.cursorStateKey);
161  const cursorBefore = await loadCursor(options.repository, cursorStateKey);
162
163  context.log({
164    stage: "scan_window",
165    result: "ok",
166    details: {
167      cursor_before: formatCursor(cursorBefore),
168      settle_cutoff: settleCutoff
169    }
170  });
171
172  const messages = await artifactStore.scanMessages({
173    after: cursorBefore,
174    limit: context.maxMessagesPerTick,
175    observedAtLte: settleCutoff,
176    role: DEFAULT_MESSAGE_ROLE
177  });
178
179  if (messages.length === 0) {
180    context.log({
181      stage: "scan_completed",
182      result: "idle",
183      details: {
184        cursor_after: formatCursor(cursorBefore),
185        cursor_before: formatCursor(cursorBefore),
186        projected_jobs: 0,
187        scanned_messages: 0,
188        skipped_messages: 0
189      }
190    });
191    return {
192      details: {
193        cursor_after: formatCursor(cursorBefore),
194        cursor_before: formatCursor(cursorBefore),
195        projected_jobs: 0,
196        scanned_messages: 0,
197        skipped_messages: 0
198      },
199      result: "idle"
200    };
201  }
202
203  let cursorAfter = cursorBefore;
204  let projectedJobs = 0;
205  let scannedMessages = 0;
206  let skippedMessages = 0;
207  let limitHit = false;
208
209  for (const message of messages) {
210    scannedMessages += 1;
211    cursorAfter = {
212      id: message.id,
213      observedAt: message.observedAt
214    };
215
216    const resolution = await resolveRenewalCandidate(artifactStore, message);
217
218    if (resolution.candidate == null) {
219      skippedMessages += 1;
220      context.log({
221        stage: "message_skipped",
222        result: resolution.reason ?? "missing_local_conversation",
223        details: {
224          cursor: formatCursor(cursorAfter),
225          message_id: message.id,
226          platform: message.platform
227        }
228      });
229      continue;
230    }
231
232    const decision = await shouldRenew({
233      candidate: resolution.candidate,
234      now: nowMs,
235      store: artifactStore
236    });
237
238    if (!decision.eligible) {
239      if (decision.reason === "route_unavailable") {
240        await recordAutomationFailureSignal({
241          conversation: resolution.candidate.conversation,
242          errorMessage: `route_unavailable:${decision.routeUnavailableReason ?? "unknown"}`,
243          observedAt: nowMs,
244          store: artifactStore
245        });
246      }
247      skippedMessages += 1;
248      context.log({
249        stage: "message_skipped",
250        result: decision.reason,
251        details: {
252          cursor: formatCursor(cursorAfter),
253          existing_job_id: decision.existingJobId ?? null,
254          local_conversation_id: resolution.candidate.conversation.localConversationId,
255          message_id: message.id,
256          ...(decision.routeUnavailableReason == null
257            ? {}
258            : {
259                route_unavailable_reason: decision.routeUnavailableReason
260              })
261        }
262      });
263      continue;
264    }
265
266    const payload = buildRenewalPayload(message, artifactStore, resolution.candidate.link.pageUrl);
267    const updatedConversation = await recordRenewalPayloadSignal({
268      conversation: resolution.candidate.conversation,
269      observedAt: nowMs,
270      payloadText: payload.text,
271      store: artifactStore
272    });
273
274    if (updatedConversation.automationStatus === "paused" && updatedConversation.pauseReason === "repeated_renewal") {
275      skippedMessages += 1;
276      context.log({
277        stage: "message_skipped",
278        result: "repeated_renewal",
279        details: {
280          cursor: formatCursor(cursorAfter),
281          local_conversation_id: updatedConversation.localConversationId,
282          message_id: message.id
283        }
284      });
285      continue;
286    }
287
288    try {
289      const job = await artifactStore.insertRenewalJob({
290        jobId: buildRenewalJobId(message.id),
291        localConversationId: updatedConversation.localConversationId,
292        logPath: buildRunnerLogPath(context.logDir, nowMs),
293        messageId: message.id,
294        nextAttemptAt: nowMs,
295        payload: JSON.stringify(payload),
296        payloadKind: "json",
297        targetSnapshot: buildRenewalTargetSnapshot(resolution.candidate.link)
298      });
299
300      projectedJobs += 1;
301      context.log({
302        stage: "job_projected",
303        result: "projected",
304        details: {
305          cursor: formatCursor(cursorAfter),
306          job_id: job.jobId,
307          local_conversation_id: job.localConversationId,
308          message_id: job.messageId
309        }
310      });
311    } catch (error) {
312      if (isDuplicateRenewalJobError(error)) {
313        skippedMessages += 1;
314        context.log({
315          stage: "message_skipped",
316          result: "duplicate_job",
317          details: {
318            cursor: formatCursor(cursorAfter),
319            local_conversation_id: resolution.candidate.conversation.localConversationId,
320            message_id: message.id
321          }
322        });
323        continue;
324      }
325
326      throw error;
327    }
328
329    if (projectedJobs >= context.maxTasksPerTick) {
330      limitHit = true;
331      break;
332    }
333  }
334
335  if (!equalCursor(cursorBefore, cursorAfter) && cursorAfter != null) {
336    await saveCursor(options.repository, cursorStateKey, cursorAfter, nowMs);
337  }
338
339  const result = limitHit ? "task_limit_reached" : "ok";
340  const details = {
341    cursor_after: formatCursor(cursorAfter),
342    cursor_before: formatCursor(cursorBefore),
343    projected_jobs: projectedJobs,
344    scanned_messages: scannedMessages,
345    skipped_messages: skippedMessages
346  };
347
348  context.log({
349    stage: "scan_completed",
350    result,
351    details
352  });
353
354  return {
355    details,
356    result
357  };
358}
359
360export async function shouldRenew(input: {
361  candidate: RenewalCandidate;
362  now: number;
363  store: Pick<ArtifactStore, "listRenewalJobs">;
364}): Promise<RenewalShouldRenewDecision> {
365  const { candidate } = input;
366
367  if (hasBaaInstructionBlocks(candidate.message.rawText)) {
368    return {
369      eligible: false,
370      reason: "instruction_message"
371    };
372  }
373
374  const automationStatus = candidate.conversation.automationStatus;
375
376  if (automationStatus === "manual") {
377    return {
378      eligible: false,
379      reason: "automation_manual"
380    };
381  }
382
383  if (automationStatus === "paused") {
384    return {
385      eligible: false,
386      reason: "automation_paused"
387    };
388  }
389
390  if (
391    candidate.conversation.cooldownUntil != null
392    && candidate.conversation.cooldownUntil > input.now
393  ) {
394    return {
395      eligible: false,
396      reason: "cooldown_active"
397    };
398  }
399
400  const routeAvailability = hasAvailableRoute(candidate.link);
401
402  if (!routeAvailability.available) {
403    return {
404      eligible: false,
405      reason: "route_unavailable",
406      routeUnavailableReason: routeAvailability.reason ?? undefined
407    };
408  }
409
410  const existingJobs = await input.store.listRenewalJobs({
411    limit: 1,
412    messageId: candidate.message.id
413  });
414
415  if (existingJobs[0] != null) {
416    return {
417      eligible: false,
418      existingJobId: existingJobs[0].jobId,
419      reason: "duplicate_job"
420    };
421  }
422
423  return {
424    eligible: true,
425    reason: "eligible"
426  };
427}
428
429export function buildRenewalPayload(
430  message: MessageRecord,
431  store: Pick<ArtifactStore, "getPublicBaseUrl">,
432  fallbackLinkUrl: string | null = null
433): RenewalProjectorPayload {
434  const artifactUrl = buildArtifactPublicUrl(store.getPublicBaseUrl(), message.staticPath);
435  const linkUrl = normalizeOptionalString(message.pageUrl)
436    ?? normalizeOptionalString(fallbackLinkUrl)
437    ?? artifactUrl;
438  const summary = truncateText(message.summary ?? message.rawText, DEFAULT_SUMMARY_LIMIT);
439  const lines = [`[renewal] Context summary: ${summary}`];
440
441  if (linkUrl != null) {
442    lines.push(`Conversation link: ${linkUrl}`);
443  } else {
444    lines.push(`Message id: ${message.id}`);
445  }
446
447  return {
448    kind: "renewal.message",
449    linkUrl,
450    sourceMessage: {
451      artifactUrl,
452      conversationId: message.conversationId,
453      id: message.id,
454      observedAt: message.observedAt,
455      platform: message.platform,
456      summary
457    },
458    summary,
459    template: DEFAULT_PAYLOAD_TEMPLATE,
460    text: lines.join("\n"),
461    version: 1
462  };
463}
464
465export function buildRenewalTargetSnapshot(
466  link: ConversationLinkRecord
467): RenewalProjectorTargetSnapshot {
468  return {
469    clientId: link.clientId,
470    linkId: link.linkId,
471    pageTitle: link.pageTitle,
472    pageUrl: link.pageUrl,
473    platform: link.platform,
474    route: {
475      params: parseJsonRecord(link.routeParams),
476      path: link.routePath,
477      pattern: link.routePattern
478    },
479    target: {
480      id: link.targetId,
481      kind: normalizeRequiredString(link.targetKind, "link.targetKind"),
482      payload: parseJsonRecord(link.targetPayload)
483    }
484  };
485}
486
487async function resolveRenewalCandidate(
488  store: Pick<ArtifactStore, "getLocalConversation" | "listConversationLinks">,
489  message: MessageRecord
490): Promise<RenewalCandidateResolution> {
491  const remoteConversationId = normalizeOptionalString(message.conversationId);
492
493  if (remoteConversationId == null) {
494    return {
495      candidate: null,
496      reason: "missing_remote_conversation_id"
497    };
498  }
499
500  const links = await store.listConversationLinks({
501    isActive: true,
502    limit: 1,
503    platform: message.platform,
504    remoteConversationId
505  });
506  const link = links[0] ?? null;
507
508  if (link == null) {
509    return {
510      candidate: null,
511      reason: "missing_active_link"
512    };
513  }
514
515  const conversation = await store.getLocalConversation(link.localConversationId);
516
517  if (conversation == null) {
518    return {
519      candidate: null,
520      reason: "missing_local_conversation"
521    };
522  }
523
524  return {
525    candidate: {
526      conversation,
527      link,
528      message
529    },
530    reason: null
531  };
532}
533
534function buildRenewalJobId(messageId: string): string {
535  return `renewal_${normalizeRequiredString(messageId, "messageId")}`;
536}
537
538function buildRunnerLogPath(logDir: string | null, now: number): string | null {
539  if (logDir == null) {
540    return null;
541  }
542
543  return join(logDir, `${new Date(now).toISOString().slice(0, 10)}.jsonl`);
544}
545
546function equalCursor(
547  left: RenewalProjectorCursor | null,
548  right: RenewalProjectorCursor | null
549): boolean {
550  return left?.id === right?.id && left?.observedAt === right?.observedAt;
551}
552
553function formatCursor(cursor: RenewalProjectorCursor | null): string | null {
554  if (cursor == null) {
555    return null;
556  }
557
558  return `message:${cursor.observedAt}:${cursor.id}`;
559}
560
561function hasAvailableRoute(link: ConversationLinkRecord): RenewalRouteAvailabilityResult {
562  if (link.isActive !== true) {
563    return {
564      available: false,
565      reason: "inactive_link"
566    };
567  }
568
569  if (normalizeOptionalString(link.targetKind) !== "browser.proxy_delivery") {
570    return {
571      available: false,
572      reason: "target_kind_not_proxy_delivery"
573    };
574  }
575
576  const targetPayload = parseJsonRecord(link.targetPayload);
577  const shellPage = targetPayload?.shellPage === true;
578  const conversationId =
579    normalizeOptionalString(typeof targetPayload?.conversationId === "string" ? targetPayload.conversationId : null)
580    ?? normalizeOptionalString(link.remoteConversationId);
581  const pageUrl =
582    normalizeOptionalString(typeof targetPayload?.pageUrl === "string" ? targetPayload.pageUrl : null)
583    ?? normalizeOptionalString(link.pageUrl);
584  const tabId =
585    (typeof targetPayload?.tabId === "number" && Number.isInteger(targetPayload.tabId) && targetPayload.tabId > 0)
586      ? targetPayload.tabId
587      : parseTargetTabId(link.targetId);
588
589  if (shellPage) {
590    return {
591      available: false,
592      reason: "shell_page"
593    };
594  }
595
596  if (conversationId == null && pageUrl == null && tabId == null) {
597    return {
598      available: false,
599      reason: "missing_delivery_context"
600    };
601  }
602
603  return {
604    available: true,
605    reason: null
606  };
607}
608
609function hasBaaInstructionBlocks(text: string): boolean {
610  try {
611    return extractBaaInstructionBlocks(text).length > 0;
612  } catch {
613    return true;
614  }
615}
616
617function isCursorStateValue(value: unknown): value is CursorStateValue {
618  return (
619    typeof value === "object"
620    && value != null
621    && typeof Reflect.get(value, "message_id") === "string"
622    && Number.isInteger(Reflect.get(value, "observed_at"))
623  );
624}
625
626function isDuplicateRenewalJobError(error: unknown): boolean {
627  return (
628    error instanceof Error
629    && error.message.includes("UNIQUE constraint failed: renewal_jobs.message_id")
630  );
631}
632
633async function loadCursor(
634  repository: Pick<ControlPlaneRepository, "getSystemState">,
635  cursorStateKey: string
636): Promise<RenewalProjectorCursor | null> {
637  const state = await repository.getSystemState(cursorStateKey);
638
639  if (state == null) {
640    return null;
641  }
642
643  const parsed = parseJsonValue(state.valueJson);
644
645  if (!isCursorStateValue(parsed)) {
646    return null;
647  }
648
649  return {
650    id: parsed.message_id,
651    observedAt: parsed.observed_at
652  };
653}
654
655function normalizeCursorStateKey(value: string | undefined): string {
656  return normalizeRequiredString(value ?? DEFAULT_CURSOR_STATE_KEY, "cursorStateKey");
657}
658
659function parseJsonRecord(value: string | null): Record<string, unknown> | null {
660  const parsed = parseJsonValue(value);
661  return isPlainRecord(parsed) ? parsed : null;
662}
663
664function parseTargetTabId(value: string | null): number | null {
665  const normalized = normalizeOptionalString(value);
666
667  if (normalized == null) {
668    return null;
669  }
670
671  const matched = TAB_TARGET_ID_PATTERN.exec(normalized);
672
673  if (matched == null) {
674    return null;
675  }
676
677  const parsed = Number.parseInt(matched[1] ?? "", 10);
678  return Number.isInteger(parsed) && parsed > 0 ? parsed : null;
679}
680
681async function saveCursor(
682  repository: Pick<ControlPlaneRepository, "putSystemState">,
683  cursorStateKey: string,
684  cursor: RenewalProjectorCursor,
685  now: number
686): Promise<void> {
687  await repository.putSystemState({
688    stateKey: cursorStateKey,
689    updatedAt: now,
690    valueJson: JSON.stringify({
691      message_id: cursor.id,
692      observed_at: cursor.observedAt
693    })
694  });
695}
696
697function truncateText(text: string, limit: number): string {
698  if (text.length <= limit) {
699    return text;
700  }
701
702  return `${text.slice(0, Math.max(0, limit - 3))}...`;
703}