im_wower
·
2026-04-03
dispatcher.ts
1import { join } from "node:path";
2
3import type {
4 ArtifactStore,
5 ConversationLinkRecord,
6 LocalConversationRecord,
7 RenewalJobPayloadKind,
8 RenewalJobRecord
9} from "../../../../packages/artifact-db/dist/index.js";
10
11import type {
12 BrowserBridgeActionDispatch,
13 BrowserBridgeActionResultSnapshot,
14 BrowserBridgeController,
15 BrowserBridgeDeliveryAckSnapshot
16} from "../browser-types.js";
17import type { TimedJobRunner, TimedJobRunnerResult, TimedJobTickContext } from "../timed-jobs/index.js";
18import { DEFAULT_RENEWAL_EXECUTION_TIMEOUT_MS } from "../execution-timeouts.js";
19
20import {
21 buildRenewalTargetSnapshot,
22 type RenewalProjectorPayload,
23 type RenewalProjectorTargetSnapshot
24} from "./projector.js";
25import {
26 recordAutomationFailureSignal,
27 recordAutomationSuccessSignal
28} from "./automation.js";
29import {
30 isPlainRecord,
31 normalizeOptionalString,
32 parseJsonValue
33} from "./utils.js";
34
35const DEFAULT_RECHECK_DELAY_MS = 10_000;
36const DEFAULT_INTER_JOB_JITTER_MIN_MS = 500;
37const DEFAULT_INTER_JOB_JITTER_MAX_MS = 3_000;
38const DEFAULT_RETRY_BASE_DELAY_MS = 30_000;
39const DEFAULT_RETRY_JITTER_FACTOR = 0.3;
40const DEFAULT_RETRY_MAX_DELAY_MS = 5 * 60_000;
41const CHATGPT_COLD_START_RETRY_BASE_DELAY_MS = 5_000;
42const CHATGPT_COLD_START_RETRY_MAX_DELAY_MS = 30_000;
43const DEFAULT_SUCCESS_COOLDOWN_MS = 60_000;
44const CHATGPT_PLATFORM = "chatgpt";
45const PROXY_DELIVERY_TARGET_KIND = "browser.proxy_delivery";
46const RUNNER_NAME = "renewal.dispatcher";
47const TAB_TARGET_ID_PATTERN = /^tab:(\d+)$/u;
48
49type RenewalDispatcherJobResult =
50 | "attempt_failed"
51 | "attempt_started"
52 | "attempt_succeeded"
53 | "automation_busy"
54 | "automation_manual"
55 | "automation_paused"
56 | "failed"
57 | "idle"
58 | "missing_active_link"
59 | "no_artifact_store"
60 | "no_browser_bridge"
61 | "ok"
62 | "retry_scheduled";
63
64type RenewalJobTerminalReason =
65 | "invalid_payload"
66 | "missing_local_conversation"
67 | "missing_payload_text";
68
69interface RenewalDispatchPayload {
70 structured: RenewalProjectorPayload | null;
71 text: string;
72}
73
74interface RenewalDispatchTarget {
75 clientId: string | null;
76 conversationId: string | null;
77 organizationId: string | null;
78 pageTitle: string | null;
79 pageUrl: string | null;
80 platform: string;
81 tabId: number | null;
82}
83
84interface RenewalDispatchContext {
85 conversation: LocalConversationRecord;
86 job: RenewalJobRecord;
87 link: ConversationLinkRecord;
88 target: RenewalDispatchTarget;
89 targetSnapshot: RenewalProjectorTargetSnapshot;
90}
91
92interface RenewalDispatchOutcome {
93 dispatch: BrowserBridgeActionDispatch;
94 deliveryAck: BrowserBridgeDeliveryAckSnapshot;
95 result: BrowserBridgeActionResultSnapshot;
96}
97
98interface RenewalExecutionFailure {
99 deliveryAck: BrowserBridgeDeliveryAckSnapshot | null;
100 errorCode: string | null;
101 message: string;
102 result: string;
103 timeoutMs: number | null;
104}
105
106class RenewalProxyDeliveryError extends Error {
107 readonly deliveryAck: BrowserBridgeDeliveryAckSnapshot | null;
108
109 constructor(message: string, deliveryAck: BrowserBridgeDeliveryAckSnapshot | null = null) {
110 super(message);
111 this.name = "RenewalProxyDeliveryError";
112 this.deliveryAck = deliveryAck;
113 }
114}
115
116interface RenewalDispatcherRunnerOptions {
117 browserBridge: BrowserBridgeController | null;
118 executionTimeoutMs?: number;
119 interJobJitterMaxMs?: number;
120 interJobJitterMinMs?: number;
121 now?: () => number;
122 random?: () => number;
123 recheckDelayMs?: number;
124 retryBaseDelayMs?: number;
125 retryJitterFactor?: number;
126 retryMaxDelayMs?: number;
127 successCooldownMs?: number;
128}
129
130interface RenewalDispatcherJitterSettings {
131 interJobJitterMaxMs: number;
132 interJobJitterMinMs: number;
133 random: () => number;
134 retryJitterFactor: number;
135}
136
137interface RenewalRetryDelaySchedule {
138 baseDelayMs: number;
139 delayMs: number;
140 jitterMs: number;
141}
142
143export function createRenewalDispatcherRunner(
144 options: RenewalDispatcherRunnerOptions
145): TimedJobRunner {
146 return {
147 name: RUNNER_NAME,
148 async run(context) {
149 return runRenewalDispatcher(context, options);
150 }
151 };
152}
153
154export async function runRenewalDispatcher(
155 context: TimedJobTickContext,
156 options: RenewalDispatcherRunnerOptions
157): Promise<TimedJobRunnerResult> {
158 const artifactStore = context.artifactStore;
159
160 if (artifactStore == null) {
161 context.log({
162 stage: "scan_skipped",
163 result: "no_artifact_store"
164 });
165 return buildDispatcherSummary("no_artifact_store");
166 }
167
168 if (options.browserBridge == null) {
169 context.log({
170 stage: "scan_skipped",
171 result: "no_browser_bridge"
172 });
173 return buildDispatcherSummary("no_browser_bridge");
174 }
175
176 const now = options.now ?? (() => Date.now());
177 const jitterSettings = resolveDispatcherJitterSettings(options);
178 const nowMs = now();
179 const dueJobs = await artifactStore.listRenewalJobs({
180 limit: context.maxTasksPerTick,
181 nextAttemptAtLte: nowMs,
182 status: "pending"
183 });
184
185 context.log({
186 stage: "scan_window",
187 result: "ok",
188 details: {
189 due_before: nowMs,
190 limit: context.maxTasksPerTick
191 }
192 });
193
194 if (dueJobs.length === 0) {
195 context.log({
196 stage: "scan_completed",
197 result: "idle",
198 details: {
199 due_jobs: 0,
200 failed_jobs: 0,
201 retried_jobs: 0,
202 skipped_jobs: 0,
203 successful_jobs: 0
204 }
205 });
206 return buildDispatcherSummary("idle");
207 }
208
209 let failedJobs = 0;
210 let dispatchedJobs = 0;
211 let retriedJobs = 0;
212 let skippedJobs = 0;
213 let successfulJobs = 0;
214
215 for (const job of dueJobs) {
216 const jobNowMs = now();
217 const dispatchContext = await resolveDispatchContext(artifactStore, job);
218
219 if (dispatchContext.conversation == null) {
220 failedJobs += 1;
221 await markJobFailed(artifactStore, job, {
222 attemptCount: job.attemptCount + 1,
223 lastError: "missing_local_conversation",
224 logPath: resolveJobLogPath(job.logPath, context.logDir, jobNowMs),
225 now: jobNowMs,
226 targetSnapshot: dispatchContext.targetSnapshot
227 });
228 context.log({
229 stage: "job_failed",
230 result: "missing_local_conversation",
231 details: {
232 attempt_count: job.attemptCount + 1,
233 job_id: job.jobId,
234 local_conversation_id: job.localConversationId,
235 message_id: job.messageId
236 }
237 });
238 continue;
239 }
240
241 if (dispatchContext.conversation.automationStatus !== "auto") {
242 skippedJobs += 1;
243 const nextAttemptAt = jobNowMs + resolvePositiveInteger(
244 options.recheckDelayMs,
245 Math.max(DEFAULT_RECHECK_DELAY_MS, context.config.intervalMs)
246 );
247 await artifactStore.updateRenewalJob({
248 jobId: job.jobId,
249 logPath: resolveJobLogPath(job.logPath, context.logDir, jobNowMs),
250 nextAttemptAt,
251 targetSnapshot: dispatchContext.targetSnapshot,
252 updatedAt: jobNowMs
253 });
254 context.log({
255 stage: "job_deferred",
256 result:
257 dispatchContext.conversation.automationStatus === "paused"
258 ? "automation_paused"
259 : "automation_manual",
260 details: {
261 automation_status: dispatchContext.conversation.automationStatus,
262 job_id: job.jobId,
263 local_conversation_id: job.localConversationId,
264 message_id: job.messageId,
265 next_attempt_at: nextAttemptAt
266 }
267 });
268 continue;
269 }
270
271 if (dispatchContext.link == null || dispatchContext.target == null) {
272 const attempts = job.attemptCount + 1;
273 const failureMessage = dispatchContext.link == null ? "missing_active_link" : "route_unavailable";
274 const failureResult = await applyFailureOutcome(artifactStore, job, {
275 attemptCount: attempts,
276 errorMessage: failureMessage,
277 logDir: context.logDir,
278 now: jobNowMs,
279 retryBaseDelayMs: options.retryBaseDelayMs,
280 retryJitterFactor: jitterSettings.retryJitterFactor,
281 retryMaxDelayMs: options.retryMaxDelayMs,
282 random: jitterSettings.random,
283 targetSnapshot: dispatchContext.targetSnapshot
284 });
285 await recordAutomationFailureSignal({
286 conversation: dispatchContext.conversation,
287 errorMessage: failureMessage,
288 observedAt: jobNowMs,
289 store: artifactStore
290 });
291
292 if (failureResult.status === "failed") {
293 failedJobs += 1;
294 context.log({
295 stage: "job_failed",
296 result: failureResult.result,
297 details: {
298 attempt_count: attempts,
299 job_id: job.jobId,
300 message_id: job.messageId
301 }
302 });
303 } else {
304 retriedJobs += 1;
305 context.log({
306 stage: "job_retry_scheduled",
307 result: failureResult.result,
308 details: {
309 attempt_count: attempts,
310 job_id: job.jobId,
311 message_id: job.messageId,
312 next_attempt_at: failureResult.nextAttemptAt,
313 retry_base_delay_ms: failureResult.baseDelayMs,
314 retry_delay_ms: failureResult.delayMs,
315 retry_jitter_ms: failureResult.jitterMs
316 }
317 });
318 }
319 continue;
320 }
321
322 const payload = normalizeDispatchPayload(job.payload, job.payloadKind);
323
324 if (payload.text == null) {
325 const errorMessage = payload.error ?? "invalid_payload";
326 failedJobs += 1;
327 await markJobFailed(artifactStore, job, {
328 attemptCount: job.attemptCount + 1,
329 lastError: errorMessage,
330 logPath: resolveJobLogPath(job.logPath, context.logDir, jobNowMs),
331 now: jobNowMs,
332 targetSnapshot: dispatchContext.targetSnapshot
333 });
334 await recordAutomationFailureSignal({
335 conversation: dispatchContext.conversation,
336 errorMessage,
337 observedAt: jobNowMs,
338 store: artifactStore
339 });
340 context.log({
341 stage: "job_failed",
342 result: errorMessage,
343 details: {
344 attempt_count: job.attemptCount + 1,
345 job_id: job.jobId,
346 message_id: job.messageId
347 }
348 });
349 continue;
350 }
351
352 const interJobJitterMs = dispatchedJobs > 0
353 ? sampleInterJobJitterMs(jitterSettings)
354 : 0;
355
356 if (interJobJitterMs > 0) {
357 context.log({
358 stage: "job_dispatch_jitter",
359 result: "inter_job_jitter_applied",
360 details: {
361 dispatch_sequence_in_tick: dispatchedJobs + 1,
362 job_id: job.jobId,
363 jitter_ms: interJobJitterMs,
364 local_conversation_id: job.localConversationId,
365 message_id: job.messageId
366 }
367 });
368 await sleep(interJobJitterMs);
369 }
370
371 const lockedConversation = await artifactStore.tryBeginLocalConversationExecution({
372 executionState: "renewal_running",
373 localConversationId: job.localConversationId,
374 updatedAt: now()
375 });
376
377 if (lockedConversation == null) {
378 skippedJobs += 1;
379 const nextAttemptAt = now() + resolvePositiveInteger(
380 options.recheckDelayMs,
381 Math.max(DEFAULT_RECHECK_DELAY_MS, context.config.intervalMs)
382 );
383 await artifactStore.updateRenewalJob({
384 jobId: job.jobId,
385 logPath: resolveJobLogPath(job.logPath, context.logDir, jobNowMs),
386 nextAttemptAt,
387 targetSnapshot: dispatchContext.targetSnapshot,
388 updatedAt: now()
389 });
390 context.log({
391 stage: "job_deferred",
392 result: "automation_busy",
393 details: {
394 execution_state: dispatchContext.conversation.executionState,
395 job_id: job.jobId,
396 local_conversation_id: job.localConversationId,
397 message_id: job.messageId,
398 next_attempt_at: nextAttemptAt
399 }
400 });
401 continue;
402 }
403
404 try {
405 const attemptStartedAt = now();
406 const runningJob = await artifactStore.updateRenewalJob({
407 finishedAt: null,
408 jobId: job.jobId,
409 lastAttemptAt: attemptStartedAt,
410 lastError: null,
411 logPath: resolveJobLogPath(job.logPath, context.logDir, attemptStartedAt),
412 nextAttemptAt: null,
413 startedAt: attemptStartedAt,
414 status: "running",
415 targetSnapshot: dispatchContext.targetSnapshot,
416 updatedAt: attemptStartedAt
417 });
418 dispatchedJobs += 1;
419 context.log({
420 stage: "job_attempt_started",
421 result: "attempt_started",
422 details: {
423 attempt_count: job.attemptCount + 1,
424 client_id: dispatchContext.target.clientId,
425 dispatch_sequence_in_tick: dispatchedJobs,
426 inter_job_jitter_ms: interJobJitterMs,
427 job_id: job.jobId,
428 local_conversation_id: job.localConversationId,
429 message_id: job.messageId,
430 started_at: attemptStartedAt,
431 tab_id: dispatchContext.target.tabId
432 }
433 });
434
435 const delivery = await dispatchRenewalJob(options.browserBridge, {
436 assistantMessageId: job.messageId,
437 messageText: payload.text,
438 target: dispatchContext.target,
439 timeoutMs: resolvePositiveInteger(options.executionTimeoutMs, DEFAULT_RENEWAL_EXECUTION_TIMEOUT_MS)
440 });
441 const finishedAt = now();
442 const attemptCount = job.attemptCount + 1;
443 const cooldownUntil = finishedAt + resolveSuccessCooldownMs(options.successCooldownMs, context.config.intervalMs);
444 const recoveredFromColdStart = isChatgptColdStartFailureMessage(
445 dispatchContext.target.platform,
446 job.lastError
447 );
448
449 await artifactStore.upsertLocalConversation({
450 cooldownUntil,
451 localConversationId: job.localConversationId,
452 platform: dispatchContext.conversation.platform,
453 updatedAt: finishedAt
454 });
455 await artifactStore.updateRenewalJob({
456 attemptCount,
457 finishedAt,
458 jobId: job.jobId,
459 lastAttemptAt: attemptStartedAt,
460 lastError: null,
461 logPath: resolveJobLogPath(job.logPath, context.logDir, attemptStartedAt),
462 nextAttemptAt: null,
463 startedAt: runningJob.startedAt ?? attemptStartedAt,
464 status: "done",
465 targetSnapshot: dispatchContext.targetSnapshot,
466 updatedAt: finishedAt
467 });
468 await recordAutomationSuccessSignal({
469 conversation: lockedConversation,
470 observedAt: finishedAt,
471 store: artifactStore
472 });
473 successfulJobs += 1;
474 if (recoveredFromColdStart) {
475 context.log({
476 stage: "chatgpt_template_warmup",
477 result: "template_warmup_completed",
478 details: {
479 attempt_count: attemptCount,
480 job_id: job.jobId,
481 local_conversation_id: job.localConversationId,
482 message_id: job.messageId,
483 recovered_at: finishedAt,
484 previous_error: job.lastError
485 }
486 });
487 }
488 context.log({
489 stage: "job_completed",
490 result: "attempt_succeeded",
491 details: {
492 attempt_count: attemptCount,
493 client_id: delivery.dispatch.clientId,
494 connection_id: delivery.dispatch.connectionId,
495 downstream_delivery_level: delivery.deliveryAck.level,
496 downstream_status_code: delivery.deliveryAck.status_code,
497 job_id: job.jobId,
498 message_id: job.messageId,
499 proxy_request_id: delivery.dispatch.requestId,
500 received_at: delivery.result.received_at,
501 recovered_from_cold_start: recoveredFromColdStart || undefined,
502 status_confirmed_at: delivery.deliveryAck.confirmed_at
503 }
504 });
505 } catch (error) {
506 const failure = normalizeExecutionFailure(error);
507 const attempts = job.attemptCount + 1;
508 const coldStartFailure = isChatgptColdStartFailureMessage(
509 dispatchContext.target.platform,
510 failure.result
511 );
512 const failureResult = await applyFailureOutcome(artifactStore, job, {
513 attemptCount: attempts,
514 errorMessage: failure.result,
515 logDir: context.logDir,
516 now: now(),
517 retryBaseDelayMs: coldStartFailure
518 ? CHATGPT_COLD_START_RETRY_BASE_DELAY_MS
519 : options.retryBaseDelayMs,
520 retryJitterFactor: jitterSettings.retryJitterFactor,
521 retryMaxDelayMs: coldStartFailure
522 ? CHATGPT_COLD_START_RETRY_MAX_DELAY_MS
523 : options.retryMaxDelayMs,
524 random: jitterSettings.random,
525 targetSnapshot: dispatchContext.targetSnapshot
526 });
527 await recordAutomationFailureSignal({
528 conversation: lockedConversation,
529 errorMessage: failure.result,
530 observedAt: now(),
531 store: artifactStore
532 });
533 if (coldStartFailure) {
534 context.log({
535 stage: "chatgpt_cold_start_delivery",
536 result: failureResult.status === "pending"
537 ? "waiting_for_template_warmup"
538 : "template_warmup_exhausted",
539 details: {
540 attempt_count: attempts,
541 error_message: failure.message,
542 job_id: job.jobId,
543 local_conversation_id: job.localConversationId,
544 message_id: job.messageId,
545 next_attempt_at: failureResult.status === "pending"
546 ? failureResult.nextAttemptAt
547 : undefined,
548 retry_base_delay_ms: failureResult.status === "pending"
549 ? failureResult.baseDelayMs
550 : undefined,
551 retry_delay_ms: failureResult.status === "pending"
552 ? failureResult.delayMs
553 : undefined
554 }
555 });
556 }
557
558 if (failureResult.status === "failed") {
559 failedJobs += 1;
560 context.log({
561 stage: "job_failed",
562 result: failureResult.result,
563 details: {
564 attempt_count: attempts,
565 downstream_delivery_level: failure.deliveryAck?.level ?? undefined,
566 downstream_reason: failure.deliveryAck?.reason ?? undefined,
567 downstream_status_code: failure.deliveryAck?.status_code ?? undefined,
568 error_code: failure.errorCode,
569 error_message: failure.message,
570 job_id: job.jobId,
571 message_id: job.messageId,
572 cold_start_waiting_for_template: coldStartFailure || undefined,
573 timeout_ms: failure.timeoutMs
574 }
575 });
576 } else {
577 retriedJobs += 1;
578 context.log({
579 stage: "job_retry_scheduled",
580 result: failureResult.result,
581 details: {
582 attempt_count: attempts,
583 downstream_delivery_level: failure.deliveryAck?.level ?? undefined,
584 downstream_reason: failure.deliveryAck?.reason ?? undefined,
585 downstream_status_code: failure.deliveryAck?.status_code ?? undefined,
586 error_code: failure.errorCode,
587 error_message: failure.message,
588 job_id: job.jobId,
589 message_id: job.messageId,
590 next_attempt_at: failureResult.nextAttemptAt,
591 retry_base_delay_ms: failureResult.baseDelayMs,
592 retry_delay_ms: failureResult.delayMs,
593 retry_jitter_ms: failureResult.jitterMs,
594 cold_start_waiting_for_template: coldStartFailure || undefined,
595 timeout_ms: failure.timeoutMs
596 }
597 });
598 }
599 } finally {
600 await artifactStore.finishLocalConversationExecution({
601 executionState: "renewal_running",
602 localConversationId: job.localConversationId,
603 updatedAt: now()
604 });
605 }
606 }
607
608 context.log({
609 stage: "scan_completed",
610 result: "ok",
611 details: {
612 due_jobs: dueJobs.length,
613 failed_jobs: failedJobs,
614 retried_jobs: retriedJobs,
615 skipped_jobs: skippedJobs,
616 successful_jobs: successfulJobs
617 }
618 });
619
620 return {
621 details: {
622 due_jobs: dueJobs.length,
623 failed_jobs: failedJobs,
624 retried_jobs: retriedJobs,
625 skipped_jobs: skippedJobs,
626 successful_jobs: successfulJobs
627 },
628 result: "ok"
629 };
630}
631
632async function resolveDispatchContext(
633 artifactStore: Pick<ArtifactStore, "getLocalConversation" | "listConversationLinks">,
634 job: RenewalJobRecord
635): Promise<{
636 conversation: LocalConversationRecord | null;
637 link: ConversationLinkRecord | null;
638 target: RenewalDispatchTarget | null;
639 targetSnapshot: RenewalProjectorTargetSnapshot | null;
640}> {
641 const conversation = await artifactStore.getLocalConversation(job.localConversationId);
642
643 if (conversation == null) {
644 return {
645 conversation: null,
646 link: null,
647 target: null,
648 targetSnapshot: parseRenewalTargetSnapshot(job.targetSnapshot)
649 };
650 }
651
652 const [link] = await artifactStore.listConversationLinks({
653 isActive: true,
654 limit: 1,
655 localConversationId: job.localConversationId
656 });
657 const targetSnapshot = link == null
658 ? parseRenewalTargetSnapshot(job.targetSnapshot)
659 : buildRenewalTargetSnapshot(link);
660
661 return {
662 conversation,
663 link: link ?? null,
664 target: resolveDispatchTarget(targetSnapshot),
665 targetSnapshot
666 };
667}
668
669async function dispatchRenewalJob(
670 browserBridge: BrowserBridgeController,
671 input: {
672 assistantMessageId: string;
673 messageText: string;
674 target: RenewalDispatchTarget;
675 timeoutMs: number;
676 }
677): Promise<RenewalDispatchOutcome> {
678 const targetTabId =
679 input.target.conversationId != null || input.target.pageUrl != null
680 ? null
681 : input.target.tabId;
682 const dispatch = browserBridge.proxyDelivery({
683 assistantMessageId: input.assistantMessageId,
684 clientId: input.target.clientId,
685 conversationId: input.target.conversationId,
686 messageText: input.messageText,
687 organizationId: input.target.organizationId,
688 pageTitle: input.target.pageTitle,
689 pageUrl: input.target.pageUrl,
690 planId: buildRenewalPlanId(input.assistantMessageId),
691 platform: input.target.platform,
692 shellPage: false,
693 tabId: targetTabId,
694 timeoutMs: input.timeoutMs
695 });
696 const result = await dispatch.result;
697 const deliveryAck = resolveRenewalDeliveryAck(result);
698
699 if (result.accepted !== true || result.failed === true) {
700 throw new RenewalProxyDeliveryError(
701 normalizeOptionalString(result.reason) ?? "browser proxy delivery failed",
702 deliveryAck
703 );
704 }
705
706 if (deliveryAck == null) {
707 throw new RenewalProxyDeliveryError("downstream_status_missing");
708 }
709
710 if (deliveryAck.level < 1 || deliveryAck.status_code == null || deliveryAck.failed === true || deliveryAck.status_code !== 200) {
711 throw new RenewalProxyDeliveryError(
712 buildRenewalDeliveryFailureResult(deliveryAck),
713 deliveryAck
714 );
715 }
716
717 return {
718 dispatch,
719 deliveryAck,
720 result
721 };
722}
723
724function resolveRenewalDeliveryAck(
725 result: BrowserBridgeActionResultSnapshot
726): BrowserBridgeDeliveryAckSnapshot | null {
727 for (const item of result.results) {
728 if (item.delivery_ack != null) {
729 return item.delivery_ack;
730 }
731 }
732
733 return null;
734}
735
736function buildRenewalDeliveryFailureResult(deliveryAck: BrowserBridgeDeliveryAckSnapshot): string {
737 if (deliveryAck.status_code != null) {
738 return `downstream_status_${deliveryAck.status_code}`;
739 }
740
741 return normalizeOptionalString(deliveryAck.reason) ?? "downstream_status_missing";
742}
743
744async function applyFailureOutcome(
745 artifactStore: Pick<ArtifactStore, "updateRenewalJob">,
746 job: RenewalJobRecord,
747 input: {
748 attemptCount: number;
749 errorMessage: string;
750 logDir: string | null;
751 now: number;
752 retryBaseDelayMs?: number;
753 retryJitterFactor?: number;
754 retryMaxDelayMs?: number;
755 random?: () => number;
756 targetSnapshot: RenewalProjectorTargetSnapshot | null;
757 }
758): Promise<
759 | {
760 result: string;
761 status: "failed";
762 }
763 | {
764 baseDelayMs: number;
765 delayMs: number;
766 jitterMs: number;
767 nextAttemptAt: number;
768 result: string;
769 status: "pending";
770 }
771> {
772 const resolvedError = normalizeOptionalString(input.errorMessage) ?? "renewal_dispatch_failed";
773 const retryable = isRetryableFailure(resolvedError);
774
775 if (!retryable || input.attemptCount >= job.maxAttempts) {
776 await markJobFailed(artifactStore, job, {
777 attemptCount: input.attemptCount,
778 lastError: resolvedError,
779 logPath: resolveJobLogPath(job.logPath, input.logDir, input.now),
780 now: input.now,
781 targetSnapshot: input.targetSnapshot
782 });
783 return {
784 result: resolvedError,
785 status: "failed"
786 };
787 }
788
789 const retryDelay = computeRetryDelayMs(input.attemptCount, {
790 random: input.random,
791 retryBaseDelayMs: input.retryBaseDelayMs,
792 retryJitterFactor: input.retryJitterFactor,
793 retryMaxDelayMs: input.retryMaxDelayMs
794 });
795 const nextAttemptAt = input.now + retryDelay.delayMs;
796 await artifactStore.updateRenewalJob({
797 attemptCount: input.attemptCount,
798 finishedAt: null,
799 jobId: job.jobId,
800 lastAttemptAt: input.now,
801 lastError: resolvedError,
802 logPath: resolveJobLogPath(job.logPath, input.logDir, input.now),
803 nextAttemptAt,
804 startedAt: null,
805 status: "pending",
806 targetSnapshot: input.targetSnapshot,
807 updatedAt: input.now
808 });
809 return {
810 baseDelayMs: retryDelay.baseDelayMs,
811 delayMs: retryDelay.delayMs,
812 jitterMs: retryDelay.jitterMs,
813 nextAttemptAt,
814 result: resolvedError,
815 status: "pending"
816 };
817}
818
819async function markJobFailed(
820 artifactStore: Pick<ArtifactStore, "updateRenewalJob">,
821 job: RenewalJobRecord,
822 input: {
823 attemptCount: number;
824 lastError: string;
825 logPath: string | null;
826 now?: number;
827 targetSnapshot: RenewalProjectorTargetSnapshot | null;
828 }
829): Promise<void> {
830 const finishedAt = input.now ?? Date.now();
831
832 await artifactStore.updateRenewalJob({
833 attemptCount: input.attemptCount,
834 finishedAt,
835 jobId: job.jobId,
836 lastAttemptAt: finishedAt,
837 lastError: input.lastError,
838 logPath: input.logPath,
839 nextAttemptAt: null,
840 startedAt: null,
841 status: "failed",
842 targetSnapshot: input.targetSnapshot,
843 updatedAt: finishedAt
844 });
845}
846
847function buildDispatcherSummary(result: RenewalDispatcherJobResult): TimedJobRunnerResult {
848 return {
849 details: {
850 due_jobs: 0,
851 failed_jobs: 0,
852 retried_jobs: 0,
853 skipped_jobs: 0,
854 successful_jobs: 0
855 },
856 result
857 };
858}
859
860function buildRenewalPlanId(messageId: string): string {
861 return `renewal_${sanitizePathSegment(messageId)}_${Date.now()}`;
862}
863
864function computeRetryDelayMs(
865 attemptCount: number,
866 options: {
867 random?: () => number;
868 retryBaseDelayMs?: number;
869 retryJitterFactor?: number;
870 retryMaxDelayMs?: number;
871 }
872): RenewalRetryDelaySchedule {
873 const baseDelayMs = resolvePositiveInteger(options.retryBaseDelayMs, DEFAULT_RETRY_BASE_DELAY_MS);
874 const maxDelayMs = resolvePositiveInteger(options.retryMaxDelayMs, DEFAULT_RETRY_MAX_DELAY_MS);
875 const retryJitterFactor = resolveNonNegativeNumber(
876 options.retryJitterFactor,
877 DEFAULT_RETRY_JITTER_FACTOR
878 );
879 const exponent = Math.max(0, attemptCount - 1);
880 const cappedBaseDelayMs = Math.min(maxDelayMs, baseDelayMs * (2 ** exponent));
881
882 if (retryJitterFactor === 0 || cappedBaseDelayMs <= 0) {
883 return {
884 baseDelayMs: cappedBaseDelayMs,
885 delayMs: cappedBaseDelayMs,
886 jitterMs: 0
887 };
888 }
889
890 const centeredRandom = (sampleRandomUnit(options.random) * 2) - 1;
891 const sampledJitterMs = Math.round(cappedBaseDelayMs * retryJitterFactor * centeredRandom);
892 const delayMs = Math.max(1, Math.min(maxDelayMs, cappedBaseDelayMs + sampledJitterMs));
893
894 return {
895 baseDelayMs: cappedBaseDelayMs,
896 delayMs,
897 jitterMs: delayMs - cappedBaseDelayMs
898 };
899}
900
901function sampleInterJobJitterMs(settings: RenewalDispatcherJitterSettings): number {
902 return sampleUniformJitterMs(
903 settings.interJobJitterMinMs,
904 settings.interJobJitterMaxMs,
905 settings.random
906 );
907}
908
909function sampleUniformJitterMs(
910 minMs: number,
911 maxMs: number,
912 random?: () => number
913): number {
914 const normalizedMinMs = resolveNonNegativeInteger(Math.min(minMs, maxMs), 0);
915 const normalizedMaxMs = resolveNonNegativeInteger(Math.max(minMs, maxMs), 0);
916
917 if (normalizedMaxMs <= normalizedMinMs) {
918 return normalizedMinMs;
919 }
920
921 const offsetMs = normalizedMaxMs - normalizedMinMs;
922 return normalizedMinMs + Math.round(offsetMs * sampleRandomUnit(random));
923}
924
925function resolveDispatcherJitterSettings(
926 options: RenewalDispatcherRunnerOptions
927): RenewalDispatcherJitterSettings {
928 const minJitterMs = resolveNonNegativeInteger(
929 options.interJobJitterMinMs,
930 DEFAULT_INTER_JOB_JITTER_MIN_MS
931 );
932 const maxJitterMs = resolveNonNegativeInteger(
933 options.interJobJitterMaxMs,
934 DEFAULT_INTER_JOB_JITTER_MAX_MS
935 );
936
937 return {
938 interJobJitterMaxMs: Math.max(minJitterMs, maxJitterMs),
939 interJobJitterMinMs: Math.min(minJitterMs, maxJitterMs),
940 random: options.random ?? Math.random,
941 retryJitterFactor: resolveNonNegativeNumber(
942 options.retryJitterFactor,
943 DEFAULT_RETRY_JITTER_FACTOR
944 )
945 };
946}
947
948function sampleRandomUnit(random?: () => number): number {
949 const sampled = random == null ? Math.random() : random();
950
951 if (!Number.isFinite(sampled)) {
952 return 0.5;
953 }
954
955 return Math.min(1, Math.max(0, sampled));
956}
957
958function normalizeDispatchPayload(
959 payload: string,
960 payloadKind: RenewalJobPayloadKind
961): {
962 error: RenewalJobTerminalReason | null;
963 text: string | null;
964 value: RenewalDispatchPayload | null;
965} {
966 if (payloadKind === "text") {
967 const text = normalizeOptionalString(payload);
968 return text == null
969 ? {
970 error: "missing_payload_text",
971 text: null,
972 value: null
973 }
974 : {
975 error: null,
976 text,
977 value: {
978 structured: null,
979 text
980 }
981 };
982 }
983
984 const parsed = parseJsonValue(payload);
985
986 if (!isPlainRecord(parsed)) {
987 return {
988 error: "invalid_payload",
989 text: null,
990 value: null
991 };
992 }
993
994 const text = normalizeOptionalString(readString(parsed, "text"));
995
996 if (text == null) {
997 return {
998 error: "missing_payload_text",
999 text: null,
1000 value: null
1001 };
1002 }
1003
1004 return {
1005 error: null,
1006 text,
1007 value: {
1008 structured: isRenewalProjectorPayload(parsed) ? parsed : null,
1009 text
1010 }
1011 };
1012}
1013
1014function parseRenewalTargetSnapshot(value: string | null): RenewalProjectorTargetSnapshot | null {
1015 const parsed = parseJsonValue(value);
1016 return isRenewalTargetSnapshot(parsed) ? parsed : null;
1017}
1018
1019function resolveDispatchTarget(
1020 snapshot: RenewalProjectorTargetSnapshot | null
1021): RenewalDispatchTarget | null {
1022 if (snapshot == null || snapshot.target.kind !== PROXY_DELIVERY_TARGET_KIND) {
1023 return null;
1024 }
1025
1026 const payload = isPlainRecord(snapshot.target.payload) ? snapshot.target.payload : null;
1027 const shellPage = readBoolean(payload, "shellPage") === true;
1028 const routeParams = isPlainRecord(snapshot.route.params) ? snapshot.route.params : null;
1029 const tabId = readPositiveInteger(payload, "tabId") ?? parseTabId(snapshot.target.id);
1030 const conversationId =
1031 normalizeOptionalString(readString(payload, "conversationId"))
1032 ?? normalizeOptionalString(readString(routeParams, "conversationId"));
1033 const pageUrl = normalizeOptionalString(readString(payload, "pageUrl")) ?? snapshot.pageUrl;
1034
1035 if (shellPage || (conversationId == null && pageUrl == null && tabId == null)) {
1036 return null;
1037 }
1038
1039 return {
1040 clientId: normalizeOptionalString(readString(payload, "clientId")) ?? snapshot.clientId,
1041 conversationId,
1042 organizationId: normalizeOptionalString(readString(payload, "organizationId")),
1043 pageTitle: normalizeOptionalString(readString(payload, "pageTitle")) ?? snapshot.pageTitle,
1044 pageUrl,
1045 platform: snapshot.platform,
1046 tabId
1047 };
1048}
1049
1050function resolveJobLogPath(existing: string | null, logDir: string | null, now: number): string | null {
1051 if (normalizeOptionalString(existing) != null) {
1052 return existing;
1053 }
1054
1055 if (logDir == null) {
1056 return null;
1057 }
1058
1059 return join(logDir, `${new Date(now).toISOString().slice(0, 10)}.jsonl`);
1060}
1061
1062function isRetryableFailure(message: string): boolean {
1063 const downstreamStatusCode = parseDownstreamStatusCode(message);
1064
1065 if (downstreamStatusCode != null) {
1066 return downstreamStatusCode === 429 || downstreamStatusCode >= 500;
1067 }
1068
1069 return ![
1070 "invalid_payload",
1071 "missing_local_conversation",
1072 "missing_payload_text"
1073 ].includes(message);
1074}
1075
1076function isChatgptColdStartFailureMessage(
1077 platform: string | null | undefined,
1078 message: string | null | undefined
1079): boolean {
1080 const normalizedMessage = normalizeOptionalString(message);
1081
1082 if (platform !== CHATGPT_PLATFORM || normalizedMessage == null) {
1083 return false;
1084 }
1085
1086 return /^delivery\.template_(?:invalid|missing)\b/u.test(normalizedMessage);
1087}
1088
1089function parseDownstreamStatusCode(message: string): number | null {
1090 const match = /^downstream_status_(\d{3})$/u.exec(message.trim());
1091
1092 if (match == null) {
1093 return null;
1094 }
1095
1096 return Number(match[1]);
1097}
1098
1099function isRenewalProjectorPayload(value: unknown): value is RenewalProjectorPayload {
1100 return (
1101 isPlainRecord(value)
1102 && value.kind === "renewal.message"
1103 && typeof value.text === "string"
1104 && typeof value.template === "string"
1105 && value.version === 1
1106 && isPlainRecord(value.sourceMessage)
1107 && typeof value.sourceMessage.id === "string"
1108 );
1109}
1110
1111function isRenewalTargetSnapshot(value: unknown): value is RenewalProjectorTargetSnapshot {
1112 return (
1113 isPlainRecord(value)
1114 && typeof value.platform === "string"
1115 && typeof value.linkId === "string"
1116 && isPlainRecord(value.route)
1117 && isPlainRecord(value.target)
1118 && typeof value.target.kind === "string"
1119 );
1120}
1121
1122function parseTabId(value: string | null): number | null {
1123 const normalized = normalizeOptionalString(value);
1124
1125 if (normalized == null) {
1126 return null;
1127 }
1128
1129 const matched = TAB_TARGET_ID_PATTERN.exec(normalized);
1130
1131 if (matched == null) {
1132 return null;
1133 }
1134
1135 const parsed = Number.parseInt(matched[1] ?? "", 10);
1136 return Number.isInteger(parsed) && parsed > 0 ? parsed : null;
1137}
1138
1139function readBoolean(record: Record<string, unknown> | null, key: string): boolean | null {
1140 if (record == null) {
1141 return null;
1142 }
1143
1144 return typeof record[key] === "boolean" ? record[key] : null;
1145}
1146
1147function readPositiveInteger(record: Record<string, unknown> | null, key: string): number | null {
1148 if (record == null) {
1149 return null;
1150 }
1151
1152 const value = record[key];
1153 return Number.isInteger(value) && Number(value) > 0 ? Number(value) : null;
1154}
1155
1156function readString(record: Record<string, unknown> | null, key: string): string | null {
1157 if (record == null) {
1158 return null;
1159 }
1160
1161 return typeof record[key] === "string" ? record[key] : null;
1162}
1163
1164function resolvePositiveInteger(value: number | undefined, fallback: number): number {
1165 return Number.isInteger(value) && Number(value) > 0 ? Number(value) : fallback;
1166}
1167
1168function resolveNonNegativeInteger(value: number | undefined, fallback: number): number {
1169 return Number.isInteger(value) && Number(value) >= 0 ? Number(value) : fallback;
1170}
1171
1172function resolveNonNegativeNumber(value: number | undefined, fallback: number): number {
1173 return typeof value === "number" && Number.isFinite(value) && value >= 0 ? value : fallback;
1174}
1175
1176function resolveSuccessCooldownMs(value: number | undefined, intervalMs: number): number {
1177 if (Number.isInteger(value) && Number(value) > 0) {
1178 return Number(value);
1179 }
1180
1181 return Math.max(DEFAULT_SUCCESS_COOLDOWN_MS, intervalMs + 1);
1182}
1183
1184async function sleep(ms: number): Promise<void> {
1185 const delayMs = resolveNonNegativeInteger(Math.round(ms), 0);
1186
1187 if (delayMs === 0) {
1188 return;
1189 }
1190
1191 await new Promise<void>((resolve) => {
1192 setTimeout(resolve, delayMs);
1193 });
1194}
1195
1196function sanitizePathSegment(value: string): string {
1197 const normalized = value.trim().toLowerCase().replace(/[^a-z0-9]+/gu, "-");
1198 const collapsed = normalized.replace(/-+/gu, "-").replace(/^-|-$/gu, "");
1199 return collapsed === "" ? "unknown" : collapsed;
1200}
1201
1202function readErrorCode(error: unknown): string | null {
1203 return isPlainRecord(error) && typeof error.code === "string" ? error.code : null;
1204}
1205
1206function readErrorTimeoutMs(error: unknown): number | null {
1207 if (!isPlainRecord(error) || typeof error.timeoutMs !== "number") {
1208 return null;
1209 }
1210
1211 return Number.isFinite(error.timeoutMs) && error.timeoutMs > 0
1212 ? Math.round(error.timeoutMs)
1213 : null;
1214}
1215
1216function readErrorDeliveryAck(error: unknown): BrowserBridgeDeliveryAckSnapshot | null {
1217 if (!isPlainRecord(error)) {
1218 return null;
1219 }
1220
1221 const candidate = error.deliveryAck;
1222
1223 if (!isPlainRecord(candidate)) {
1224 return null;
1225 }
1226
1227 const level = candidate.level;
1228 const statusCode = candidate.status_code;
1229 const confirmedAt = candidate.confirmed_at;
1230
1231 return {
1232 confirmed_at: typeof confirmedAt === "number" && Number.isFinite(confirmedAt) ? confirmedAt : null,
1233 failed: candidate.failed === true,
1234 level: level === 1 || level === 2 || level === 3 ? level : 0,
1235 reason: typeof candidate.reason === "string" ? candidate.reason : null,
1236 status_code: typeof statusCode === "number" && Number.isFinite(statusCode) ? statusCode : null
1237 };
1238}
1239
1240function normalizeExecutionFailure(error: unknown): RenewalExecutionFailure {
1241 const deliveryAck = readErrorDeliveryAck(error);
1242 const errorCode = readErrorCode(error);
1243 const timeoutMs = readErrorTimeoutMs(error);
1244 const message = toErrorMessage(error);
1245
1246 switch (errorCode) {
1247 case "action_timeout":
1248 return {
1249 deliveryAck,
1250 errorCode,
1251 message,
1252 result: "browser_action_timeout",
1253 timeoutMs
1254 };
1255 case "request_timeout":
1256 return {
1257 deliveryAck,
1258 errorCode,
1259 message,
1260 result: "browser_request_timeout",
1261 timeoutMs
1262 };
1263 default:
1264 return {
1265 deliveryAck,
1266 errorCode,
1267 message,
1268 result: message,
1269 timeoutMs
1270 };
1271 }
1272}
1273
1274function toErrorMessage(error: unknown): string {
1275 if (error instanceof Error && normalizeOptionalString(error.message) != null) {
1276 return error.message;
1277 }
1278
1279 return String(error);
1280}