baa-conductor


baa-conductor / apps / conductor-daemon / src / renewal
im_wower  ·  2026-04-03

dispatcher.ts

   1import { join } from "node:path";
   2
   3import type {
   4  ArtifactStore,
   5  ConversationLinkRecord,
   6  LocalConversationRecord,
   7  RenewalJobPayloadKind,
   8  RenewalJobRecord
   9} from "../../../../packages/artifact-db/dist/index.js";
  10
  11import type {
  12  BrowserBridgeActionDispatch,
  13  BrowserBridgeActionResultSnapshot,
  14  BrowserBridgeController,
  15  BrowserBridgeDeliveryAckSnapshot
  16} from "../browser-types.js";
  17import type { TimedJobRunner, TimedJobRunnerResult, TimedJobTickContext } from "../timed-jobs/index.js";
  18import { DEFAULT_RENEWAL_EXECUTION_TIMEOUT_MS } from "../execution-timeouts.js";
  19
  20import {
  21  buildRenewalTargetSnapshot,
  22  type RenewalProjectorPayload,
  23  type RenewalProjectorTargetSnapshot
  24} from "./projector.js";
  25import {
  26  recordAutomationFailureSignal,
  27  recordAutomationSuccessSignal
  28} from "./automation.js";
  29import {
  30  isPlainRecord,
  31  normalizeOptionalString,
  32  parseJsonValue
  33} from "./utils.js";
  34
// Fallback delay before a deferred job (automation not "auto", or conversation busy) is looked at again.
const DEFAULT_RECHECK_DELAY_MS = 10_000;
// Default bounds for the random pause inserted between consecutive dispatches within one tick.
const DEFAULT_INTER_JOB_JITTER_MIN_MS = 500;
const DEFAULT_INTER_JOB_JITTER_MAX_MS = 3_000;
// Defaults for the exponential retry backoff computed by computeRetryDelayMs.
const DEFAULT_RETRY_BASE_DELAY_MS = 30_000;
const DEFAULT_RETRY_JITTER_FACTOR = 0.3;
const DEFAULT_RETRY_MAX_DELAY_MS = 5 * 60_000;
// Backoff window used instead of the configured one when a failure is classified as a ChatGPT cold start.
const CHATGPT_COLD_START_RETRY_BASE_DELAY_MS = 5_000;
const CHATGPT_COLD_START_RETRY_MAX_DELAY_MS = 30_000;
// NOTE(review): the next three constants are not referenced in the visible part of this file;
// presumably they are consumed by helpers defined further down — confirm.
const DEFAULT_SUCCESS_COOLDOWN_MS = 60_000;
const CHATGPT_PLATFORM = "chatgpt";
const PROXY_DELIVERY_TARGET_KIND = "browser.proxy_delivery";
// Name under which the timed-job runner is registered.
const RUNNER_NAME = "renewal.dispatcher";
// Matches target ids of the form "tab:<digits>" and captures the digits.
const TAB_TARGET_ID_PATTERN = /^tab:(\d+)$/u;
  48
/**
 * Result codes for the dispatcher. `buildDispatcherSummary` accepts any of
 * these as the tick-level result; several of the same strings are also
 * written as the `result` of per-job log entries.
 */
type RenewalDispatcherJobResult =
  | "attempt_failed"
  | "attempt_started"
  | "attempt_succeeded"
  | "automation_busy"
  | "automation_manual"
  | "automation_paused"
  | "failed"
  | "idle"
  | "missing_active_link"
  | "no_artifact_store"
  | "no_browser_bridge"
  | "ok"
  | "retry_scheduled";
  63
/**
 * Reasons that make the dispatcher mark a job as failed immediately, without
 * scheduling a retry. `normalizeDispatchPayload` reports the payload-related
 * ones through its `error` field.
 */
type RenewalJobTerminalReason =
  | "invalid_payload"
  | "missing_local_conversation"
  | "missing_payload_text";
  68
/**
 * Normalized job payload, returned as `value` by `normalizeDispatchPayload`.
 */
interface RenewalDispatchPayload {
  // Parsed projector payload; null when the job payload kind is "text".
  structured: RenewalProjectorPayload | null;
  // Message text to deliver.
  text: string;
}
  73
/**
 * Routing information for one delivery. `dispatchRenewalJob` forwards these
 * fields to `browserBridge.proxyDelivery`.
 */
interface RenewalDispatchTarget {
  clientId: string | null;
  conversationId: string | null;
  organizationId: string | null;
  pageTitle: string | null;
  pageUrl: string | null;
  platform: string;
  // Only forwarded to the bridge when both conversationId and pageUrl are null.
  tabId: number | null;
}
  83
/**
 * Fully-resolved dispatch context: the same fields `resolveDispatchContext`
 * returns, but with none of them nullable.
 *
 * NOTE(review): not referenced in the visible part of this file — confirm it
 * is still used before relying on it.
 */
interface RenewalDispatchContext {
  conversation: LocalConversationRecord;
  job: RenewalJobRecord;
  link: ConversationLinkRecord;
  target: RenewalDispatchTarget;
  targetSnapshot: RenewalProjectorTargetSnapshot;
}
  91
/**
 * Value returned by `dispatchRenewalJob` when a delivery is confirmed.
 */
interface RenewalDispatchOutcome {
  // Handle returned by browserBridge.proxyDelivery.
  dispatch: BrowserBridgeActionDispatch;
  // First delivery acknowledgement found in the bridge result.
  deliveryAck: BrowserBridgeDeliveryAckSnapshot;
  // Resolved value of dispatch.result.
  result: BrowserBridgeActionResultSnapshot;
}
  97
/**
 * Normalized view of an error thrown during a dispatch attempt. The dispatcher
 * obtains it from `normalizeExecutionFailure` (defined outside this view).
 */
interface RenewalExecutionFailure {
  // Downstream acknowledgement attached to the error, if any; only used for logging.
  deliveryAck: BrowserBridgeDeliveryAckSnapshot | null;
  // Logged as `error_code`.
  errorCode: string | null;
  // Logged as `error_message`.
  message: string;
  // Error string handed to applyFailureOutcome and to the cold-start classifier.
  result: string;
  // Logged as `timeout_ms`.
  timeoutMs: number | null;
}
 105
/**
 * Error thrown by `dispatchRenewalJob` when the browser bridge or the
 * downstream page does not confirm a delivery.
 */
class RenewalProxyDeliveryError extends Error {
  // Downstream acknowledgement that triggered the failure, or null when none was available.
  readonly deliveryAck: BrowserBridgeDeliveryAckSnapshot | null;

  /**
   * @param message Failure reason; also used as the error message.
   * @param deliveryAck Acknowledgement snapshot to attach; defaults to null.
   */
  constructor(message: string, deliveryAck: BrowserBridgeDeliveryAckSnapshot | null = null) {
    super(message);
    this.name = "RenewalProxyDeliveryError";
    this.deliveryAck = deliveryAck;
  }
}
 115
/**
 * Configuration accepted by `createRenewalDispatcherRunner` and
 * `runRenewalDispatcher`.
 */
interface RenewalDispatcherRunnerOptions {
  // Bridge used for deliveries; when null the tick is skipped with "no_browser_bridge".
  browserBridge: BrowserBridgeController | null;
  // Per-delivery timeout; falls back to DEFAULT_RENEWAL_EXECUTION_TIMEOUT_MS.
  executionTimeoutMs?: number;
  // Bounds of the pause between consecutive dispatches in a tick.
  interJobJitterMaxMs?: number;
  interJobJitterMinMs?: number;
  // Clock override; defaults to Date.now.
  now?: () => number;
  // Random source override; defaults to Math.random.
  random?: () => number;
  // Delay before a deferred job is re-examined; falls back to
  // max(DEFAULT_RECHECK_DELAY_MS, tick interval).
  recheckDelayMs?: number;
  // Retry backoff tuning, see computeRetryDelayMs.
  retryBaseDelayMs?: number;
  retryJitterFactor?: number;
  retryMaxDelayMs?: number;
  // Passed to resolveSuccessCooldownMs (defined outside this view) after a successful delivery.
  successCooldownMs?: number;
}
 129
/**
 * Jitter-related options after defaults have been applied by
 * `resolveDispatcherJitterSettings`.
 */
interface RenewalDispatcherJitterSettings {
  // Larger of the two resolved inter-job bounds.
  interJobJitterMaxMs: number;
  // Smaller of the two resolved inter-job bounds.
  interJobJitterMinMs: number;
  // Random source; Math.random unless overridden.
  random: () => number;
  // Non-negative factor applied to the retry base delay to size the jitter.
  retryJitterFactor: number;
}
 136
/**
 * Result of `computeRetryDelayMs`.
 */
interface RenewalRetryDelaySchedule {
  // Exponential delay after capping at the maximum, before jitter.
  baseDelayMs: number;
  // Delay actually applied (base plus jitter, clamped).
  delayMs: number;
  // Difference between delayMs and baseDelayMs.
  jitterMs: number;
}
 142
 143export function createRenewalDispatcherRunner(
 144  options: RenewalDispatcherRunnerOptions
 145): TimedJobRunner {
 146  return {
 147    name: RUNNER_NAME,
 148    async run(context) {
 149      return runRenewalDispatcher(context, options);
 150    }
 151  };
 152}
 153
/**
 * Runs one dispatcher tick.
 *
 * Lists pending renewal jobs whose next attempt is due, then handles them one
 * at a time. For each job it resolves the conversation and active link,
 * defers or fails jobs that cannot be dispatched, takes the conversation
 * execution lock, delivers the payload through the browser bridge, and records
 * the outcome (done, retry scheduled, or failed) in the artifact store.
 *
 * @param context Tick context supplying the artifact store, logger, log
 *   directory, tick interval and per-tick job limit.
 * @param options Dispatcher configuration (bridge, clock, jitter, retry tuning).
 * @returns Counters for the tick with result "ok", or a zeroed summary with
 *   result "no_artifact_store", "no_browser_bridge" or "idle".
 */
export async function runRenewalDispatcher(
  context: TimedJobTickContext,
  options: RenewalDispatcherRunnerOptions
): Promise<TimedJobRunnerResult> {
  const artifactStore = context.artifactStore;

  // Without a store there is nothing to scan.
  if (artifactStore == null) {
    context.log({
      stage: "scan_skipped",
      result: "no_artifact_store"
    });
    return buildDispatcherSummary("no_artifact_store");
  }

  // Without a bridge nothing can be delivered; jobs stay pending untouched.
  if (options.browserBridge == null) {
    context.log({
      stage: "scan_skipped",
      result: "no_browser_bridge"
    });
    return buildDispatcherSummary("no_browser_bridge");
  }

  const now = options.now ?? (() => Date.now());
  const jitterSettings = resolveDispatcherJitterSettings(options);
  const nowMs = now();
  // Only pending jobs whose nextAttemptAt is at or before the scan time are considered.
  const dueJobs = await artifactStore.listRenewalJobs({
    limit: context.maxTasksPerTick,
    nextAttemptAtLte: nowMs,
    status: "pending"
  });

  context.log({
    stage: "scan_window",
    result: "ok",
    details: {
      due_before: nowMs,
      limit: context.maxTasksPerTick
    }
  });

  if (dueJobs.length === 0) {
    context.log({
      stage: "scan_completed",
      result: "idle",
      details: {
        due_jobs: 0,
        failed_jobs: 0,
        retried_jobs: 0,
        skipped_jobs: 0,
        successful_jobs: 0
      }
    });
    return buildDispatcherSummary("idle");
  }

  let failedJobs = 0;
  // Counts jobs that actually reached the bridge in this tick; drives the inter-job jitter.
  let dispatchedJobs = 0;
  let retriedJobs = 0;
  let skippedJobs = 0;
  let successfulJobs = 0;

  for (const job of dueJobs) {
    const jobNowMs = now();
    const dispatchContext = await resolveDispatchContext(artifactStore, job);

    // Terminal: the local conversation no longer exists, so the job fails without retry.
    if (dispatchContext.conversation == null) {
      failedJobs += 1;
      await markJobFailed(artifactStore, job, {
        attemptCount: job.attemptCount + 1,
        lastError: "missing_local_conversation",
        logPath: resolveJobLogPath(job.logPath, context.logDir, jobNowMs),
        now: jobNowMs,
        targetSnapshot: dispatchContext.targetSnapshot
      });
      context.log({
        stage: "job_failed",
        result: "missing_local_conversation",
        details: {
          attempt_count: job.attemptCount + 1,
          job_id: job.jobId,
          local_conversation_id: job.localConversationId,
          message_id: job.messageId
        }
      });
      continue;
    }

    // Automation is not "auto": push the job back without counting an attempt.
    if (dispatchContext.conversation.automationStatus !== "auto") {
      skippedJobs += 1;
      const nextAttemptAt = jobNowMs + resolvePositiveInteger(
        options.recheckDelayMs,
        Math.max(DEFAULT_RECHECK_DELAY_MS, context.config.intervalMs)
      );
      await artifactStore.updateRenewalJob({
        jobId: job.jobId,
        logPath: resolveJobLogPath(job.logPath, context.logDir, jobNowMs),
        nextAttemptAt,
        targetSnapshot: dispatchContext.targetSnapshot,
        updatedAt: jobNowMs
      });
      context.log({
        stage: "job_deferred",
        // Any status other than "paused" is reported as "automation_manual".
        result:
          dispatchContext.conversation.automationStatus === "paused"
            ? "automation_paused"
            : "automation_manual",
        details: {
          automation_status: dispatchContext.conversation.automationStatus,
          job_id: job.jobId,
          local_conversation_id: job.localConversationId,
          message_id: job.messageId,
          next_attempt_at: nextAttemptAt
        }
      });
      continue;
    }

    // No active link or no resolvable route: counts as a failed attempt and goes
    // through the normal retry/fail decision.
    if (dispatchContext.link == null || dispatchContext.target == null) {
      const attempts = job.attemptCount + 1;
      const failureMessage = dispatchContext.link == null ? "missing_active_link" : "route_unavailable";
      const failureResult = await applyFailureOutcome(artifactStore, job, {
        attemptCount: attempts,
        errorMessage: failureMessage,
        logDir: context.logDir,
        now: jobNowMs,
        retryBaseDelayMs: options.retryBaseDelayMs,
        retryJitterFactor: jitterSettings.retryJitterFactor,
        retryMaxDelayMs: options.retryMaxDelayMs,
        random: jitterSettings.random,
        targetSnapshot: dispatchContext.targetSnapshot
      });
      await recordAutomationFailureSignal({
        conversation: dispatchContext.conversation,
        errorMessage: failureMessage,
        observedAt: jobNowMs,
        store: artifactStore
      });

      if (failureResult.status === "failed") {
        failedJobs += 1;
        context.log({
          stage: "job_failed",
          result: failureResult.result,
          details: {
            attempt_count: attempts,
            job_id: job.jobId,
            message_id: job.messageId
          }
        });
      } else {
        retriedJobs += 1;
        context.log({
          stage: "job_retry_scheduled",
          result: failureResult.result,
          details: {
            attempt_count: attempts,
            job_id: job.jobId,
            message_id: job.messageId,
            next_attempt_at: failureResult.nextAttemptAt,
            retry_base_delay_ms: failureResult.baseDelayMs,
            retry_delay_ms: failureResult.delayMs,
            retry_jitter_ms: failureResult.jitterMs
          }
        });
      }
      continue;
    }

    const payload = normalizeDispatchPayload(job.payload, job.payloadKind);

    // Terminal: a payload without usable text is never retried.
    if (payload.text == null) {
      const errorMessage = payload.error ?? "invalid_payload";
      failedJobs += 1;
      await markJobFailed(artifactStore, job, {
        attemptCount: job.attemptCount + 1,
        lastError: errorMessage,
        logPath: resolveJobLogPath(job.logPath, context.logDir, jobNowMs),
        now: jobNowMs,
        targetSnapshot: dispatchContext.targetSnapshot
      });
      await recordAutomationFailureSignal({
        conversation: dispatchContext.conversation,
        errorMessage,
        observedAt: jobNowMs,
        store: artifactStore
      });
      context.log({
        stage: "job_failed",
        result: errorMessage,
        details: {
          attempt_count: job.attemptCount + 1,
          job_id: job.jobId,
          message_id: job.messageId
        }
      });
      continue;
    }

    // The first dispatch of a tick goes out immediately; later ones wait a random pause.
    const interJobJitterMs = dispatchedJobs > 0
      ? sampleInterJobJitterMs(jitterSettings)
      : 0;

    if (interJobJitterMs > 0) {
      context.log({
        stage: "job_dispatch_jitter",
        result: "inter_job_jitter_applied",
        details: {
          dispatch_sequence_in_tick: dispatchedJobs + 1,
          job_id: job.jobId,
          jitter_ms: interJobJitterMs,
          local_conversation_id: job.localConversationId,
          message_id: job.messageId
        }
      });
      // `sleep` is defined outside the visible part of this file.
      await sleep(interJobJitterMs);
    }

    // Claim the conversation for this renewal; the claim is released in the `finally` below.
    const lockedConversation = await artifactStore.tryBeginLocalConversationExecution({
      executionState: "renewal_running",
      localConversationId: job.localConversationId,
      updatedAt: now()
    });

    // A null result is treated as "conversation busy": defer without counting an attempt.
    if (lockedConversation == null) {
      skippedJobs += 1;
      const nextAttemptAt = now() + resolvePositiveInteger(
        options.recheckDelayMs,
        Math.max(DEFAULT_RECHECK_DELAY_MS, context.config.intervalMs)
      );
      await artifactStore.updateRenewalJob({
        jobId: job.jobId,
        logPath: resolveJobLogPath(job.logPath, context.logDir, jobNowMs),
        nextAttemptAt,
        targetSnapshot: dispatchContext.targetSnapshot,
        updatedAt: now()
      });
      context.log({
        stage: "job_deferred",
        result: "automation_busy",
        details: {
          // Taken from the conversation as read before the lock attempt.
          execution_state: dispatchContext.conversation.executionState,
          job_id: job.jobId,
          local_conversation_id: job.localConversationId,
          message_id: job.messageId,
          next_attempt_at: nextAttemptAt
        }
      });
      continue;
    }

    try {
      const attemptStartedAt = now();
      // Mark the job as running before talking to the bridge.
      const runningJob = await artifactStore.updateRenewalJob({
        finishedAt: null,
        jobId: job.jobId,
        lastAttemptAt: attemptStartedAt,
        lastError: null,
        logPath: resolveJobLogPath(job.logPath, context.logDir, attemptStartedAt),
        nextAttemptAt: null,
        startedAt: attemptStartedAt,
        status: "running",
        targetSnapshot: dispatchContext.targetSnapshot,
        updatedAt: attemptStartedAt
      });
      dispatchedJobs += 1;
      context.log({
        stage: "job_attempt_started",
        result: "attempt_started",
        details: {
          attempt_count: job.attemptCount + 1,
          client_id: dispatchContext.target.clientId,
          dispatch_sequence_in_tick: dispatchedJobs,
          inter_job_jitter_ms: interJobJitterMs,
          job_id: job.jobId,
          local_conversation_id: job.localConversationId,
          message_id: job.messageId,
          started_at: attemptStartedAt,
          tab_id: dispatchContext.target.tabId
        }
      });

      // Throws when the bridge or the downstream page does not confirm the delivery.
      const delivery = await dispatchRenewalJob(options.browserBridge, {
        assistantMessageId: job.messageId,
        messageText: payload.text,
        target: dispatchContext.target,
        timeoutMs: resolvePositiveInteger(options.executionTimeoutMs, DEFAULT_RENEWAL_EXECUTION_TIMEOUT_MS)
      });
      const finishedAt = now();
      const attemptCount = job.attemptCount + 1;
      const cooldownUntil = finishedAt + resolveSuccessCooldownMs(options.successCooldownMs, context.config.intervalMs);
      // True when the job's previous error was classified as a ChatGPT cold-start failure.
      const recoveredFromColdStart = isChatgptColdStartFailureMessage(
        dispatchContext.target.platform,
        job.lastError
      );

      await artifactStore.upsertLocalConversation({
        cooldownUntil,
        localConversationId: job.localConversationId,
        platform: dispatchContext.conversation.platform,
        updatedAt: finishedAt
      });
      await artifactStore.updateRenewalJob({
        attemptCount,
        finishedAt,
        jobId: job.jobId,
        lastAttemptAt: attemptStartedAt,
        lastError: null,
        logPath: resolveJobLogPath(job.logPath, context.logDir, attemptStartedAt),
        nextAttemptAt: null,
        startedAt: runningJob.startedAt ?? attemptStartedAt,
        status: "done",
        targetSnapshot: dispatchContext.targetSnapshot,
        updatedAt: finishedAt
      });
      await recordAutomationSuccessSignal({
        conversation: lockedConversation,
        observedAt: finishedAt,
        store: artifactStore
      });
      successfulJobs += 1;
      if (recoveredFromColdStart) {
        context.log({
          stage: "chatgpt_template_warmup",
          result: "template_warmup_completed",
          details: {
            attempt_count: attemptCount,
            job_id: job.jobId,
            local_conversation_id: job.localConversationId,
            message_id: job.messageId,
            recovered_at: finishedAt,
            previous_error: job.lastError
          }
        });
      }
      context.log({
        stage: "job_completed",
        result: "attempt_succeeded",
        details: {
          attempt_count: attemptCount,
          client_id: delivery.dispatch.clientId,
          connection_id: delivery.dispatch.connectionId,
          downstream_delivery_level: delivery.deliveryAck.level,
          downstream_status_code: delivery.deliveryAck.status_code,
          job_id: job.jobId,
          message_id: job.messageId,
          proxy_request_id: delivery.dispatch.requestId,
          received_at: delivery.result.received_at,
          // `|| undefined` passes undefined instead of false when not applicable.
          recovered_from_cold_start: recoveredFromColdStart || undefined,
          status_confirmed_at: delivery.deliveryAck.confirmed_at
        }
      });
    } catch (error) {
      // Any error thrown inside the try block — including store writes made after
      // a confirmed delivery — is handled as a failed attempt.
      const failure = normalizeExecutionFailure(error);
      const attempts = job.attemptCount + 1;
      const coldStartFailure = isChatgptColdStartFailureMessage(
        dispatchContext.target.platform,
        failure.result
      );
      // Cold-start failures use the shorter fixed backoff window instead of the configured one.
      const failureResult = await applyFailureOutcome(artifactStore, job, {
        attemptCount: attempts,
        errorMessage: failure.result,
        logDir: context.logDir,
        now: now(),
        retryBaseDelayMs: coldStartFailure
          ? CHATGPT_COLD_START_RETRY_BASE_DELAY_MS
          : options.retryBaseDelayMs,
        retryJitterFactor: jitterSettings.retryJitterFactor,
        retryMaxDelayMs: coldStartFailure
          ? CHATGPT_COLD_START_RETRY_MAX_DELAY_MS
          : options.retryMaxDelayMs,
        random: jitterSettings.random,
        targetSnapshot: dispatchContext.targetSnapshot
      });
      await recordAutomationFailureSignal({
        conversation: lockedConversation,
        errorMessage: failure.result,
        observedAt: now(),
        store: artifactStore
      });
      if (coldStartFailure) {
        context.log({
          stage: "chatgpt_cold_start_delivery",
          result: failureResult.status === "pending"
            ? "waiting_for_template_warmup"
            : "template_warmup_exhausted",
          details: {
            attempt_count: attempts,
            error_message: failure.message,
            job_id: job.jobId,
            local_conversation_id: job.localConversationId,
            message_id: job.messageId,
            next_attempt_at: failureResult.status === "pending"
              ? failureResult.nextAttemptAt
              : undefined,
            retry_base_delay_ms: failureResult.status === "pending"
              ? failureResult.baseDelayMs
              : undefined,
            retry_delay_ms: failureResult.status === "pending"
              ? failureResult.delayMs
              : undefined
          }
        });
      }

      if (failureResult.status === "failed") {
        failedJobs += 1;
        context.log({
          stage: "job_failed",
          result: failureResult.result,
          details: {
            attempt_count: attempts,
            downstream_delivery_level: failure.deliveryAck?.level ?? undefined,
            downstream_reason: failure.deliveryAck?.reason ?? undefined,
            downstream_status_code: failure.deliveryAck?.status_code ?? undefined,
            error_code: failure.errorCode,
            error_message: failure.message,
            job_id: job.jobId,
            message_id: job.messageId,
            cold_start_waiting_for_template: coldStartFailure || undefined,
            timeout_ms: failure.timeoutMs
          }
        });
      } else {
        retriedJobs += 1;
        context.log({
          stage: "job_retry_scheduled",
          result: failureResult.result,
          details: {
            attempt_count: attempts,
            downstream_delivery_level: failure.deliveryAck?.level ?? undefined,
            downstream_reason: failure.deliveryAck?.reason ?? undefined,
            downstream_status_code: failure.deliveryAck?.status_code ?? undefined,
            error_code: failure.errorCode,
            error_message: failure.message,
            job_id: job.jobId,
            message_id: job.messageId,
            next_attempt_at: failureResult.nextAttemptAt,
            retry_base_delay_ms: failureResult.baseDelayMs,
            retry_delay_ms: failureResult.delayMs,
            retry_jitter_ms: failureResult.jitterMs,
            cold_start_waiting_for_template: coldStartFailure || undefined,
            timeout_ms: failure.timeoutMs
          }
        });
      }
    } finally {
      // Always release the execution claim taken above, whatever the outcome.
      await artifactStore.finishLocalConversationExecution({
        executionState: "renewal_running",
        localConversationId: job.localConversationId,
        updatedAt: now()
      });
    }
  }

  context.log({
    stage: "scan_completed",
    result: "ok",
    details: {
      due_jobs: dueJobs.length,
      failed_jobs: failedJobs,
      retried_jobs: retriedJobs,
      skipped_jobs: skippedJobs,
      successful_jobs: successfulJobs
    }
  });

  return {
    details: {
      due_jobs: dueJobs.length,
      failed_jobs: failedJobs,
      retried_jobs: retriedJobs,
      skipped_jobs: skippedJobs,
      successful_jobs: successfulJobs
    },
    result: "ok"
  };
}
 631
 632async function resolveDispatchContext(
 633  artifactStore: Pick<ArtifactStore, "getLocalConversation" | "listConversationLinks">,
 634  job: RenewalJobRecord
 635): Promise<{
 636  conversation: LocalConversationRecord | null;
 637  link: ConversationLinkRecord | null;
 638  target: RenewalDispatchTarget | null;
 639  targetSnapshot: RenewalProjectorTargetSnapshot | null;
 640}> {
 641  const conversation = await artifactStore.getLocalConversation(job.localConversationId);
 642
 643  if (conversation == null) {
 644    return {
 645      conversation: null,
 646      link: null,
 647      target: null,
 648      targetSnapshot: parseRenewalTargetSnapshot(job.targetSnapshot)
 649    };
 650  }
 651
 652  const [link] = await artifactStore.listConversationLinks({
 653    isActive: true,
 654    limit: 1,
 655    localConversationId: job.localConversationId
 656  });
 657  const targetSnapshot = link == null
 658    ? parseRenewalTargetSnapshot(job.targetSnapshot)
 659    : buildRenewalTargetSnapshot(link);
 660
 661  return {
 662    conversation,
 663    link: link ?? null,
 664    target: resolveDispatchTarget(targetSnapshot),
 665    targetSnapshot
 666  };
 667}
 668
 669async function dispatchRenewalJob(
 670  browserBridge: BrowserBridgeController,
 671  input: {
 672    assistantMessageId: string;
 673    messageText: string;
 674    target: RenewalDispatchTarget;
 675    timeoutMs: number;
 676  }
 677): Promise<RenewalDispatchOutcome> {
 678  const targetTabId =
 679    input.target.conversationId != null || input.target.pageUrl != null
 680      ? null
 681      : input.target.tabId;
 682  const dispatch = browserBridge.proxyDelivery({
 683    assistantMessageId: input.assistantMessageId,
 684    clientId: input.target.clientId,
 685    conversationId: input.target.conversationId,
 686    messageText: input.messageText,
 687    organizationId: input.target.organizationId,
 688    pageTitle: input.target.pageTitle,
 689    pageUrl: input.target.pageUrl,
 690    planId: buildRenewalPlanId(input.assistantMessageId),
 691    platform: input.target.platform,
 692    shellPage: false,
 693    tabId: targetTabId,
 694    timeoutMs: input.timeoutMs
 695  });
 696  const result = await dispatch.result;
 697  const deliveryAck = resolveRenewalDeliveryAck(result);
 698
 699  if (result.accepted !== true || result.failed === true) {
 700    throw new RenewalProxyDeliveryError(
 701      normalizeOptionalString(result.reason) ?? "browser proxy delivery failed",
 702      deliveryAck
 703    );
 704  }
 705
 706  if (deliveryAck == null) {
 707    throw new RenewalProxyDeliveryError("downstream_status_missing");
 708  }
 709
 710  if (deliveryAck.level < 1 || deliveryAck.status_code == null || deliveryAck.failed === true || deliveryAck.status_code !== 200) {
 711    throw new RenewalProxyDeliveryError(
 712      buildRenewalDeliveryFailureResult(deliveryAck),
 713      deliveryAck
 714    );
 715  }
 716
 717  return {
 718    dispatch,
 719    deliveryAck,
 720    result
 721  };
 722}
 723
 724function resolveRenewalDeliveryAck(
 725  result: BrowserBridgeActionResultSnapshot
 726): BrowserBridgeDeliveryAckSnapshot | null {
 727  for (const item of result.results) {
 728    if (item.delivery_ack != null) {
 729      return item.delivery_ack;
 730    }
 731  }
 732
 733  return null;
 734}
 735
 736function buildRenewalDeliveryFailureResult(deliveryAck: BrowserBridgeDeliveryAckSnapshot): string {
 737  if (deliveryAck.status_code != null) {
 738    return `downstream_status_${deliveryAck.status_code}`;
 739  }
 740
 741  return normalizeOptionalString(deliveryAck.reason) ?? "downstream_status_missing";
 742}
 743
 744async function applyFailureOutcome(
 745  artifactStore: Pick<ArtifactStore, "updateRenewalJob">,
 746  job: RenewalJobRecord,
 747  input: {
 748    attemptCount: number;
 749    errorMessage: string;
 750    logDir: string | null;
 751    now: number;
 752    retryBaseDelayMs?: number;
 753    retryJitterFactor?: number;
 754    retryMaxDelayMs?: number;
 755    random?: () => number;
 756    targetSnapshot: RenewalProjectorTargetSnapshot | null;
 757  }
 758): Promise<
 759  | {
 760      result: string;
 761      status: "failed";
 762    }
 763  | {
 764      baseDelayMs: number;
 765      delayMs: number;
 766      jitterMs: number;
 767      nextAttemptAt: number;
 768      result: string;
 769      status: "pending";
 770    }
 771> {
 772  const resolvedError = normalizeOptionalString(input.errorMessage) ?? "renewal_dispatch_failed";
 773  const retryable = isRetryableFailure(resolvedError);
 774
 775  if (!retryable || input.attemptCount >= job.maxAttempts) {
 776    await markJobFailed(artifactStore, job, {
 777      attemptCount: input.attemptCount,
 778      lastError: resolvedError,
 779      logPath: resolveJobLogPath(job.logPath, input.logDir, input.now),
 780      now: input.now,
 781      targetSnapshot: input.targetSnapshot
 782    });
 783    return {
 784      result: resolvedError,
 785      status: "failed"
 786    };
 787  }
 788
 789  const retryDelay = computeRetryDelayMs(input.attemptCount, {
 790    random: input.random,
 791    retryBaseDelayMs: input.retryBaseDelayMs,
 792    retryJitterFactor: input.retryJitterFactor,
 793    retryMaxDelayMs: input.retryMaxDelayMs
 794  });
 795  const nextAttemptAt = input.now + retryDelay.delayMs;
 796  await artifactStore.updateRenewalJob({
 797    attemptCount: input.attemptCount,
 798    finishedAt: null,
 799    jobId: job.jobId,
 800    lastAttemptAt: input.now,
 801    lastError: resolvedError,
 802    logPath: resolveJobLogPath(job.logPath, input.logDir, input.now),
 803    nextAttemptAt,
 804    startedAt: null,
 805    status: "pending",
 806    targetSnapshot: input.targetSnapshot,
 807    updatedAt: input.now
 808  });
 809  return {
 810    baseDelayMs: retryDelay.baseDelayMs,
 811    delayMs: retryDelay.delayMs,
 812    jitterMs: retryDelay.jitterMs,
 813    nextAttemptAt,
 814    result: resolvedError,
 815    status: "pending"
 816  };
 817}
 818
 819async function markJobFailed(
 820  artifactStore: Pick<ArtifactStore, "updateRenewalJob">,
 821  job: RenewalJobRecord,
 822  input: {
 823    attemptCount: number;
 824    lastError: string;
 825    logPath: string | null;
 826    now?: number;
 827    targetSnapshot: RenewalProjectorTargetSnapshot | null;
 828  }
 829): Promise<void> {
 830  const finishedAt = input.now ?? Date.now();
 831
 832  await artifactStore.updateRenewalJob({
 833    attemptCount: input.attemptCount,
 834    finishedAt,
 835    jobId: job.jobId,
 836    lastAttemptAt: finishedAt,
 837    lastError: input.lastError,
 838    logPath: input.logPath,
 839    nextAttemptAt: null,
 840    startedAt: null,
 841    status: "failed",
 842    targetSnapshot: input.targetSnapshot,
 843    updatedAt: finishedAt
 844  });
 845}
 846
 847function buildDispatcherSummary(result: RenewalDispatcherJobResult): TimedJobRunnerResult {
 848  return {
 849    details: {
 850      due_jobs: 0,
 851      failed_jobs: 0,
 852      retried_jobs: 0,
 853      skipped_jobs: 0,
 854      successful_jobs: 0
 855    },
 856    result
 857  };
 858}
 859
 860function buildRenewalPlanId(messageId: string): string {
 861  return `renewal_${sanitizePathSegment(messageId)}_${Date.now()}`;
 862}
 863
 864function computeRetryDelayMs(
 865  attemptCount: number,
 866  options: {
 867    random?: () => number;
 868    retryBaseDelayMs?: number;
 869    retryJitterFactor?: number;
 870    retryMaxDelayMs?: number;
 871  }
 872): RenewalRetryDelaySchedule {
 873  const baseDelayMs = resolvePositiveInteger(options.retryBaseDelayMs, DEFAULT_RETRY_BASE_DELAY_MS);
 874  const maxDelayMs = resolvePositiveInteger(options.retryMaxDelayMs, DEFAULT_RETRY_MAX_DELAY_MS);
 875  const retryJitterFactor = resolveNonNegativeNumber(
 876    options.retryJitterFactor,
 877    DEFAULT_RETRY_JITTER_FACTOR
 878  );
 879  const exponent = Math.max(0, attemptCount - 1);
 880  const cappedBaseDelayMs = Math.min(maxDelayMs, baseDelayMs * (2 ** exponent));
 881
 882  if (retryJitterFactor === 0 || cappedBaseDelayMs <= 0) {
 883    return {
 884      baseDelayMs: cappedBaseDelayMs,
 885      delayMs: cappedBaseDelayMs,
 886      jitterMs: 0
 887    };
 888  }
 889
 890  const centeredRandom = (sampleRandomUnit(options.random) * 2) - 1;
 891  const sampledJitterMs = Math.round(cappedBaseDelayMs * retryJitterFactor * centeredRandom);
 892  const delayMs = Math.max(1, Math.min(maxDelayMs, cappedBaseDelayMs + sampledJitterMs));
 893
 894  return {
 895    baseDelayMs: cappedBaseDelayMs,
 896    delayMs,
 897    jitterMs: delayMs - cappedBaseDelayMs
 898  };
 899}
 900
 901function sampleInterJobJitterMs(settings: RenewalDispatcherJitterSettings): number {
 902  return sampleUniformJitterMs(
 903    settings.interJobJitterMinMs,
 904    settings.interJobJitterMaxMs,
 905    settings.random
 906  );
 907}
 908
 909function sampleUniformJitterMs(
 910  minMs: number,
 911  maxMs: number,
 912  random?: () => number
 913): number {
 914  const normalizedMinMs = resolveNonNegativeInteger(Math.min(minMs, maxMs), 0);
 915  const normalizedMaxMs = resolveNonNegativeInteger(Math.max(minMs, maxMs), 0);
 916
 917  if (normalizedMaxMs <= normalizedMinMs) {
 918    return normalizedMinMs;
 919  }
 920
 921  const offsetMs = normalizedMaxMs - normalizedMinMs;
 922  return normalizedMinMs + Math.round(offsetMs * sampleRandomUnit(random));
 923}
 924
 925function resolveDispatcherJitterSettings(
 926  options: RenewalDispatcherRunnerOptions
 927): RenewalDispatcherJitterSettings {
 928  const minJitterMs = resolveNonNegativeInteger(
 929    options.interJobJitterMinMs,
 930    DEFAULT_INTER_JOB_JITTER_MIN_MS
 931  );
 932  const maxJitterMs = resolveNonNegativeInteger(
 933    options.interJobJitterMaxMs,
 934    DEFAULT_INTER_JOB_JITTER_MAX_MS
 935  );
 936
 937  return {
 938    interJobJitterMaxMs: Math.max(minJitterMs, maxJitterMs),
 939    interJobJitterMinMs: Math.min(minJitterMs, maxJitterMs),
 940    random: options.random ?? Math.random,
 941    retryJitterFactor: resolveNonNegativeNumber(
 942      options.retryJitterFactor,
 943      DEFAULT_RETRY_JITTER_FACTOR
 944    )
 945  };
 946}
 947
 948function sampleRandomUnit(random?: () => number): number {
 949  const sampled = random == null ? Math.random() : random();
 950
 951  if (!Number.isFinite(sampled)) {
 952    return 0.5;
 953  }
 954
 955  return Math.min(1, Math.max(0, sampled));
 956}
 957
 958function normalizeDispatchPayload(
 959  payload: string,
 960  payloadKind: RenewalJobPayloadKind
 961): {
 962  error: RenewalJobTerminalReason | null;
 963  text: string | null;
 964  value: RenewalDispatchPayload | null;
 965} {
 966  if (payloadKind === "text") {
 967    const text = normalizeOptionalString(payload);
 968    return text == null
 969      ? {
 970          error: "missing_payload_text",
 971          text: null,
 972          value: null
 973        }
 974      : {
 975          error: null,
 976          text,
 977          value: {
 978            structured: null,
 979            text
 980          }
 981        };
 982  }
 983
 984  const parsed = parseJsonValue(payload);
 985
 986  if (!isPlainRecord(parsed)) {
 987    return {
 988      error: "invalid_payload",
 989      text: null,
 990      value: null
 991    };
 992  }
 993
 994  const text = normalizeOptionalString(readString(parsed, "text"));
 995
 996  if (text == null) {
 997    return {
 998      error: "missing_payload_text",
 999      text: null,
1000      value: null
1001    };
1002  }
1003
1004  return {
1005    error: null,
1006    text,
1007    value: {
1008      structured: isRenewalProjectorPayload(parsed) ? parsed : null,
1009      text
1010    }
1011  };
1012}
1013
/**
 * Parses a serialized target snapshot.
 *
 * @param value - Serialized snapshot, or null.
 * @returns The parsed value when it passes `isRenewalTargetSnapshot`,
 *   otherwise null.
 */
function parseRenewalTargetSnapshot(value: string | null): RenewalProjectorTargetSnapshot | null {
  const candidate = parseJsonValue(value);

  if (isRenewalTargetSnapshot(candidate)) {
    return candidate;
  }

  return null;
}
1018
/**
 * Derives the dispatch target described by a renewal target snapshot.
 *
 * Values in the target payload take precedence; the snapshot's route params,
 * target id and top-level fields act as fallbacks.
 *
 * @param snapshot - Parsed target snapshot, or null.
 * @returns The resolved target, or null when the snapshot is missing, its
 *   target kind is not `browser.proxy_delivery`, the payload marks a shell
 *   page, or no conversation id, page URL or tab id can be determined.
 */
function resolveDispatchTarget(
  snapshot: RenewalProjectorTargetSnapshot | null
): RenewalDispatchTarget | null {
  if (snapshot == null) {
    return null;
  }

  const { target } = snapshot;

  if (target.kind !== PROXY_DELIVERY_TARGET_KIND) {
    return null;
  }

  const targetPayload = isPlainRecord(target.payload) ? target.payload : null;
  const routeParams = isPlainRecord(snapshot.route.params) ? snapshot.route.params : null;
  const readText = (record: Record<string, unknown> | null, key: string) =>
    normalizeOptionalString(readString(record, key));

  const isShellPage = readBoolean(targetPayload, "shellPage") === true;
  const tabId = readPositiveInteger(targetPayload, "tabId") ?? parseTabId(target.id);
  const conversationId =
    readText(targetPayload, "conversationId") ?? readText(routeParams, "conversationId");
  const pageUrl = readText(targetPayload, "pageUrl") ?? snapshot.pageUrl;

  // At least one locator is required to address the delivery.
  const hasLocator = conversationId != null || pageUrl != null || tabId != null;

  if (isShellPage || !hasLocator) {
    return null;
  }

  return {
    clientId: readText(targetPayload, "clientId") ?? snapshot.clientId,
    conversationId,
    organizationId: readText(targetPayload, "organizationId"),
    pageTitle: readText(targetPayload, "pageTitle") ?? snapshot.pageTitle,
    pageUrl,
    platform: snapshot.platform,
    tabId
  };
}
1049
/**
 * Picks the log file path for a job.
 *
 * @param existing - Path already recorded for the job, if any.
 * @param logDir - Directory for new log files, or null when none is configured.
 * @param now - Timestamp passed to `new Date` to derive the file name.
 * @returns `existing` unchanged when `normalizeOptionalString` accepts it;
 *   otherwise `<logDir>/<YYYY-MM-DD>.jsonl` using the UTC date of `now`, or
 *   null when `logDir` is null.
 */
function resolveJobLogPath(existing: string | null, logDir: string | null, now: number): string | null {
  const hasExistingPath = normalizeOptionalString(existing) != null;

  if (hasExistingPath) {
    return existing;
  }

  if (logDir == null) {
    return null;
  }

  // The first ten characters of an ISO timestamp are the calendar date.
  const dayStamp = new Date(now).toISOString().slice(0, 10);
  return join(logDir, `${dayStamp}.jsonl`);
}
1061
/**
 * Decides whether a failure message warrants another attempt.
 *
 * Messages that `parseDownstreamStatusCode` recognizes are retryable only
 * for status 429 or any status of 500 and above. Any other message is
 * retryable unless it is one of the fixed payload/conversation reasons below.
 *
 * @param message - Failure message or reason code.
 * @returns True when the failure should be retried.
 */
function isRetryableFailure(message: string): boolean {
  const statusCode = parseDownstreamStatusCode(message);

  if (statusCode != null) {
    return statusCode >= 500 || statusCode === 429;
  }

  switch (message) {
    case "invalid_payload":
    case "missing_local_conversation":
    case "missing_payload_text":
      return false;
    default:
      return true;
  }
}
1075
/**
 * Detects the ChatGPT delivery-template failures that this module treats as
 * cold-start failures.
 *
 * @param platform - Platform the job targets.
 * @param message - Failure message to inspect.
 * @returns True only when the platform is ChatGPT and the normalized message
 *   starts with `delivery.template_invalid` or `delivery.template_missing`.
 */
function isChatgptColdStartFailureMessage(
  platform: string | null | undefined,
  message: string | null | undefined
): boolean {
  const candidate = normalizeOptionalString(message);
  const isChatgpt = platform === CHATGPT_PLATFORM;

  return (
    isChatgpt
    && candidate != null
    && /^delivery\.template_(?:invalid|missing)\b/u.test(candidate)
  );
}
1088
/**
 * Extracts the status code from a `downstream_status_<NNN>` message.
 *
 * Surrounding whitespace is ignored; the rest of the message must match
 * exactly and carry exactly three digits.
 *
 * @param message - Failure message to inspect.
 * @returns The numeric status code, or null when the message has another shape.
 */
function parseDownstreamStatusCode(message: string): number | null {
  const matched = message.trim().match(/^downstream_status_(\d{3})$/u);

  return matched == null ? null : Number(matched[1]);
}
1098
/**
 * Type guard for the structured renewal message payload.
 *
 * Accepts a plain record with `kind` equal to `"renewal.message"`, `version`
 * equal to 1, string `text` and `template`, and a plain-record
 * `sourceMessage` whose `id` is a string.
 *
 * @param value - Value to test.
 * @returns True when every check above passes.
 */
function isRenewalProjectorPayload(value: unknown): value is RenewalProjectorPayload {
  if (!isPlainRecord(value)) {
    return false;
  }

  if (value.kind !== "renewal.message" || value.version !== 1) {
    return false;
  }

  if (typeof value.text !== "string" || typeof value.template !== "string") {
    return false;
  }

  const { sourceMessage } = value;
  return isPlainRecord(sourceMessage) && typeof sourceMessage.id === "string";
}
1110
/**
 * Type guard for a renewal target snapshot.
 *
 * Accepts a plain record with string `platform` and `linkId`, a plain-record
 * `route`, and a plain-record `target` whose `kind` is a string. No other
 * properties are inspected.
 *
 * @param value - Value to test.
 * @returns True when every check above passes.
 */
function isRenewalTargetSnapshot(value: unknown): value is RenewalProjectorTargetSnapshot {
  if (!isPlainRecord(value)) {
    return false;
  }

  const { platform, linkId, route, target } = value;

  if (typeof platform !== "string" || typeof linkId !== "string") {
    return false;
  }

  if (!isPlainRecord(route) || !isPlainRecord(target)) {
    return false;
  }

  return typeof target.kind === "string";
}
1121
/**
 * Reads the numeric tab id out of a target id matching
 * `TAB_TARGET_ID_PATTERN`.
 *
 * @param value - Target id, or null.
 * @returns The positive integer tab id, or null when the value is empty after
 *   normalization, does not match the pattern, or does not parse to a
 *   positive integer.
 */
function parseTabId(value: string | null): number | null {
  const normalized = normalizeOptionalString(value);
  const digits = normalized == null
    ? undefined
    : TAB_TARGET_ID_PATTERN.exec(normalized)?.[1];

  if (digits == null) {
    return null;
  }

  const tabId = Number.parseInt(digits, 10);
  return Number.isInteger(tabId) && tabId > 0 ? tabId : null;
}
1138
/**
 * Reads a boolean property from an optional record.
 *
 * @param record - Record to read from, or null.
 * @param key - Property name.
 * @returns The property value when it is a boolean, otherwise null.
 */
function readBoolean(record: Record<string, unknown> | null, key: string): boolean | null {
  const value = record?.[key];

  return typeof value === "boolean" ? value : null;
}
1146
/**
 * Reads a positive integer property from an optional record.
 *
 * @param record - Record to read from, or null.
 * @param key - Property name.
 * @returns The property value when it is an integer greater than zero,
 *   otherwise null.
 */
function readPositiveInteger(record: Record<string, unknown> | null, key: string): number | null {
  const candidate = record?.[key];

  if (typeof candidate !== "number" || !Number.isInteger(candidate) || candidate <= 0) {
    return null;
  }

  return candidate;
}
1155
/**
 * Reads a string property from an optional record.
 *
 * @param record - Record to read from, or null.
 * @param key - Property name.
 * @returns The property value when it is a string (including the empty
 *   string), otherwise null.
 */
function readString(record: Record<string, unknown> | null, key: string): string | null {
  const value = record?.[key];

  return typeof value === "string" ? value : null;
}
1163
/**
 * Returns `value` when it is an integer greater than zero, else `fallback`.
 *
 * @param value - Candidate setting; may be undefined.
 * @param fallback - Value used when the candidate is rejected.
 */
function resolvePositiveInteger(value: number | undefined, fallback: number): number {
  if (typeof value !== "number" || !Number.isInteger(value) || value <= 0) {
    return fallback;
  }

  return value;
}
1167
/**
 * Returns `value` when it is an integer of zero or more, else `fallback`.
 *
 * @param value - Candidate setting; may be undefined.
 * @param fallback - Value used when the candidate is rejected.
 */
function resolveNonNegativeInteger(value: number | undefined, fallback: number): number {
  if (typeof value !== "number" || !Number.isInteger(value) || value < 0) {
    return fallback;
  }

  return value;
}
1171
/**
 * Returns `value` when it is a finite number of zero or more, else `fallback`.
 *
 * @param value - Candidate setting; may be undefined.
 * @param fallback - Value used when the candidate is rejected.
 */
function resolveNonNegativeNumber(value: number | undefined, fallback: number): number {
  if (typeof value !== "number") {
    return fallback;
  }

  if (!Number.isFinite(value) || value < 0) {
    return fallback;
  }

  return value;
}
1175
/**
 * Resolves the cooldown applied after a successful attempt.
 *
 * @param value - Explicit cooldown; used as-is when it is a positive integer.
 * @param intervalMs - Interval from which the default cooldown is derived.
 * @returns The explicit cooldown, or the larger of
 *   `DEFAULT_SUCCESS_COOLDOWN_MS` and `intervalMs + 1`.
 */
function resolveSuccessCooldownMs(value: number | undefined, intervalMs: number): number {
  const explicitMs =
    typeof value === "number" && Number.isInteger(value) && value > 0 ? value : null;

  return explicitMs ?? Math.max(DEFAULT_SUCCESS_COOLDOWN_MS, intervalMs + 1);
}
1183
/**
 * Resolves after roughly `ms` milliseconds.
 *
 * The delay is rounded and passed through `resolveNonNegativeInteger` with a
 * fallback of 0, so negative, fractional-to-zero and non-finite inputs
 * resolve immediately without scheduling a timer.
 *
 * Node.js timers cannot represent delays above 2^31 - 1 ms; such a delay is
 * silently replaced by 1 ms. Longer waits are therefore split into slices
 * that each fit within the limit, so an oversized delay no longer returns
 * almost immediately. Delays within the limit use a single timer as before.
 *
 * @param ms - Requested delay in milliseconds.
 */
async function sleep(ms: number): Promise<void> {
  // Largest delay a single Node.js timer accepts (signed 32-bit maximum).
  const maxTimerDelayMs = 2_147_483_647;
  let remainingMs = resolveNonNegativeInteger(Math.round(ms), 0);

  while (remainingMs > 0) {
    const sliceMs = Math.min(remainingMs, maxTimerDelayMs);

    await new Promise<void>((resolve) => {
      setTimeout(resolve, sliceMs);
    });

    remainingMs -= sliceMs;
  }
}
1195
/**
 * Converts arbitrary text into a lowercase path segment.
 *
 * The value is trimmed and lowercased, every run of characters outside
 * `a-z0-9` becomes a single `-`, and leading/trailing separators are dropped.
 *
 * @param value - Text to sanitize.
 * @returns The sanitized segment, or `"unknown"` when nothing remains.
 */
function sanitizePathSegment(value: string): string {
  // Splitting on separator runs and re-joining yields single dashes between
  // the kept pieces and none at either end.
  const pieces = value
    .trim()
    .toLowerCase()
    .split(/[^a-z0-9]+/u)
    .filter((piece) => piece !== "");

  return pieces.length === 0 ? "unknown" : pieces.join("-");
}
1201
/**
 * Reads the `code` property of a thrown value.
 *
 * @param error - Thrown value of unknown shape.
 * @returns The code when the value passes `isPlainRecord` and `code` is a
 *   string, otherwise null.
 */
function readErrorCode(error: unknown): string | null {
  if (!isPlainRecord(error)) {
    return null;
  }

  const { code } = error;
  return typeof code === "string" ? code : null;
}
1205
/**
 * Reads the `timeoutMs` property of a thrown value.
 *
 * @param error - Thrown value of unknown shape.
 * @returns The timeout rounded to the nearest integer when the value passes
 *   `isPlainRecord` and `timeoutMs` is a finite number greater than zero,
 *   otherwise null.
 */
function readErrorTimeoutMs(error: unknown): number | null {
  if (!isPlainRecord(error)) {
    return null;
  }

  const { timeoutMs } = error;

  if (typeof timeoutMs !== "number" || !Number.isFinite(timeoutMs) || timeoutMs <= 0) {
    return null;
  }

  return Math.round(timeoutMs);
}
1215
/**
 * Extracts a delivery acknowledgement snapshot from a thrown value.
 *
 * Each field of the `deliveryAck` record is validated independently and
 * replaced with a neutral value when malformed: null for `confirmed_at`,
 * `reason` and `status_code`, false for `failed`, and 0 for `level`.
 *
 * @param error - Thrown value of unknown shape.
 * @returns The snapshot, or null when the value or its `deliveryAck`
 *   property does not pass `isPlainRecord`.
 */
function readErrorDeliveryAck(error: unknown): BrowserBridgeDeliveryAckSnapshot | null {
  if (!isPlainRecord(error)) {
    return null;
  }

  const { deliveryAck: ack } = error;

  if (!isPlainRecord(ack)) {
    return null;
  }

  const finiteNumberOrNull = (value: unknown): number | null =>
    typeof value === "number" && Number.isFinite(value) ? value : null;
  const { level } = ack;

  return {
    confirmed_at: finiteNumberOrNull(ack.confirmed_at),
    failed: ack.failed === true,
    // Only levels 1 to 3 are accepted; anything else collapses to 0.
    level: level === 1 || level === 2 || level === 3 ? level : 0,
    reason: typeof ack.reason === "string" ? ack.reason : null,
    status_code: finiteNumberOrNull(ack.status_code)
  };
}
1239
/**
 * Converts a thrown value into a structured execution failure.
 *
 * The error codes `action_timeout` and `request_timeout` map to the results
 * `browser_action_timeout` and `browser_request_timeout`. For any other code
 * the result is the error message itself.
 *
 * @param error - Thrown value of unknown shape.
 * @returns The failure with its delivery ack, error code, message, result and
 *   timeout, each read from the thrown value by the dedicated reader.
 */
function normalizeExecutionFailure(error: unknown): RenewalExecutionFailure {
  const deliveryAck = readErrorDeliveryAck(error);
  const errorCode = readErrorCode(error);
  const timeoutMs = readErrorTimeoutMs(error);
  const message = toErrorMessage(error);

  const result =
    errorCode === "action_timeout"
      ? "browser_action_timeout"
      : errorCode === "request_timeout"
        ? "browser_request_timeout"
        : message;

  return {
    deliveryAck,
    errorCode,
    message,
    result,
    timeoutMs
  };
}
1273
/**
 * Produces a human-readable message for a thrown value.
 *
 * Resolution order:
 * 1. An `Error` whose message survives `normalizeOptionalString` returns that
 *    message unchanged.
 * 2. A value accepted by `isPlainRecord` whose `message` property is a string
 *    that survives `normalizeOptionalString` returns that message. Without
 *    this step such objects stringify to `"[object Object]"`, which hides the
 *    real failure reason from anything that inspects the message.
 * 3. Everything else is converted with `String(error)`.
 *
 * @param error - Thrown value of unknown shape.
 * @returns The message text.
 */
function toErrorMessage(error: unknown): string {
  if (error instanceof Error && normalizeOptionalString(error.message) != null) {
    return error.message;
  }

  if (isPlainRecord(error)) {
    const { message } = error;

    if (typeof message === "string" && normalizeOptionalString(message) != null) {
      return message;
    }
  }

  return String(error);
}