// projector.ts — im_wower · 2026-04-03
import { join } from "node:path";

import {
  buildArtifactPublicUrl,
  type ArtifactStore,
  type ConversationLinkRecord,
  type LocalConversationRecord,
  type MessageRecord,
  type MessageScanCursor
} from "../../../../packages/artifact-db/dist/index.js";
import type { ControlPlaneRepository } from "../../../../packages/db/dist/index.js";

import type { TimedJobRunner, TimedJobRunnerResult, TimedJobTickContext } from "../timed-jobs/index.js";
import { extractBaaInstructionBlocks } from "../instructions/extract.js";

import {
  recordAutomationFailureSignal,
  recordRenewalPayloadSignal
} from "./automation.js";
import {
  isPlainRecord,
  normalizeOptionalString,
  normalizeRequiredString,
  parseJsonValue
} from "./utils.js";

/** System-state key used to persist the scan cursor when the caller supplies none. */
const DEFAULT_CURSOR_STATE_KEY = "renewal.projector.cursor";
/** Role filter passed to the message scan. */
const DEFAULT_MESSAGE_ROLE = "assistant";
/** Template name stamped on every payload built in this file. */
const DEFAULT_PAYLOAD_TEMPLATE = "summary_with_link";
/** Maximum length of the summary text embedded in a payload. */
const DEFAULT_SUMMARY_LIMIT = 200;
/** Name under which the projector registers as a timed-job runner. */
const RUNNER_NAME = "renewal.projector";
/** Matches target ids of the form `tab:<digits>` and captures the digits. */
const TAB_TARGET_ID_PATTERN = /^tab:(\d+)$/u;

/** Reasons the projector reports when a scanned message does not become a renewal job. */
export type RenewalProjectorSkipReason =
  | "automation_manual"
  | "automation_paused"
  | "cooldown_active"
  | "duplicate_job"
  | "instruction_message"
  | "missing_active_link"
  | "missing_local_conversation"
  | "missing_remote_conversation_id"
  | "repeated_renewal"
  | "route_unavailable";

/** Finer-grained cause attached to a `route_unavailable` decision. */
export type RenewalRouteUnavailableReason =
  | "inactive_link"
  | "missing_delivery_context"
  | "shell_page"
  | "target_kind_not_proxy_delivery";

/**
 * Position of the last message the projector scanned.
 *
 * Adds nothing to `MessageScanCursor`; the alias gives the projector its own
 * name for the cursor it persists and passes back to the message scan.
 */
export interface RenewalProjectorCursor extends MessageScanCursor {}

/** JSON payload stored on a projected renewal job. */
export interface RenewalProjectorPayload {
  kind: "renewal.message";
  /** Link included in the rendered text, or null when none could be derived. */
  linkUrl: string | null;
  /** Identifying fields copied from the message that triggered the renewal. */
  sourceMessage: {
    artifactUrl: string | null;
    conversationId: string | null;
    id: string;
    observedAt: number;
    platform: string;
    summary: string;
  };
  /** Truncated summary text, duplicated at the top level of the payload. */
  summary: string;
  /** Name of the template that produced `text`. */
  template: string;
  /** Rendered renewal message text. */
  text: string;
  version: 1;
}

/** Copy of a conversation link's routing and target fields stored with a renewal job. */
export interface RenewalProjectorTargetSnapshot {
  clientId: string | null;
  linkId: string;
  pageTitle: string | null;
  pageUrl: string | null;
  platform: string;
  route: {
    /** Parsed route params, or null when the stored value is not a JSON object. */
    params: Record<string, unknown> | null;
    path: string | null;
    pattern: string | null;
  };
  target: {
    id: string | null;
    kind: string;
    /** Parsed target payload, or null when the stored value is not a JSON object. */
    payload: Record<string, unknown> | null;
  };
}

/** Outcome of the eligibility check performed by `shouldRenew`. */
export interface RenewalShouldRenewDecision {
  eligible: boolean;
  /** Id of the job already recorded for the message; set on `duplicate_job`. */
  existingJobId?: string | null;
  /** `"eligible"` when a job may be created, otherwise the skip reason. */
  reason: "eligible" | RenewalProjectorSkipReason;
  /** Specific cause accompanying a `route_unavailable` reason. */
  routeUnavailableReason?: RenewalRouteUnavailableReason;
}

/** Dependencies and overrides for the renewal projector. */
export interface RenewalProjectorRunnerOptions {
  /** System-state key for the persisted cursor; a built-in default is used when omitted. */
  cursorStateKey?: string;
  /** Clock override returning a millisecond timestamp; defaults to `Date.now`. */
  now?: () => number;
  /** Store used to load and save the scan cursor. */
  repository: Pick<ControlPlaneRepository, "getSystemState" | "putSystemState">;
}

/** A scanned message together with the active link and local conversation resolved for it. */
interface RenewalCandidate {
  conversation: LocalConversationRecord;
  link: ConversationLinkRecord;
  message: MessageRecord;
}

/** Result of resolving a message to a candidate: either the candidate or the reason it is missing. */
interface RenewalCandidateResolution {
  candidate: RenewalCandidate | null;
  reason: RenewalProjectorSkipReason | null;
}

/** JSON shape in which the cursor is persisted in system state. */
interface CursorStateValue {
  message_id: string;
  observed_at: number;
}

/** Whether a link can be used for delivery, and why not when it cannot. */
interface RenewalRouteAvailabilityResult {
  available: boolean;
  reason: RenewalRouteUnavailableReason | null;
}

/**
 * Wraps the renewal projector as a timed-job runner.
 *
 * @param options Cursor storage and optional overrides forwarded to every run.
 * @returns A runner named `renewal.projector` whose `run` delegates to `runRenewalProjector`.
 */
export function createRenewalProjectorRunner(
  options: RenewalProjectorRunnerOptions
): TimedJobRunner {
  return {
    name: RUNNER_NAME,
    async run(context) {
      return runRenewalProjector(context, options);
    }
  };
}

/**
 * Runs one projector tick: scans settled assistant messages after the stored
 * cursor and inserts a renewal job for each eligible one.
 *
 * For every scanned message the cursor is advanced first, then the message is
 * either skipped (with a `message_skipped` log entry carrying the reason) or
 * projected into a job. The loop stops early once `context.maxTasksPerTick`
 * jobs have been projected. The cursor is persisted only after the loop, and
 * only if it moved.
 *
 * @param context Tick context supplying the artifact store, logger and limits.
 * @param options Cursor repository plus optional clock and state-key overrides.
 * @returns Counters and cursor positions for the tick; `result` is
 *   `"no_artifact_store"`, `"idle"`, `"task_limit_reached"` or `"ok"`.
 * @throws Rethrows any job-insert error that is not a duplicate-job conflict;
 *   in that case the cursor is not saved for this tick.
 */
export async function runRenewalProjector(
  context: TimedJobTickContext,
  options: RenewalProjectorRunnerOptions
): Promise<TimedJobRunnerResult> {
  const artifactStore = context.artifactStore;
  const now = options.now ?? (() => Date.now());

  if (artifactStore == null) {
    context.log({
      stage: "scan_skipped",
      result: "no_artifact_store"
    });
    return {
      details: {
        cursor_after: null,
        cursor_before: null,
        projected_jobs: 0,
        scanned_messages: 0,
        skipped_messages: 0
      },
      result: "no_artifact_store"
    };
  }

  const nowMs = now();
  // Only messages observed at or before this cutoff are requested from the scan.
  const settleCutoff = Math.max(0, nowMs - context.settleDelayMs);
  const cursorStateKey = normalizeCursorStateKey(options.cursorStateKey);
  const cursorBefore = await loadCursor(options.repository, cursorStateKey);

  context.log({
    stage: "scan_window",
    result: "ok",
    details: {
      cursor_before: formatCursor(cursorBefore),
      settle_cutoff: settleCutoff
    }
  });

  const messages = await artifactStore.scanMessages({
    after: cursorBefore,
    limit: context.maxMessagesPerTick,
    observedAtLte: settleCutoff,
    role: DEFAULT_MESSAGE_ROLE
  });

  if (messages.length === 0) {
    // Nothing new: report the unchanged cursor in both the log and the result.
    const idleDetails = {
      cursor_after: formatCursor(cursorBefore),
      cursor_before: formatCursor(cursorBefore),
      projected_jobs: 0,
      scanned_messages: 0,
      skipped_messages: 0
    };

    context.log({
      stage: "scan_completed",
      result: "idle",
      details: idleDetails
    });
    return {
      details: idleDetails,
      result: "idle"
    };
  }

  let cursorAfter = cursorBefore;
  let projectedJobs = 0;
  let scannedMessages = 0;
  let skippedMessages = 0;
  let limitHit = false;

  for (const message of messages) {
    scannedMessages += 1;
    // Advance before deciding, so skipped messages are not rescanned next tick.
    cursorAfter = {
      id: message.id,
      observedAt: message.observedAt
    };

    const resolution = await resolveRenewalCandidate(artifactStore, message);

    if (resolution.candidate == null) {
      skippedMessages += 1;
      context.log({
        stage: "message_skipped",
        result: resolution.reason ?? "missing_local_conversation",
        details: {
          cursor: formatCursor(cursorAfter),
          message_id: message.id,
          platform: message.platform
        }
      });
      continue;
    }

    const decision = await shouldRenew({
      candidate: resolution.candidate,
      now: nowMs,
      store: artifactStore
    });

    if (!decision.eligible) {
      if (decision.reason === "route_unavailable") {
        // An unusable route is also recorded as an automation failure on the conversation.
        await recordAutomationFailureSignal({
          conversation: resolution.candidate.conversation,
          errorMessage: `route_unavailable:${decision.routeUnavailableReason ?? "unknown"}`,
          observedAt: nowMs,
          store: artifactStore
        });
      }
      skippedMessages += 1;
      context.log({
        stage: "message_skipped",
        result: decision.reason,
        details: {
          cursor: formatCursor(cursorAfter),
          existing_job_id: decision.existingJobId ?? null,
          local_conversation_id: resolution.candidate.conversation.localConversationId,
          message_id: message.id,
          ...(decision.routeUnavailableReason == null
            ? {}
            : {
                route_unavailable_reason: decision.routeUnavailableReason
              })
        }
      });
      continue;
    }

    const payload = buildRenewalPayload(message, artifactStore, resolution.candidate.link.pageUrl);
    const updatedConversation = await recordRenewalPayloadSignal({
      conversation: resolution.candidate.conversation,
      observedAt: nowMs,
      payloadText: payload.text,
      store: artifactStore
    });

    // Recording the payload may itself pause the conversation for repetition.
    if (updatedConversation.automationStatus === "paused" && updatedConversation.pauseReason === "repeated_renewal") {
      skippedMessages += 1;
      context.log({
        stage: "message_skipped",
        result: "repeated_renewal",
        details: {
          cursor: formatCursor(cursorAfter),
          local_conversation_id: updatedConversation.localConversationId,
          message_id: message.id
        }
      });
      continue;
    }

    try {
      const job = await artifactStore.insertRenewalJob({
        jobId: buildRenewalJobId(message.id),
        localConversationId: updatedConversation.localConversationId,
        logPath: buildRunnerLogPath(context.logDir, nowMs),
        messageId: message.id,
        nextAttemptAt: nowMs,
        payload: JSON.stringify(payload),
        payloadKind: "json",
        targetSnapshot: buildRenewalTargetSnapshot(resolution.candidate.link)
      });

      projectedJobs += 1;
      context.log({
        stage: "job_projected",
        result: "projected",
        details: {
          cursor: formatCursor(cursorAfter),
          job_id: job.jobId,
          local_conversation_id: job.localConversationId,
          message_id: job.messageId
        }
      });
    } catch (error) {
      // A job inserted between the duplicate check and this insert is a skip, not a failure.
      if (isDuplicateRenewalJobError(error)) {
        skippedMessages += 1;
        context.log({
          stage: "message_skipped",
          result: "duplicate_job",
          details: {
            cursor: formatCursor(cursorAfter),
            local_conversation_id: resolution.candidate.conversation.localConversationId,
            message_id: message.id
          }
        });
        continue;
      }

      throw error;
    }

    if (projectedJobs >= context.maxTasksPerTick) {
      limitHit = true;
      break;
    }
  }

  if (!equalCursor(cursorBefore, cursorAfter) && cursorAfter != null) {
    await saveCursor(options.repository, cursorStateKey, cursorAfter, nowMs);
  }

  const result = limitHit ? "task_limit_reached" : "ok";
  const details = {
    cursor_after: formatCursor(cursorAfter),
    cursor_before: formatCursor(cursorBefore),
    projected_jobs: projectedJobs,
    scanned_messages: scannedMessages,
    skipped_messages: skippedMessages
  };

  context.log({
    stage: "scan_completed",
    result,
    details
  });

  return {
    details,
    result
  };
}

/**
 * Decides whether a candidate message may be projected into a renewal job.
 *
 * Checks run in this order and the first failure wins: instruction blocks in
 * the message text, manual automation, paused automation, an active cooldown,
 * route availability, and finally an existing job for the same message. Only
 * the last check queries the store.
 *
 * @param input.candidate Message with its resolved link and conversation.
 * @param input.now Current time in milliseconds, compared against `cooldownUntil`.
 * @param input.store Store used to look up an existing job for the message.
 * @returns The decision; `existingJobId` is set on `duplicate_job`, and
 *   `routeUnavailableReason` is the route check's reason on `route_unavailable`.
 */
export async function shouldRenew(input: {
  candidate: RenewalCandidate;
  now: number;
  store: Pick<ArtifactStore, "listRenewalJobs">;
}): Promise<RenewalShouldRenewDecision> {
  const { candidate } = input;

  if (hasBaaInstructionBlocks(candidate.message.rawText)) {
    return {
      eligible: false,
      reason: "instruction_message"
    };
  }

  const automationStatus = candidate.conversation.automationStatus;

  if (automationStatus === "manual") {
    return {
      eligible: false,
      reason: "automation_manual"
    };
  }

  if (automationStatus === "paused") {
    return {
      eligible: false,
      reason: "automation_paused"
    };
  }

  if (
    candidate.conversation.cooldownUntil != null
    && candidate.conversation.cooldownUntil > input.now
  ) {
    return {
      eligible: false,
      reason: "cooldown_active"
    };
  }

  const routeAvailability = hasAvailableRoute(candidate.link);

  if (!routeAvailability.available) {
    return {
      eligible: false,
      reason: "route_unavailable",
      routeUnavailableReason: routeAvailability.reason ?? undefined
    };
  }

  const existingJobs = await input.store.listRenewalJobs({
    limit: 1,
    messageId: candidate.message.id
  });

  if (existingJobs[0] != null) {
    return {
      eligible: false,
      existingJobId: existingJobs[0].jobId,
      reason: "duplicate_job"
    };
  }

  return {
    eligible: true,
    reason: "eligible"
  };
}

/**
 * Builds the renewal payload for a message.
 *
 * The link is the message's own page URL, else `fallbackLinkUrl`, else the
 * public artifact URL. The summary is `message.summary` (or the raw text when
 * the summary is null/undefined) truncated to the default summary limit. The
 * rendered text has a summary line followed by either the conversation link
 * or, when no link is available, the message id.
 *
 * @param message Source message.
 * @param store Provides the public base URL used to build the artifact URL.
 * @param fallbackLinkUrl Link used when the message has no page URL of its own.
 * @returns The payload, with `template` set to the default template name and `version` 1.
 */
export function buildRenewalPayload(
  message: MessageRecord,
  store: Pick<ArtifactStore, "getPublicBaseUrl">,
  fallbackLinkUrl: string | null = null
): RenewalProjectorPayload {
  const artifactUrl = buildArtifactPublicUrl(store.getPublicBaseUrl(), message.staticPath);
  const linkUrl = normalizeOptionalString(message.pageUrl)
    ?? normalizeOptionalString(fallbackLinkUrl)
    ?? artifactUrl;
  const summary = truncateText(message.summary ?? message.rawText, DEFAULT_SUMMARY_LIMIT);
  const lines = [`[renewal] Context summary: ${summary}`];

  if (linkUrl != null) {
    lines.push(`Conversation link: ${linkUrl}`);
  } else {
    lines.push(`Message id: ${message.id}`);
  }

  return {
    kind: "renewal.message",
    linkUrl,
    sourceMessage: {
      artifactUrl,
      conversationId: message.conversationId,
      id: message.id,
      observedAt: message.observedAt,
      platform: message.platform,
      summary
    },
    summary,
    template: DEFAULT_PAYLOAD_TEMPLATE,
    text: lines.join("\n"),
    version: 1
  };
}

/**
 * Copies a link's routing and target fields into a snapshot for a renewal job.
 *
 * `routeParams` and `targetPayload` are parsed from their stored JSON strings
 * and become null when they do not parse to a plain object. `targetKind` goes
 * through `normalizeRequiredString`.
 *
 * @param link Conversation link to snapshot.
 * @returns The snapshot.
 */
export function buildRenewalTargetSnapshot(
  link: ConversationLinkRecord
): RenewalProjectorTargetSnapshot {
  return {
    clientId: link.clientId,
    linkId: link.linkId,
    pageTitle: link.pageTitle,
    pageUrl: link.pageUrl,
    platform: link.platform,
    route: {
      params: parseJsonRecord(link.routeParams),
      path: link.routePath,
      pattern: link.routePattern
    },
    target: {
      id: link.targetId,
      kind: normalizeRequiredString(link.targetKind, "link.targetKind"),
      payload: parseJsonRecord(link.targetPayload)
    }
  };
}

/**
 * Resolves a message to its active conversation link and local conversation.
 *
 * @param store Store used to look up the link and the conversation.
 * @param message Scanned message.
 * @returns The candidate with a null reason, or a null candidate with the
 *   first missing piece as the reason: the message's remote conversation id,
 *   an active link for that id on the message's platform, or the local
 *   conversation the link points to.
 */
async function resolveRenewalCandidate(
  store: Pick<ArtifactStore, "getLocalConversation" | "listConversationLinks">,
  message: MessageRecord
): Promise<RenewalCandidateResolution> {
  const remoteConversationId = normalizeOptionalString(message.conversationId);

  if (remoteConversationId == null) {
    return {
      candidate: null,
      reason: "missing_remote_conversation_id"
    };
  }

  // Only the first active link returned by the store is considered.
  const links = await store.listConversationLinks({
    isActive: true,
    limit: 1,
    platform: message.platform,
    remoteConversationId
  });
  const link = links[0] ?? null;

  if (link == null) {
    return {
      candidate: null,
      reason: "missing_active_link"
    };
  }

  const conversation = await store.getLocalConversation(link.localConversationId);

  if (conversation == null) {
    return {
      candidate: null,
      reason: "missing_local_conversation"
    };
  }

  return {
    candidate: {
      conversation,
      link,
      message
    },
    reason: null
  };
}

/**
 * Derives the renewal job id for a message by prefixing the id returned by
 * `normalizeRequiredString` with `renewal_`, so one message always maps to
 * the same job id.
 */
function buildRenewalJobId(messageId: string): string {
  return `renewal_${normalizeRequiredString(messageId, "messageId")}`;
}

/**
 * Builds the per-day log file path recorded on a renewal job.
 *
 * @param logDir Log directory, or null when logging to disk is not configured.
 * @param now Timestamp in milliseconds; its UTC date names the file.
 * @returns `<logDir>/<YYYY-MM-DD>.jsonl`, or null when `logDir` is null.
 */
function buildRunnerLogPath(logDir: string | null, now: number): string | null {
  if (logDir == null) {
    return null;
  }

  // toISOString() is always UTC; its first ten characters are YYYY-MM-DD.
  const utcDay = new Date(now).toISOString().slice(0, 10);
  return join(logDir, `${utcDay}.jsonl`);
}

/**
 * Compares two cursors by `id` and `observedAt`.
 *
 * Two null cursors are equal; a null cursor never equals a non-null one whose
 * fields are defined.
 */
function equalCursor(
  left: RenewalProjectorCursor | null,
  right: RenewalProjectorCursor | null
): boolean {
  return left?.id === right?.id && left?.observedAt === right?.observedAt;
}

/**
 * Renders a cursor for log and result details.
 *
 * @returns `message:<observedAt>:<id>`, or null for a null cursor.
 */
function formatCursor(cursor: RenewalProjectorCursor | null): string | null {
  if (cursor == null) {
    return null;
  }

  return `message:${cursor.observedAt}:${cursor.id}`;
}

/**
 * Checks whether a link carries enough information to deliver a renewal.
 *
 * The link must be active, have target kind `browser.proxy_delivery`, not be
 * flagged `shellPage: true` in its target payload, and provide at least one
 * of: a conversation id, a page URL, or a positive integer tab id. Each of
 * those is read from the target payload first and from the link itself
 * (`remoteConversationId`, `pageUrl`, `targetId` of the form `tab:<n>`) second.
 *
 * @param link Link to inspect.
 * @returns `available: true` with a null reason, or `available: false` with
 *   the first failing check as the reason.
 */
function hasAvailableRoute(link: ConversationLinkRecord): RenewalRouteAvailabilityResult {
  if (link.isActive !== true) {
    return {
      available: false,
      reason: "inactive_link"
    };
  }

  if (normalizeOptionalString(link.targetKind) !== "browser.proxy_delivery") {
    return {
      available: false,
      reason: "target_kind_not_proxy_delivery"
    };
  }

  const targetPayload = parseJsonRecord(link.targetPayload);

  // A shell page is rejected regardless of what delivery context is present.
  if (targetPayload?.shellPage === true) {
    return {
      available: false,
      reason: "shell_page"
    };
  }

  const payloadConversationId = targetPayload?.conversationId;
  const conversationId =
    normalizeOptionalString(typeof payloadConversationId === "string" ? payloadConversationId : null)
    ?? normalizeOptionalString(link.remoteConversationId);

  const payloadPageUrl = targetPayload?.pageUrl;
  const pageUrl =
    normalizeOptionalString(typeof payloadPageUrl === "string" ? payloadPageUrl : null)
    ?? normalizeOptionalString(link.pageUrl);

  const payloadTabId = targetPayload?.tabId;
  const tabId =
    typeof payloadTabId === "number" && Number.isInteger(payloadTabId) && payloadTabId > 0
      ? payloadTabId
      : parseTargetTabId(link.targetId);

  if (conversationId == null && pageUrl == null && tabId == null) {
    return {
      available: false,
      reason: "missing_delivery_context"
    };
  }

  return {
    available: true,
    reason: null
  };
}

/**
 * Reports whether the text contains at least one BAA instruction block.
 *
 * If extraction throws, the text is reported as containing instructions, so a
 * message that cannot be parsed is treated the same as an instruction message
 * by callers.
 */
function hasBaaInstructionBlocks(text: string): boolean {
  try {
    return extractBaaInstructionBlocks(text).length > 0;
  } catch {
    // Deliberate: an extraction failure counts as "has instructions".
    return true;
  }
}

/**
 * Type guard for the persisted cursor shape: a non-null object whose
 * `message_id` is a string and whose `observed_at` is an integer.
 */
function isCursorStateValue(value: unknown): value is CursorStateValue {
  return (
    typeof value === "object"
    && value != null
    && typeof Reflect.get(value, "message_id") === "string"
    && Number.isInteger(Reflect.get(value, "observed_at"))
  );
}

/**
 * Recognises the error raised when a renewal job already exists for a message:
 * an `Error` whose message contains the unique-constraint text for
 * `renewal_jobs.message_id`.
 */
function isDuplicateRenewalJobError(error: unknown): boolean {
  return (
    error instanceof Error
    && error.message.includes("UNIQUE constraint failed: renewal_jobs.message_id")
  );
}

/**
 * Loads the persisted scan cursor.
 *
 * @param repository Store holding the system state entry.
 * @param cursorStateKey Key of the entry to read.
 * @returns The cursor, or null when no entry exists or its JSON value does
 *   not have the expected `{ message_id, observed_at }` shape.
 */
async function loadCursor(
  repository: Pick<ControlPlaneRepository, "getSystemState">,
  cursorStateKey: string
): Promise<RenewalProjectorCursor | null> {
  const state = await repository.getSystemState(cursorStateKey);

  if (state == null) {
    return null;
  }

  const parsed = parseJsonValue(state.valueJson);

  // A malformed stored value is treated as "no cursor" rather than an error.
  if (!isCursorStateValue(parsed)) {
    return null;
  }

  return {
    id: parsed.message_id,
    observedAt: parsed.observed_at
  };
}

/**
 * Resolves the cursor state key: the caller's value, or the default key when
 * the value is undefined, passed through `normalizeRequiredString`.
 */
function normalizeCursorStateKey(value: string | undefined): string {
  return normalizeRequiredString(value ?? DEFAULT_CURSOR_STATE_KEY, "cursorStateKey");
}

/**
 * Parses a stored JSON string and returns it only if it is a plain record.
 *
 * @returns The parsed object, or null when `isPlainRecord` rejects the parsed value.
 */
function parseJsonRecord(value: string | null): Record<string, unknown> | null {
  const parsed = parseJsonValue(value);
  return isPlainRecord(parsed) ? parsed : null;
}

/**
 * Extracts a tab id from a target id of the form `tab:<digits>`.
 *
 * @param value Raw target id, possibly null.
 * @returns The positive integer tab id, or null when the value is missing,
 *   does not match the pattern, or the number is not a positive integer.
 */
function parseTargetTabId(value: string | null): number | null {
  const normalized = normalizeOptionalString(value);

  if (normalized == null) {
    return null;
  }

  const matched = TAB_TARGET_ID_PATTERN.exec(normalized);

  if (matched == null) {
    return null;
  }

  const parsed = Number.parseInt(matched[1] ?? "", 10);
  // Rejects `tab:0` and digit runs too long to parse to a finite number.
  return Number.isInteger(parsed) && parsed > 0 ? parsed : null;
}

/**
 * Persists the scan cursor as `{ message_id, observed_at }` JSON.
 *
 * @param repository Store receiving the system state entry.
 * @param cursorStateKey Key of the entry to write.
 * @param cursor Cursor to persist.
 * @param now Timestamp in milliseconds recorded as the entry's `updatedAt`.
 */
async function saveCursor(
  repository: Pick<ControlPlaneRepository, "putSystemState">,
  cursorStateKey: string,
  cursor: RenewalProjectorCursor,
  now: number
): Promise<void> {
  await repository.putSystemState({
    stateKey: cursorStateKey,
    updatedAt: now,
    valueJson: JSON.stringify({
      message_id: cursor.id,
      observed_at: cursor.observedAt
    })
  });
}

/**
 * Shortens text to at most `limit` UTF-16 code units, ending with `...` when
 * anything was cut.
 *
 * The cut never lands between the two halves of a surrogate pair: if the last
 * kept code unit would be a lone high surrogate it is dropped as well. When
 * `limit` is too small to hold the ellipsis, the ellipsis itself is shortened
 * so the result still fits within `limit`.
 *
 * @param text Text to shorten.
 * @param limit Maximum length of the result in UTF-16 code units.
 * @returns `text` unchanged when it already fits, otherwise the shortened text.
 */
function truncateText(text: string, limit: number): string {
  if (text.length <= limit) {
    return text;
  }

  const ellipsis = "...";

  if (limit <= ellipsis.length) {
    return ellipsis.slice(0, Math.max(0, limit));
  }

  let end = limit - ellipsis.length;
  const lastKept = text.charCodeAt(end - 1);

  // 0xD800-0xDBFF are high surrogates; keeping one without its pair yields a broken character.
  if (lastKept >= 0xd800 && lastKept <= 0xdbff) {
    end -= 1;
  }

  return `${text.slice(0, end)}${ellipsis}`;
}