- commit
- 89b162c
- parent
- 01c3a35
- author
- codex@macbookpro
- date
- 2026-03-30 16:32:35 +0800 CST
feat: project renewal jobs from settled messages
10 files changed,
+1192,
-6
+390,
-0
1@@ -30,6 +30,7 @@ import {
2 PersistentBaaLiveInstructionMessageDeduper,
3 PersistentBaaLiveInstructionSnapshotStore,
4 createFetchControlApiClient,
5+ createRenewalProjectorRunner,
6 executeBaaInstruction,
7 extractBaaInstructionBlocks,
8 handleConductorHttpRequest,
9@@ -2884,6 +2885,272 @@ test("ConductorTimedJobs runs registered runners on leader ticks and writes JSON
10 }
11 });
12
13+test("renewal projector scans settled messages with cursor semantics and skips ineligible conversations", async () => {
14+ const rootDir = mkdtempSync(join(tmpdir(), "baa-renewal-projector-"));
15+ const logsDir = mkdtempSync(join(tmpdir(), "baa-renewal-projector-logs-"));
16+ const controlPlane = new ConductorLocalControlPlane({
17+ databasePath: join(rootDir, "control-plane.db")
18+ });
19+ await controlPlane.initialize();
20+ const repository = controlPlane.repository;
21+ const artifactStore = new ArtifactStore({
22+ artifactDir: join(rootDir, "artifacts"),
23+ databasePath: join(rootDir, "artifact.db"),
24+ publicBaseUrl: "https://conductor.makefile.so"
25+ });
26+ const nowMs = Date.UTC(2026, 2, 30, 10, 0, 0);
27+
28+ try {
29+ await artifactStore.upsertLocalConversation({
30+ automationStatus: "manual",
31+ localConversationId: "lc_manual",
32+ platform: "claude"
33+ });
34+ await artifactStore.upsertConversationLink({
35+ clientId: "firefox-manual",
36+ linkId: "link_manual",
37+ localConversationId: "lc_manual",
38+ observedAt: nowMs - 9_000,
39+ pageUrl: "https://claude.ai/chat/conv_manual",
40+ platform: "claude",
41+ remoteConversationId: "conv_manual",
42+ targetKind: "browser.proxy_delivery",
43+ targetPayload: {
44+ clientId: "firefox-manual",
45+ tabId: 1
46+ }
47+ });
48+ const manualMessage = await artifactStore.insertMessage({
49+ conversationId: "conv_manual",
50+ id: "msg_manual",
51+ observedAt: nowMs - 9_000,
52+ platform: "claude",
53+ rawText: "manual conversation should not project",
54+ role: "assistant"
55+ });
56+
57+ await artifactStore.upsertLocalConversation({
58+ automationStatus: "paused",
59+ localConversationId: "lc_paused",
60+ pausedAt: nowMs - 8_000,
61+ platform: "claude"
62+ });
63+ await artifactStore.upsertConversationLink({
64+ clientId: "firefox-paused",
65+ linkId: "link_paused",
66+ localConversationId: "lc_paused",
67+ observedAt: nowMs - 8_000,
68+ pageUrl: "https://claude.ai/chat/conv_paused",
69+ platform: "claude",
70+ remoteConversationId: "conv_paused",
71+ targetKind: "browser.proxy_delivery",
72+ targetPayload: {
73+ clientId: "firefox-paused",
74+ tabId: 2
75+ }
76+ });
77+ const pausedMessage = await artifactStore.insertMessage({
78+ conversationId: "conv_paused",
79+ id: "msg_paused",
80+ observedAt: nowMs - 8_000,
81+ platform: "claude",
82+ rawText: "paused conversation should not project",
83+ role: "assistant"
84+ });
85+
86+ await artifactStore.upsertLocalConversation({
87+ automationStatus: "auto",
88+ cooldownUntil: nowMs + 30_000,
89+ localConversationId: "lc_cooldown",
90+ platform: "claude"
91+ });
92+ await artifactStore.upsertConversationLink({
93+ clientId: "firefox-cooldown",
94+ linkId: "link_cooldown",
95+ localConversationId: "lc_cooldown",
96+ observedAt: nowMs - 7_000,
97+ pageUrl: "https://claude.ai/chat/conv_cooldown",
98+ platform: "claude",
99+ remoteConversationId: "conv_cooldown",
100+ targetKind: "browser.proxy_delivery",
101+ targetPayload: {
102+ clientId: "firefox-cooldown",
103+ tabId: 3
104+ }
105+ });
106+ const cooldownMessage = await artifactStore.insertMessage({
107+ conversationId: "conv_cooldown",
108+ id: "msg_cooldown",
109+ observedAt: nowMs - 7_000,
110+ platform: "claude",
111+ rawText: "cooldown conversation should not project",
112+ role: "assistant"
113+ });
114+
115+ await artifactStore.upsertLocalConversation({
116+ automationStatus: "auto",
117+ localConversationId: "lc_auto",
118+ platform: "claude"
119+ });
120+ await artifactStore.upsertConversationLink({
121+ clientId: "firefox-auto",
122+ linkId: "link_auto",
123+ localConversationId: "lc_auto",
124+ observedAt: nowMs - 6_000,
125+ pageTitle: "Claude Auto",
126+ pageUrl: "https://claude.ai/chat/conv_auto",
127+ platform: "claude",
128+ remoteConversationId: "conv_auto",
129+ routeParams: {
130+ conversationId: "conv_auto"
131+ },
132+ routePath: "/chat/conv_auto",
133+ routePattern: "/chat/:conversationId",
134+ targetId: "client:firefox-auto",
135+ targetKind: "browser.proxy_delivery",
136+ targetPayload: {
137+ clientId: "firefox-auto",
138+ tabId: 4
139+ }
140+ });
141+ const autoMessage = await artifactStore.insertMessage({
142+ conversationId: "conv_auto",
143+ id: "msg_auto",
144+ observedAt: nowMs - 6_000,
145+ platform: "claude",
146+ rawText: "eligible auto conversation should project",
147+ role: "assistant"
148+ });
149+
150+ await artifactStore.upsertLocalConversation({
151+ automationStatus: "auto",
152+ localConversationId: "lc_missing_target",
153+ platform: "claude"
154+ });
155+ await artifactStore.upsertConversationLink({
156+ clientId: "firefox-missing-target",
157+ linkId: "link_missing_target",
158+ localConversationId: "lc_missing_target",
159+ observedAt: nowMs - 5_000,
160+ pageUrl: "https://claude.ai/chat/conv_missing_target",
161+ platform: "claude",
162+ remoteConversationId: "conv_missing_target"
163+ });
164+ const missingTargetMessage = await artifactStore.insertMessage({
165+ conversationId: "conv_missing_target",
166+ id: "msg_missing_target",
167+ observedAt: nowMs - 5_000,
168+ platform: "claude",
169+ rawText: "missing target should not project",
170+ role: "assistant"
171+ });
172+
173+ const timedJobs = new ConductorTimedJobs(
174+ {
175+ intervalMs: 5_000,
176+ maxMessagesPerTick: 10,
177+ maxTasksPerTick: 10,
178+ settleDelayMs: 1_000
179+ },
180+ {
181+ artifactStore,
182+ autoStart: false,
183+ logDir: logsDir,
184+ schedule: async (work) => {
185+ await work({
186+ controllerId: "mini-main",
187+ host: "mini",
188+ term: 2
189+ });
190+ return "scheduled";
191+ }
192+ }
193+ );
194+ timedJobs.registerRunner(
195+ createRenewalProjectorRunner({
196+ now: () => nowMs,
197+ repository
198+ })
199+ );
200+
201+ await timedJobs.start();
202+ const tick = await timedJobs.runTick("manual");
203+ assert.equal(tick.decision, "scheduled");
204+
205+ const jobs = await artifactStore.listRenewalJobs({});
206+ assert.equal(jobs.length, 1);
207+ assert.equal(jobs[0].messageId, autoMessage.id);
208+ assert.equal(jobs[0].status, "pending");
209+ assert.equal(jobs[0].payloadKind, "json");
210+ assert.equal(typeof jobs[0].logPath, "string");
211+ assert.match(jobs[0].logPath, /\.jsonl$/u);
212+
213+ const payload = JSON.parse(jobs[0].payload);
214+ assert.equal(payload.template, "summary_with_link");
215+ assert.match(payload.text, /\[renewal\] Context summary:/u);
216+ assert.equal(payload.sourceMessage.id, autoMessage.id);
217+ assert.equal(payload.linkUrl, "https://claude.ai/chat/conv_auto");
218+
219+ const targetSnapshot = JSON.parse(jobs[0].targetSnapshot);
220+ assert.equal(targetSnapshot.target.kind, "browser.proxy_delivery");
221+ assert.equal(targetSnapshot.route.pattern, "/chat/:conversationId");
222+ assert.equal(targetSnapshot.target.payload.tabId, 4);
223+
224+ const cursorState = await repository.getSystemState("renewal.projector.cursor");
225+ assert.ok(cursorState);
226+ assert.deepEqual(JSON.parse(cursorState.valueJson), {
227+ message_id: missingTargetMessage.id,
228+ observed_at: missingTargetMessage.observedAt
229+ });
230+
231+ const entries = readJsonlEntries(logsDir);
232+ assert.ok(entries.find((entry) => entry.runner === "renewal.projector" && entry.stage === "job_projected"));
233+ assert.ok(
234+ entries.find(
235+ (entry) => entry.runner === "renewal.projector" && entry.stage === "message_skipped" && entry.result === "automation_manual"
236+ )
237+ );
238+ assert.ok(
239+ entries.find(
240+ (entry) => entry.runner === "renewal.projector" && entry.stage === "message_skipped" && entry.result === "automation_paused"
241+ )
242+ );
243+ assert.ok(
244+ entries.find(
245+ (entry) => entry.runner === "renewal.projector" && entry.stage === "message_skipped" && entry.result === "cooldown_active"
246+ )
247+ );
248+ assert.ok(
249+ entries.find(
250+ (entry) => entry.runner === "renewal.projector" && entry.stage === "message_skipped" && entry.result === "route_unavailable"
251+ )
252+ );
253+ assert.ok(
254+ entries.find(
255+ (entry) =>
256+ entry.runner === "renewal.projector"
257+ && entry.stage === "scan_completed"
258+ && entry.cursor_after === `message:${missingTargetMessage.observedAt}:${missingTargetMessage.id}`
259+ )
260+ );
261+
262+ await timedJobs.stop();
263+ assert.equal(manualMessage.id, "msg_manual");
264+ assert.equal(pausedMessage.id, "msg_paused");
265+ assert.equal(cooldownMessage.id, "msg_cooldown");
266+ } finally {
267+ artifactStore.close();
268+ rmSync(rootDir, {
269+ force: true,
270+ recursive: true
271+ });
272+ rmSync(logsDir, {
273+ force: true,
274+ recursive: true
275+ });
276+ }
277+});
278+
279 test("ConductorTimedJobs keeps standby runners idle and clears interval handles on stop", async () => {
280 const logsDir = mkdtempSync(join(tmpdir(), "baa-timed-jobs-standby-"));
281 const intervalScheduler = createManualIntervalScheduler();
282@@ -6107,6 +6374,129 @@ test("ConductorRuntime persists renewal conversation links from browser.final_me
283 }
284 });
285
286+test("ConductorRuntime registers renewal projector, projects auto messages once, and keeps cursor across restart", async () => {
287+ const stateDir = mkdtempSync(join(tmpdir(), "baa-conductor-renewal-projector-runtime-"));
288+ const logsDir = mkdtempSync(join(tmpdir(), "baa-conductor-renewal-projector-runtime-logs-"));
289+ const nowSeconds = Math.floor(Date.UTC(2026, 2, 30, 11, 0, 0) / 1000);
290+ const runtimeConfig = {
291+ nodeId: "mini-main",
292+ host: "mini",
293+ role: "primary",
294+ controlApiBase: "https://control.example.test",
295+ localApiBase: "http://127.0.0.1:0",
296+ sharedToken: "replace-me",
297+ timedJobsIntervalMs: 60_000,
298+ timedJobsSettleDelayMs: 0,
299+ paths: {
300+ logsDir,
301+ runsDir: "/tmp/runs",
302+ stateDir
303+ }
304+ };
305+ const runtimeOptions = {
306+ autoStartLoops: false,
307+ now: () => nowSeconds
308+ };
309+ const observedAt = Date.UTC(2026, 2, 30, 10, 59, 0);
310+ let runtime = new ConductorRuntime(runtimeConfig, runtimeOptions);
311+ let client = null;
312+
313+ try {
314+ const snapshot = await runtime.start();
315+ const baseUrl = snapshot.controlApi.localApiBase;
316+ client = await connectFirefoxBridgeClient(snapshot.controlApi.firefoxWsUrl, "firefox-renewal-projector");
317+
318+ client.socket.send(
319+ JSON.stringify({
320+ type: "browser.final_message",
321+ platform: "chatgpt",
322+ conversation_id: "conv-runtime-projector",
323+ assistant_message_id: "msg-runtime-projector-1",
324+ raw_text: "runtime renewal projector message",
325+ observed_at: observedAt,
326+ page_title: "Runtime Projector",
327+ page_url: "https://chatgpt.com/c/conv-runtime-projector",
328+ tab_id: 77
329+ })
330+ );
331+
332+ const linksPayload = await waitForCondition(async () => {
333+ const response = await fetch(
334+ `${baseUrl}/v1/renewal/links?platform=chatgpt&remote_conversation_id=conv-runtime-projector`
335+ );
336+ assert.equal(response.status, 200);
337+ const payload = await response.json();
338+ assert.equal(payload.data.count, 1);
339+ return payload;
340+ }, 5_000, 50);
341+ const localConversationId = linksPayload.data.links[0].local_conversation_id;
342+
343+ const autoResponse = await fetch(
344+ `${baseUrl}/v1/renewal/conversations/${localConversationId}/auto`,
345+ {
346+ method: "POST"
347+ }
348+ );
349+ assert.equal(autoResponse.status, 200);
350+
351+ const timedJobs = runtime["timedJobs"];
352+ const artifactStore = runtime["artifactStore"];
353+ assert.ok(timedJobs.getRegisteredRunnerNames().includes("renewal.projector"));
354+
355+ const firstTick = await timedJobs.runTick("manual");
356+ assert.equal(firstTick.decision, "scheduled");
357+
358+ const jobsAfterFirstTick = await artifactStore.listRenewalJobs({});
359+ assert.equal(jobsAfterFirstTick.length, 1);
360+ assert.equal(jobsAfterFirstTick[0].messageId, "msg-runtime-projector-1");
361+ assert.equal(jobsAfterFirstTick[0].status, "pending");
362+
363+ const secondTick = await timedJobs.runTick("manual");
364+ assert.equal(secondTick.decision, "scheduled");
365+ assert.deepEqual(await artifactStore.listRenewalJobs({}), jobsAfterFirstTick);
366+
367+ await runtime.stop();
368+ runtime = new ConductorRuntime(runtimeConfig, runtimeOptions);
369+ const restartedSnapshot = await runtime.start();
370+ const restartedTimedJobs = runtime["timedJobs"];
371+ const restartedStore = runtime["artifactStore"];
372+
373+ assert.equal(restartedSnapshot.controlApi.localApiBase.startsWith("http://127.0.0.1:"), true);
374+ const restartTick = await restartedTimedJobs.runTick("manual");
375+ assert.equal(restartTick.decision, "scheduled");
376+ const jobsAfterRestart = await restartedStore.listRenewalJobs({});
377+ assert.equal(jobsAfterRestart.length, 1);
378+ assert.equal(jobsAfterRestart[0].messageId, "msg-runtime-projector-1");
379+
380+ const timedJobsEntries = readJsonlEntries(join(logsDir, "timed-jobs"));
381+ assert.ok(
382+ timedJobsEntries.find(
383+ (entry) => entry.runner === "renewal.projector" && entry.stage === "job_projected"
384+ )
385+ );
386+ assert.ok(
387+ timedJobsEntries.find(
388+ (entry) =>
389+ entry.runner === "renewal.projector"
390+ && entry.stage === "scan_completed"
391+ && entry.cursor_after === `message:${observedAt}:msg-runtime-projector-1`
392+ )
393+ );
394+ } finally {
395+ client?.queue.stop();
396+ client?.socket.close(1000, "done");
397+ await runtime.stop();
398+ rmSync(stateDir, {
399+ force: true,
400+ recursive: true
401+ });
402+ rmSync(logsDir, {
403+ force: true,
404+ recursive: true
405+ });
406+ }
407+});
408+
409 test("persistent live ingest survives restart and /v1/browser restores recent history from the journal", async () => {
410 const stateDir = mkdtempSync(join(tmpdir(), "baa-conductor-final-message-persist-"));
411 const databasePath = join(stateDir, "control-plane.sqlite");
+8,
-0
1@@ -49,6 +49,7 @@ import {
2 ConductorLocalControlPlane,
3 resolveDefaultConductorStateDir
4 } from "./local-control-plane.js";
5+import { createRenewalProjectorRunner } from "./renewal/projector.js";
6 import { ConductorTimedJobs } from "./timed-jobs/index.js";
7
8 export type { ConductorHttpRequest, ConductorHttpResponse } from "./http-types.js";
9@@ -72,6 +73,7 @@ export {
10 export { handleConductorHttpRequest } from "./local-api.js";
11 export * from "./artifacts/index.js";
12 export * from "./instructions/index.js";
13+export * from "./renewal/projector.js";
14 export * from "./timed-jobs/index.js";
15
16 export type ConductorRole = "primary" | "standby";
17@@ -2317,6 +2319,12 @@ export class ConductorRuntime {
18 setIntervalImpl: options.setIntervalImpl
19 }
20 );
21+ this.timedJobs.registerRunner(
22+ createRenewalProjectorRunner({
23+ now: () => this.now() * 1000,
24+ repository: this.localControlPlane.repository
25+ })
26+ );
27 this.localApiServer =
28 this.config.localApiBase == null
29 ? null
1@@ -0,0 +1,614 @@
2+import { join } from "node:path";
3+
4+import {
5+ buildArtifactPublicUrl,
6+ type ArtifactStore,
7+ type ConversationLinkRecord,
8+ type LocalConversationRecord,
9+ type MessageRecord,
10+ type MessageScanCursor
11+} from "../../../../packages/artifact-db/dist/index.js";
12+import type { ControlPlaneRepository } from "../../../../packages/db/dist/index.js";
13+
14+import type { TimedJobRunner, TimedJobRunnerResult, TimedJobTickContext } from "../timed-jobs/index.js";
15+
16+const DEFAULT_CURSOR_STATE_KEY = "renewal.projector.cursor";
17+const DEFAULT_MESSAGE_ROLE = "assistant";
18+const DEFAULT_PAYLOAD_TEMPLATE = "summary_with_link";
19+const DEFAULT_SUMMARY_LIMIT = 200;
20+const RUNNER_NAME = "renewal.projector";
21+
22+export type RenewalProjectorSkipReason =
23+ | "automation_manual"
24+ | "automation_paused"
25+ | "cooldown_active"
26+ | "duplicate_job"
27+ | "missing_active_link"
28+ | "missing_local_conversation"
29+ | "missing_remote_conversation_id"
30+ | "route_unavailable";
31+
32+export interface RenewalProjectorCursor extends MessageScanCursor {}
33+
34+export interface RenewalProjectorPayload {
35+ kind: "renewal.message";
36+ linkUrl: string | null;
37+ sourceMessage: {
38+ artifactUrl: string | null;
39+ conversationId: string | null;
40+ id: string;
41+ observedAt: number;
42+ platform: string;
43+ summary: string;
44+ };
45+ summary: string;
46+ template: string;
47+ text: string;
48+ version: 1;
49+}
50+
51+export interface RenewalProjectorTargetSnapshot {
52+ clientId: string | null;
53+ linkId: string;
54+ pageTitle: string | null;
55+ pageUrl: string | null;
56+ platform: string;
57+ route: {
58+ params: Record<string, unknown> | null;
59+ path: string | null;
60+ pattern: string | null;
61+ };
62+ target: {
63+ id: string | null;
64+ kind: string;
65+ payload: Record<string, unknown> | null;
66+ };
67+}
68+
69+export interface RenewalShouldRenewDecision {
70+ eligible: boolean;
71+ existingJobId?: string | null;
72+ reason: "eligible" | RenewalProjectorSkipReason;
73+}
74+
75+export interface RenewalProjectorRunnerOptions {
76+ cursorStateKey?: string;
77+ now?: () => number;
78+ repository: Pick<ControlPlaneRepository, "getSystemState" | "putSystemState">;
79+}
80+
81+interface RenewalCandidate {
82+ conversation: LocalConversationRecord;
83+ link: ConversationLinkRecord;
84+ message: MessageRecord;
85+}
86+
87+interface RenewalCandidateResolution {
88+ candidate: RenewalCandidate | null;
89+ reason: RenewalProjectorSkipReason | null;
90+}
91+
92+interface CursorStateValue {
93+ message_id: string;
94+ observed_at: number;
95+}
96+
97+export function createRenewalProjectorRunner(
98+ options: RenewalProjectorRunnerOptions
99+): TimedJobRunner {
100+ return {
101+ name: RUNNER_NAME,
102+ async run(context) {
103+ return runRenewalProjector(context, options);
104+ }
105+ };
106+}
107+
108+export async function runRenewalProjector(
109+ context: TimedJobTickContext,
110+ options: RenewalProjectorRunnerOptions
111+): Promise<TimedJobRunnerResult> {
112+ const artifactStore = context.artifactStore;
113+ const now = options.now ?? (() => Date.now());
114+
115+ if (artifactStore == null) {
116+ context.log({
117+ stage: "scan_skipped",
118+ result: "no_artifact_store"
119+ });
120+ return {
121+ details: {
122+ cursor_after: null,
123+ cursor_before: null,
124+ projected_jobs: 0,
125+ scanned_messages: 0,
126+ skipped_messages: 0
127+ },
128+ result: "no_artifact_store"
129+ };
130+ }
131+
132+ const nowMs = now();
133+ const settleCutoff = Math.max(0, nowMs - context.settleDelayMs);
134+ const cursorStateKey = normalizeCursorStateKey(options.cursorStateKey);
135+ const cursorBefore = await loadCursor(options.repository, cursorStateKey);
136+
137+ context.log({
138+ stage: "scan_window",
139+ result: "ok",
140+ details: {
141+ cursor_before: formatCursor(cursorBefore),
142+ settle_cutoff: settleCutoff
143+ }
144+ });
145+
146+ const messages = await artifactStore.scanMessages({
147+ after: cursorBefore,
148+ limit: context.maxMessagesPerTick,
149+ observedAtLte: settleCutoff,
150+ role: DEFAULT_MESSAGE_ROLE
151+ });
152+
153+ if (messages.length === 0) {
154+ context.log({
155+ stage: "scan_completed",
156+ result: "idle",
157+ details: {
158+ cursor_after: formatCursor(cursorBefore),
159+ cursor_before: formatCursor(cursorBefore),
160+ projected_jobs: 0,
161+ scanned_messages: 0,
162+ skipped_messages: 0
163+ }
164+ });
165+ return {
166+ details: {
167+ cursor_after: formatCursor(cursorBefore),
168+ cursor_before: formatCursor(cursorBefore),
169+ projected_jobs: 0,
170+ scanned_messages: 0,
171+ skipped_messages: 0
172+ },
173+ result: "idle"
174+ };
175+ }
176+
177+ let cursorAfter = cursorBefore;
178+ let projectedJobs = 0;
179+ let scannedMessages = 0;
180+ let skippedMessages = 0;
181+ let limitHit = false;
182+
183+ for (const message of messages) {
184+ scannedMessages += 1;
185+ cursorAfter = {
186+ id: message.id,
187+ observedAt: message.observedAt
188+ };
189+
190+ const resolution = await resolveRenewalCandidate(artifactStore, message);
191+
192+ if (resolution.candidate == null) {
193+ skippedMessages += 1;
194+ context.log({
195+ stage: "message_skipped",
196+ result: resolution.reason ?? "missing_local_conversation",
197+ details: {
198+ cursor: formatCursor(cursorAfter),
199+ message_id: message.id,
200+ platform: message.platform
201+ }
202+ });
203+ continue;
204+ }
205+
206+ const decision = await shouldRenew({
207+ candidate: resolution.candidate,
208+ now: nowMs,
209+ store: artifactStore
210+ });
211+
212+ if (!decision.eligible) {
213+ skippedMessages += 1;
214+ context.log({
215+ stage: "message_skipped",
216+ result: decision.reason,
217+ details: {
218+ cursor: formatCursor(cursorAfter),
219+ existing_job_id: decision.existingJobId ?? null,
220+ local_conversation_id: resolution.candidate.conversation.localConversationId,
221+ message_id: message.id
222+ }
223+ });
224+ continue;
225+ }
226+
227+ try {
228+ const job = await artifactStore.insertRenewalJob({
229+ jobId: buildRenewalJobId(message.id),
230+ localConversationId: resolution.candidate.conversation.localConversationId,
231+ logPath: buildRunnerLogPath(context.logDir, nowMs),
232+ messageId: message.id,
233+ nextAttemptAt: nowMs,
234+ payload: JSON.stringify(
235+ buildRenewalPayload(message, artifactStore, resolution.candidate.link.pageUrl)
236+ ),
237+ payloadKind: "json",
238+ targetSnapshot: buildRenewalTargetSnapshot(resolution.candidate.link)
239+ });
240+
241+ projectedJobs += 1;
242+ context.log({
243+ stage: "job_projected",
244+ result: "projected",
245+ details: {
246+ cursor: formatCursor(cursorAfter),
247+ job_id: job.jobId,
248+ local_conversation_id: job.localConversationId,
249+ message_id: job.messageId
250+ }
251+ });
252+ } catch (error) {
253+ if (isDuplicateRenewalJobError(error)) {
254+ skippedMessages += 1;
255+ context.log({
256+ stage: "message_skipped",
257+ result: "duplicate_job",
258+ details: {
259+ cursor: formatCursor(cursorAfter),
260+ local_conversation_id: resolution.candidate.conversation.localConversationId,
261+ message_id: message.id
262+ }
263+ });
264+ continue;
265+ }
266+
267+ throw error;
268+ }
269+
270+ if (projectedJobs >= context.maxTasksPerTick) {
271+ limitHit = true;
272+ break;
273+ }
274+ }
275+
276+ if (!equalCursor(cursorBefore, cursorAfter) && cursorAfter != null) {
277+ await saveCursor(options.repository, cursorStateKey, cursorAfter, nowMs);
278+ }
279+
280+ const result = limitHit ? "task_limit_reached" : "ok";
281+ const details = {
282+ cursor_after: formatCursor(cursorAfter),
283+ cursor_before: formatCursor(cursorBefore),
284+ projected_jobs: projectedJobs,
285+ scanned_messages: scannedMessages,
286+ skipped_messages: skippedMessages
287+ };
288+
289+ context.log({
290+ stage: "scan_completed",
291+ result,
292+ details
293+ });
294+
295+ return {
296+ details,
297+ result
298+ };
299+}
300+
301+export async function shouldRenew(input: {
302+ candidate: RenewalCandidate;
303+ now: number;
304+ store: Pick<ArtifactStore, "listRenewalJobs">;
305+}): Promise<RenewalShouldRenewDecision> {
306+ const { candidate } = input;
307+ const automationStatus = candidate.conversation.automationStatus;
308+
309+ if (automationStatus === "manual") {
310+ return {
311+ eligible: false,
312+ reason: "automation_manual"
313+ };
314+ }
315+
316+ if (automationStatus === "paused") {
317+ return {
318+ eligible: false,
319+ reason: "automation_paused"
320+ };
321+ }
322+
323+ if (
324+ candidate.conversation.cooldownUntil != null
325+ && candidate.conversation.cooldownUntil > input.now
326+ ) {
327+ return {
328+ eligible: false,
329+ reason: "cooldown_active"
330+ };
331+ }
332+
333+ if (!hasAvailableRoute(candidate.link)) {
334+ return {
335+ eligible: false,
336+ reason: "route_unavailable"
337+ };
338+ }
339+
340+ const existingJobs = await input.store.listRenewalJobs({
341+ limit: 1,
342+ messageId: candidate.message.id
343+ });
344+
345+ if (existingJobs[0] != null) {
346+ return {
347+ eligible: false,
348+ existingJobId: existingJobs[0].jobId,
349+ reason: "duplicate_job"
350+ };
351+ }
352+
353+ return {
354+ eligible: true,
355+ reason: "eligible"
356+ };
357+}
358+
359+export function buildRenewalPayload(
360+ message: MessageRecord,
361+ store: Pick<ArtifactStore, "getPublicBaseUrl">,
362+ fallbackLinkUrl: string | null = null
363+): RenewalProjectorPayload {
364+ const artifactUrl = buildArtifactPublicUrl(store.getPublicBaseUrl(), message.staticPath);
365+ const linkUrl = normalizeOptionalString(message.pageUrl)
366+ ?? normalizeOptionalString(fallbackLinkUrl)
367+ ?? artifactUrl;
368+ const summary = truncateText(message.summary ?? message.rawText, DEFAULT_SUMMARY_LIMIT);
369+ const lines = [`[renewal] Context summary: ${summary}`];
370+
371+ if (linkUrl != null) {
372+ lines.push(`Conversation link: ${linkUrl}`);
373+ } else {
374+ lines.push(`Message id: ${message.id}`);
375+ }
376+
377+ return {
378+ kind: "renewal.message",
379+ linkUrl,
380+ sourceMessage: {
381+ artifactUrl,
382+ conversationId: message.conversationId,
383+ id: message.id,
384+ observedAt: message.observedAt,
385+ platform: message.platform,
386+ summary
387+ },
388+ summary,
389+ template: DEFAULT_PAYLOAD_TEMPLATE,
390+ text: lines.join("\n"),
391+ version: 1
392+ };
393+}
394+
395+export function buildRenewalTargetSnapshot(
396+ link: ConversationLinkRecord
397+): RenewalProjectorTargetSnapshot {
398+ return {
399+ clientId: link.clientId,
400+ linkId: link.linkId,
401+ pageTitle: link.pageTitle,
402+ pageUrl: link.pageUrl,
403+ platform: link.platform,
404+ route: {
405+ params: parseJsonRecord(link.routeParams),
406+ path: link.routePath,
407+ pattern: link.routePattern
408+ },
409+ target: {
410+ id: link.targetId,
411+ kind: normalizeRequiredString(link.targetKind, "link.targetKind"),
412+ payload: parseJsonRecord(link.targetPayload)
413+ }
414+ };
415+}
416+
417+async function resolveRenewalCandidate(
418+ store: Pick<ArtifactStore, "getLocalConversation" | "listConversationLinks">,
419+ message: MessageRecord
420+): Promise<RenewalCandidateResolution> {
421+ const remoteConversationId = normalizeOptionalString(message.conversationId);
422+
423+ if (remoteConversationId == null) {
424+ return {
425+ candidate: null,
426+ reason: "missing_remote_conversation_id"
427+ };
428+ }
429+
430+ const links = await store.listConversationLinks({
431+ isActive: true,
432+ limit: 1,
433+ platform: message.platform,
434+ remoteConversationId
435+ });
436+ const link = links[0] ?? null;
437+
438+ if (link == null) {
439+ return {
440+ candidate: null,
441+ reason: "missing_active_link"
442+ };
443+ }
444+
445+ const conversation = await store.getLocalConversation(link.localConversationId);
446+
447+ if (conversation == null) {
448+ return {
449+ candidate: null,
450+ reason: "missing_local_conversation"
451+ };
452+ }
453+
454+ return {
455+ candidate: {
456+ conversation,
457+ link,
458+ message
459+ },
460+ reason: null
461+ };
462+}
463+
464+function buildRenewalJobId(messageId: string): string {
465+ return `renewal_${normalizeRequiredString(messageId, "messageId")}`;
466+}
467+
468+function buildRunnerLogPath(logDir: string | null, now: number): string | null {
469+ if (logDir == null) {
470+ return null;
471+ }
472+
473+ return join(logDir, `${new Date(now).toISOString().slice(0, 10)}.jsonl`);
474+}
475+
476+function equalCursor(
477+ left: RenewalProjectorCursor | null,
478+ right: RenewalProjectorCursor | null
479+): boolean {
480+ return left?.id === right?.id && left?.observedAt === right?.observedAt;
481+}
482+
483+function formatCursor(cursor: RenewalProjectorCursor | null): string | null {
484+ if (cursor == null) {
485+ return null;
486+ }
487+
488+ return `message:${cursor.observedAt}:${cursor.id}`;
489+}
490+
491+function hasAvailableRoute(link: ConversationLinkRecord): boolean {
492+ if (link.isActive !== true) {
493+ return false;
494+ }
495+
496+ if (normalizeOptionalString(link.targetKind) == null) {
497+ return false;
498+ }
499+
500+ if (
501+ normalizeOptionalString(link.targetId) == null
502+ && parseJsonRecord(link.targetPayload) == null
503+ && normalizeOptionalString(link.pageUrl) == null
504+ ) {
505+ return false;
506+ }
507+
508+ return true;
509+}
510+
511+function isCursorStateValue(value: unknown): value is CursorStateValue {
512+ return (
513+ typeof value === "object"
514+ && value != null
515+ && typeof Reflect.get(value, "message_id") === "string"
516+ && Number.isInteger(Reflect.get(value, "observed_at"))
517+ );
518+}
519+
520+function isDuplicateRenewalJobError(error: unknown): boolean {
521+ return (
522+ error instanceof Error
523+ && error.message.includes("UNIQUE constraint failed: renewal_jobs.message_id")
524+ );
525+}
526+
527+async function loadCursor(
528+ repository: Pick<ControlPlaneRepository, "getSystemState">,
529+ cursorStateKey: string
530+): Promise<RenewalProjectorCursor | null> {
531+ const state = await repository.getSystemState(cursorStateKey);
532+
533+ if (state == null) {
534+ return null;
535+ }
536+
537+ const parsed = parseJsonValue(state.valueJson);
538+
539+ if (!isCursorStateValue(parsed)) {
540+ return null;
541+ }
542+
543+ return {
544+ id: parsed.message_id,
545+ observedAt: parsed.observed_at
546+ };
547+}
548+
549+function normalizeCursorStateKey(value: string | undefined): string {
550+ return normalizeRequiredString(value ?? DEFAULT_CURSOR_STATE_KEY, "cursorStateKey");
551+}
552+
553+function normalizeOptionalString(value: string | null | undefined): string | null {
554+ if (value == null) {
555+ return null;
556+ }
557+
558+ const normalized = value.trim();
559+ return normalized === "" ? null : normalized;
560+}
561+
562+function normalizeRequiredString(value: string | null | undefined, name: string): string {
563+ const normalized = normalizeOptionalString(value);
564+
565+ if (normalized == null) {
566+ throw new Error(`${name} must be a non-empty string.`);
567+ }
568+
569+ return normalized;
570+}
571+
572+function parseJsonRecord(value: string | null): Record<string, unknown> | null {
573+ const parsed = parseJsonValue(value);
574+ return isPlainRecord(parsed) ? parsed : null;
575+}
576+
577+function parseJsonValue(value: string | null): unknown {
578+ if (value == null) {
579+ return null;
580+ }
581+
582+ try {
583+ return JSON.parse(value);
584+ } catch {
585+ return null;
586+ }
587+}
588+
589+async function saveCursor(
590+ repository: Pick<ControlPlaneRepository, "putSystemState">,
591+ cursorStateKey: string,
592+ cursor: RenewalProjectorCursor,
593+ now: number
594+): Promise<void> {
595+ await repository.putSystemState({
596+ stateKey: cursorStateKey,
597+ updatedAt: Math.floor(now / 1000),
598+ valueJson: JSON.stringify({
599+ message_id: cursor.id,
600+ observed_at: cursor.observedAt
601+ })
602+ });
603+}
604+
605+function isPlainRecord(value: unknown): value is Record<string, unknown> {
606+ return typeof value === "object" && value != null && !Array.isArray(value);
607+}
608+
609+function truncateText(text: string, limit: number): string {
610+ if (text.length <= limit) {
611+ return text;
612+ }
613+
614+ return `${text.slice(0, Math.max(0, limit - 3))}...`;
615+}
1@@ -42,6 +42,7 @@ export interface TimedJobTickContext extends TimedJobScheduleContext {
2 artifactStore: ArtifactStore | null;
3 batchId: string;
4 config: TimedJobsConfig;
5+ logDir: string | null;
6 log: (input: TimedJobLogInput) => void;
7 maxMessagesPerTick: number;
8 maxTasksPerTick: number;
9@@ -378,6 +379,7 @@ export class ConductorTimedJobs {
10 artifactStore: this.artifactStore,
11 batchId,
12 config: this.getConfig(),
13+ logDir: this.logDir,
14 log: (input) => {
15 this.writeRunnerLog(runner.name, {
16 ...input,
+81,
-0
1@@ -102,6 +102,87 @@ test("ArtifactStore writes message, execution, session, and index artifacts sync
2 }
3 });
4
5+test("ArtifactStore scans messages by cursor and settle cutoff without rescanning older rows", async () => {
6+ const rootDir = mkdtempSync(join(tmpdir(), "artifact-db-message-scan-test-"));
7+ const stateDir = join(rootDir, "state");
8+ const databasePath = join(stateDir, ARTIFACT_DB_FILENAME);
9+ const artifactDir = join(stateDir, ARTIFACTS_DIRNAME);
10+ const store = new ArtifactStore({
11+ artifactDir,
12+ databasePath
13+ });
14+
15+ try {
16+ const first = await store.insertMessage({
17+ id: "msg_scan_001",
18+ observedAt: Date.UTC(2026, 2, 28, 9, 0, 0),
19+ platform: "claude",
20+ rawText: "first assistant message",
21+ role: "assistant"
22+ });
23+ await store.insertMessage({
24+ id: "msg_scan_user",
25+ observedAt: Date.UTC(2026, 2, 28, 9, 0, 30),
26+ platform: "claude",
27+ rawText: "interleaved user message",
28+ role: "user"
29+ });
30+ const second = await store.insertMessage({
31+ id: "msg_scan_002",
32+ observedAt: Date.UTC(2026, 2, 28, 9, 1, 0),
33+ platform: "claude",
34+ rawText: "second assistant message",
35+ role: "assistant"
36+ });
37+ const third = await store.insertMessage({
38+ id: "msg_scan_003",
39+ observedAt: Date.UTC(2026, 2, 28, 9, 2, 0),
40+ platform: "claude",
41+ rawText: "third assistant message",
42+ role: "assistant"
43+ });
44+
45+ assert.deepEqual(
46+ await store.scanMessages({
47+ limit: 10,
48+ observedAtLte: second.observedAt,
49+ role: "assistant"
50+ }),
51+ [first, second]
52+ );
53+ assert.deepEqual(
54+ await store.scanMessages({
55+ after: {
56+ id: first.id,
57+ observedAt: first.observedAt
58+ },
59+ limit: 10,
60+ observedAtLte: third.observedAt,
61+ role: "assistant"
62+ }),
63+ [second, third]
64+ );
65+ assert.deepEqual(
66+ await store.scanMessages({
67+ after: {
68+ id: second.id,
69+ observedAt: second.observedAt
70+ },
71+ limit: 10,
72+ observedAtLte: second.observedAt,
73+ role: "assistant"
74+ }),
75+ []
76+ );
77+ } finally {
78+ store.close();
79+ rmSync(rootDir, {
80+ force: true,
81+ recursive: true
82+ });
83+ }
84+});
85+
86 test("ArtifactStore persists renewal storage records and enqueues sync payloads", async () => {
87 const rootDir = mkdtempSync(join(tmpdir(), "artifact-db-renewal-test-"));
88 const stateDir = join(rootDir, "state");
+2,
-0
1@@ -30,10 +30,12 @@ export {
2 type ListRenewalJobsOptions,
3 type ListSessionsOptions,
4 type LocalConversationRecord,
5+ type MessageScanCursor,
6 type MessageRecord,
7 type RenewalJobPayloadKind,
8 type RenewalJobRecord,
9 type RenewalJobStatus,
10+ type ScanMessagesOptions,
11 type SessionIndexEntry,
12 type SessionRecord,
13 type SessionTimelineEntry,
+2,
-0
1@@ -18,6 +18,8 @@ CREATE INDEX IF NOT EXISTS idx_messages_conversation
2 ON messages(conversation_id);
3 CREATE INDEX IF NOT EXISTS idx_messages_platform
4 ON messages(platform, observed_at DESC);
5+CREATE INDEX IF NOT EXISTS idx_messages_role_observed
6+ ON messages(role, observed_at ASC, id ASC);
7
8 CREATE TABLE IF NOT EXISTS executions (
9 instruction_id TEXT PRIMARY KEY,
+54,
-0
1@@ -35,10 +35,12 @@ import {
2 type ListRenewalJobsOptions,
3 type ListSessionsOptions,
4 type LocalConversationRecord,
5+ type MessageScanCursor,
6 type MessageRecord,
7 type RenewalJobPayloadKind,
8 type RenewalJobRecord,
9 type RenewalJobStatus,
10+ type ScanMessagesOptions,
11 type SessionIndexEntry,
12 type SessionRecord,
13 type SessionTimelineEntry,
14@@ -656,6 +658,47 @@ export class ArtifactStore {
15 return rows.map(mapMessageRow);
16 }
17
18+ async scanMessages(options: ScanMessagesOptions = {}): Promise<MessageRecord[]> {
19+ const conditions: string[] = [];
20+ const params: Array<number | string | null> = [];
21+ const after = normalizeMessageScanCursor(options.after);
22+
23+ if (options.platform != null) {
24+ conditions.push("platform = ?");
25+ params.push(options.platform);
26+ }
27+
28+ if (options.role != null) {
29+ conditions.push("role = ?");
30+ params.push(options.role);
31+ }
32+
33+ if (options.observedAtLte != null) {
34+ conditions.push("observed_at <= ?");
35+ params.push(normalizeNonNegativeInteger(options.observedAtLte, 0, "observedAtLte"));
36+ }
37+
38+ if (after != null) {
39+ conditions.push("(observed_at > ? OR (observed_at = ? AND id > ?))");
40+ params.push(after.observedAt, after.observedAt, after.id);
41+ }
42+
43+ const rows = this.getRows<MessageRow>(
44+ [
45+ "SELECT * FROM messages",
46+ conditions.length > 0 ? `WHERE ${conditions.join(" AND ")}` : "",
47+ "ORDER BY observed_at ASC, id ASC",
48+ "LIMIT ?"
49+ ]
50+ .filter(Boolean)
51+ .join(" "),
52+ ...params,
53+ normalizePositiveInteger(options.limit, 50)
54+ );
55+
56+ return rows.map(mapMessageRow);
57+ }
58+
59 async listSessions(options: ListSessionsOptions = {}): Promise<SessionRecord[]> {
60 const { clause, params } = buildConversationFilters(options.platform, options.conversationId);
61 const rows = this.getRows<SessionRow>(
62@@ -1589,6 +1632,17 @@ function normalizeOffset(value: number | undefined): number {
63 return value;
64 }
65
66+function normalizeMessageScanCursor(cursor: MessageScanCursor | null | undefined): MessageScanCursor | null {
67+ if (cursor == null) {
68+ return null;
69+ }
70+
71+ return {
72+ id: normalizeRequiredString(cursor.id, "cursor.id"),
73+ observedAt: normalizeNonNegativeInteger(cursor.observedAt, 0, "cursor.observedAt")
74+ };
75+}
76+
77 function normalizeOptionalBaseUrl(value: string | null | undefined): string | null {
78 const normalized = normalizeOptionalString(value);
79 return normalized == null ? null : normalized.replace(/\/+$/u, "");
+13,
-0
1@@ -49,6 +49,11 @@ export interface MessageRecord {
2 createdAt: number;
3 }
4
5+export interface MessageScanCursor {
6+ id: string;
7+ observedAt: number;
8+}
9+
10 export interface ExecutionRecord {
11 instructionId: string;
12 messageId: string;
13@@ -253,6 +258,14 @@ export interface ListMessagesOptions {
14 platform?: string;
15 }
16
17+export interface ScanMessagesOptions {
18+ after?: MessageScanCursor | null;
19+ limit?: number;
20+ observedAtLte?: number;
21+ platform?: string;
22+ role?: string;
23+}
24+
25 export interface ListExecutionsOptions {
26 limit?: number;
27 messageId?: string;
+26,
-6
1@@ -2,7 +2,7 @@
2
3 ## 状态
4
5-- 当前状态:`待开始`
6+- 当前状态:`已完成`
7 - 规模预估:`M`
8 - 依赖任务:`T-S055`、`T-S056`、`T-S057`
9 - 建议执行者:`Codex` 或 `Claude`
10@@ -154,22 +154,42 @@
11
12 ### 开始执行
13
14-- 执行者:
15-- 开始时间:
16+- 执行者:`Codex`
17+- 开始时间:`2026-03-30 16:00:12 CST`
18 - 状态变更:`待开始` → `进行中`
19
20 ### 完成摘要
21
22-- 完成时间:
23+- 完成时间:`2026-03-30 16:31:47 CST`
24 - 状态变更:`进行中` → `已完成`
25 - 修改了哪些文件:
26+ - `apps/conductor-daemon/src/renewal/projector.ts`
27+ - `apps/conductor-daemon/src/index.ts`
28+ - `apps/conductor-daemon/src/timed-jobs/runtime.ts`
29+ - `apps/conductor-daemon/src/index.test.js`
30+ - `packages/artifact-db/src/schema.ts`
31+ - `packages/artifact-db/src/store.ts`
32+ - `packages/artifact-db/src/types.ts`
33+ - `packages/artifact-db/src/index.ts`
34+ - `packages/artifact-db/src/index.test.js`
35+ - `tasks/T-S058.md`
36 - 核心实现思路:
37+ - 在 `artifact-db` 增加按 `(observed_at, id)` 游标顺序扫描消息的 store 原语和索引,支持按 settle cutoff 增量扫描旧消息,避免重复全表扫描
38+ - 新增 `renewal/projector.ts`,把消息到续命任务的投影逻辑收敛到独立 runner,包含 cursor 持久化、`shouldRenew()` 判断、结构化 payload / target snapshot 构建,以及 projector 过程日志
39+ - projector 基于 `local_conversations` 和 `conversation_links` 判断 `manual / auto / paused`、冷却期、重复任务和目标路由可用性,仅为满足条件的消息生成幂等 `pending` `renewal_jobs`
40+ - 在 `ConductorRuntime` 启动时注册 `renewal.projector` 到 timed-jobs 框架,并把 shared timed-jobs log 目录暴露给 runner 以回写 `renewal_jobs.log_path`
41+ - 用测试覆盖 cursor 扫描、manual / paused / cooldown / route unavailable 跳过、幂等投影、runtime 接线和重启后 cursor 持久化
42 - 跑了哪些测试:
43+ - `cd /Users/george/code/baa-conductor-renewal-message-projector && pnpm install`
44+ - `cd /Users/george/code/baa-conductor-renewal-message-projector && pnpm -C packages/artifact-db test`
45+ - `cd /Users/george/code/baa-conductor-renewal-message-projector && pnpm -C apps/conductor-daemon test`
46+ - `cd /Users/george/code/baa-conductor-renewal-message-projector && pnpm build`
47
48 ### 执行过程中遇到的问题
49
50--
51+- 新 worktree 初始没有安装依赖,第一次跑测试直接失败于 `pnpm exec tsc` 找不到;补跑一次 `pnpm install` 后恢复正常构建和测试流程
52
53 ### 剩余风险
54
55--
56+- 当前 projector 主要依赖 `messages.conversation_id -> active conversation_links` 解析本地对话;如果未来 final-message 再次出现缺失 remote conversation 且无法从其他信号补齐的情况,这类消息会被记录为 skip,不会生成续命任务
57+- 当前冷却期判断复用 `local_conversations.cooldown_until` 字段,但还没有单独的策略配置或自动写入逻辑;如果后续希望把“生成任务后自动推进冷却窗口”做成正式策略,需要在后续任务里补控制面配置