- commit
- 0915a3e
- parent
- 8c3090b
- author
- codex@macbookpro
- date
- 2026-03-30 15:55:46 +0800 CST
feat: add timed jobs framework
5 files changed,
+961,
-8
+324,
-1
1@@ -1,7 +1,7 @@
2 import assert from "node:assert/strict";
3 import { EventEmitter } from "node:events";
4 import { createServer } from "node:http";
5-import { existsSync, mkdirSync, mkdtempSync, readFileSync, rmSync, writeFileSync } from "node:fs";
6+import { existsSync, mkdirSync, mkdtempSync, readFileSync, readdirSync, rmSync, writeFileSync } from "node:fs";
7 import { createConnection } from "node:net";
8 import { homedir, tmpdir } from "node:os";
9 import { join } from "node:path";
10@@ -22,6 +22,7 @@ import {
11 InMemoryBaaLiveInstructionMessageDeduper,
12 BrowserRequestPolicyController,
13 ConductorDaemon,
14+ ConductorTimedJobs,
15 ConductorRuntime,
16 DEFAULT_BAA_EXEC_INSTRUCTION_TIMEOUT_MS,
17 DEFAULT_BAA_INSTRUCTION_TIMEOUT_MS,
18@@ -1487,6 +1488,43 @@ function createManualTimerScheduler() {
19 };
20 }
21
/**
 * Builds a hand-cranked replacement for `setInterval` / `clearInterval`.
 *
 * Nothing fires on its own: registered handlers run only when `fireAll()` is
 * called. The returned functions do not rely on `this`, so they can be passed
 * around detached (e.g. as `setIntervalImpl` / `clearIntervalImpl`).
 *
 * @returns an object exposing `setInterval`, `clearInterval`, `fireAll`
 *   and `getActiveCount`.
 */
function createManualIntervalScheduler() {
  const registry = new Map();
  let lastIssuedId = 0;

  const setInterval = (handler, intervalMs) => {
    lastIssuedId += 1;
    const issuedId = lastIssuedId;
    registry.set(issuedId, { handler, intervalMs });
    return issuedId;
  };

  const clearInterval = (intervalId) => {
    registry.delete(intervalId);
  };

  const fireAll = () => {
    // Iterate over a snapshot so a handler may register or clear intervals
    // without disturbing the current sweep.
    Array.from(registry.values()).forEach((registration) => {
      registration.handler();
    });
  };

  const getActiveCount = () => registry.size;

  return { clearInterval, fireAll, getActiveCount, setInterval };
}
48+
/**
 * Reads every `.jsonl` file in `dirPath` and returns the parsed entries.
 *
 * Files are visited in sorted file-name order and their entries are
 * concatenated, so a log that rolled over into a second file (the timed-jobs
 * writer names files `<date>.jsonl`) is still returned in full. Each line is
 * trimmed on its own; blank or whitespace-only lines are ignored.
 *
 * @param dirPath directory containing the JSONL log files
 * @returns parsed JSON values, one per non-empty line
 * @throws AssertionError when the directory contains no `.jsonl` file
 * @throws SyntaxError when a non-empty line is not valid JSON
 */
function readJsonlEntries(dirPath) {
  const fileNames = readdirSync(dirPath)
    .filter((name) => name.endsWith(".jsonl"))
    .sort();
  assert.ok(fileNames.length > 0, `expected at least one jsonl file in ${dirPath}`);

  return fileNames.flatMap((fileName) =>
    readFileSync(join(dirPath, fileName), "utf8")
      .split("\n")
      .map((line) => line.trim())
      .filter(Boolean)
      .map((line) => JSON.parse(line))
  );
}
58+
59 async function flushAsyncWork() {
60 await Promise.resolve();
61 await Promise.resolve();
62@@ -2695,6 +2733,243 @@ test("repeated renew failures degrade the daemon after the configured threshold"
63 assert.equal(daemon.getStatusSnapshot().consecutiveRenewFailures, 2);
64 });
65
// Precedence check for the timed-jobs settings: every value is present in the
// environment, and only two of them are also passed as CLI flags. The flags
// must win where given; the environment must supply the rest.
test("parseConductorCliRequest reads timed-jobs defaults from env and allows CLI overrides", () => {
  const argv = [
    "start",
    "--run-once",
    "--timed-jobs-interval-ms",
    "6000",
    "--timed-jobs-max-tasks-per-tick",
    "9"
  ];
  const env = {
    BAA_NODE_ID: "mini-main",
    BAA_CONDUCTOR_HOST: "mini",
    BAA_CONDUCTOR_ROLE: "primary",
    BAA_CONDUCTOR_PUBLIC_API_BASE: "https://public.example.test/",
    BAA_SHARED_TOKEN: "replace-me",
    BAA_TIMED_JOBS_INTERVAL_MS: "10000",
    BAA_TIMED_JOBS_MAX_MESSAGES_PER_TICK: "11",
    BAA_TIMED_JOBS_MAX_TASKS_PER_TICK: "12",
    BAA_TIMED_JOBS_SETTLE_DELAY_MS: "15000"
  };

  const request = parseConductorCliRequest(argv, env);

  assert.equal(request.action, "start");

  // Explicit guard in addition to the assertion above, so the `config`
  // accesses below only run for a start request.
  if (request.action !== "start") {
    throw new Error("expected start action");
  }

  const { config } = request;

  // Overridden on the command line.
  assert.equal(config.timedJobsIntervalMs, 6000);
  // Taken from the environment.
  assert.equal(config.timedJobsMaxMessagesPerTick, 11);
  // Overridden on the command line.
  assert.equal(config.timedJobsMaxTasksPerTick, 9);
  // Taken from the environment.
  assert.equal(config.timedJobsSettleDelayMs, 15000);
});
100+
// Leader path: the stubbed lease client reports this node as leader (term 4),
// so a manual tick is expected to invoke the registered runner once and leave
// runner_started / scan_window / runner_completed entries in the JSONL log.
test("ConductorTimedJobs runs registered runners on leader ticks and writes JSONL logs", async () => {
  const logsDir = mkdtempSync(join(tmpdir(), "baa-timed-jobs-leader-"));
  const daemon = new ConductorDaemon(
    {
      nodeId: "mini-main",
      host: "mini",
      role: "primary",
      controlApiBase: "https://control.example.test"
    },
    {
      autoStartLoops: false,
      client: {
        // Always grants the lease to this node.
        async acquireLeaderLease() {
          return createLeaseResult({
            holderId: "mini-main",
            term: 4,
            leaseExpiresAt: 130,
            renewedAt: 100,
            isLeader: true,
            operation: "acquire"
          });
        },
        async sendControllerHeartbeat() {}
      },
      // Fixed clock value handed to the daemon.
      now: () => 100
    }
  );
  // What the runner saw in its tick context, captured for the assertions below.
  const observedRuns = [];
  const timedJobs = new ConductorTimedJobs(
    {
      intervalMs: 5_000,
      maxMessagesPerTick: 10,
      maxTasksPerTick: 8,
      settleDelayMs: 12_000
    },
    {
      // No interval loop: the test drives ticks by hand via runTick().
      autoStart: false,
      logDir: logsDir,
      // Delegates the leader gate to the daemon.
      schedule: (work) => daemon.runSchedulerPass(work)
    }
  );

  timedJobs.registerRunner({
    name: "renewal.projector",
    async run(context) {
      observedRuns.push({
        batchId: context.batchId,
        maxMessagesPerTick: context.maxMessagesPerTick,
        settleDelayMs: context.settleDelayMs,
        term: context.term
      });
      // Runner-level log line; `details` keys are expected to show up as
      // top-level fields of the entry (see the `entry.cursor` check below).
      context.log({
        stage: "scan_window",
        result: "ok",
        details: {
          cursor: "message:1"
        }
      });
      return {
        result: "ok",
        details: {
          projected_messages: 0
        }
      };
    }
  });

  try {
    await daemon.start();
    await timedJobs.start();

    const tick = await timedJobs.runTick("manual");

    assert.equal(tick.decision, "scheduled");
    assert.equal(observedRuns.length, 1);
    assert.equal(observedRuns[0].maxMessagesPerTick, 10);
    assert.equal(observedRuns[0].settleDelayMs, 12_000);
    assert.equal(observedRuns[0].term, 4);

    // Stop before reading the log so the file is complete when it is parsed.
    await timedJobs.stop();

    const entries = readJsonlEntries(logsDir);
    assert.ok(
      entries.find(
        (entry) =>
          entry.runner === "renewal.projector"
          && entry.stage === "runner_started"
          && entry.result === "running"
      )
    );
    assert.ok(
      entries.find(
        (entry) =>
          entry.runner === "renewal.projector"
          && entry.stage === "scan_window"
          && entry.cursor === "message:1"
      )
    );

    const completedEntry = entries.find(
      (entry) => entry.runner === "renewal.projector" && entry.stage === "runner_completed"
    );
    assert.ok(completedEntry);
    assert.equal(completedEntry.result, "ok");
    assert.equal(typeof completedEntry.batch_id, "string");
    assert.equal(typeof completedEntry.duration_ms, "number");
    assert.equal(completedEntry.error, null);
  } finally {
    // Second stop() covers the case where an assertion threw before the first one.
    await timedJobs.stop();
    rmSync(logsDir, {
      force: true,
      recursive: true
    });
  }
});
216+
// Standby path: the stubbed lease client reports another node ("mini-main") as
// holder, so ticks must be skipped, the runner must never run, and stop() must
// release the interval handle created by start().
test("ConductorTimedJobs keeps standby runners idle and clears interval handles on stop", async () => {
  const logsDir = mkdtempSync(join(tmpdir(), "baa-timed-jobs-standby-"));
  // Manual scheduler: lets the test count live interval handles without real timers.
  const intervalScheduler = createManualIntervalScheduler();
  const daemon = new ConductorDaemon(
    {
      nodeId: "mac-standby",
      host: "mac",
      role: "standby",
      controlApiBase: "https://control.example.test"
    },
    {
      autoStartLoops: false,
      client: {
        // Lease is held by a different node, so this daemon is not leader.
        async acquireLeaderLease() {
          return createLeaseResult({
            holderId: "mini-main",
            term: 7,
            leaseExpiresAt: 230,
            renewedAt: 200,
            isLeader: false,
            operation: "acquire"
          });
        },
        async sendControllerHeartbeat() {}
      },
      // Fixed clock value handed to the daemon.
      now: () => 200
    }
  );
  let runCount = 0;
  const timedJobs = new ConductorTimedJobs(
    {
      intervalMs: 5_000,
      maxMessagesPerTick: 10,
      maxTasksPerTick: 10,
      settleDelayMs: 10_000
    },
    {
      // Loop enabled so start() registers an interval with the manual scheduler.
      autoStart: true,
      clearIntervalImpl: intervalScheduler.clearInterval,
      logDir: logsDir,
      schedule: (work) => daemon.runSchedulerPass(work),
      setIntervalImpl: intervalScheduler.setInterval
    }
  );

  timedJobs.registerRunner({
    name: "renewal.dispatcher",
    async run() {
      runCount += 1;
      return {
        result: "ok"
      };
    }
  });

  try {
    await daemon.start();
    await timedJobs.start();

    assert.equal(intervalScheduler.getActiveCount(), 1);

    const tick = await timedJobs.runTick("manual");
    assert.equal(tick.decision, "skipped_not_leader");
    assert.equal(runCount, 0);

    await timedJobs.stop();
    assert.equal(intervalScheduler.getActiveCount(), 0);

    // Even though the runner never ran, a skip entry is expected in the log.
    const entries = readJsonlEntries(logsDir);
    assert.ok(
      entries.find(
        (entry) =>
          entry.runner === "renewal.dispatcher"
          && entry.stage === "runner_skipped"
          && entry.result === "skipped_not_leader"
      )
    );
  } finally {
    // Second stop() covers the case where an assertion threw before the first one.
    await timedJobs.stop();
    rmSync(logsDir, {
      force: true,
      recursive: true
    });
  }
});
302+
303 test("createFetchControlApiClient unwraps control-api envelopes and sends bearer auth", async () => {
304 const observedRequests = [];
305 const client = createFetchControlApiClient(
306@@ -4967,6 +5242,54 @@ test("ConductorRuntime exposes a minimal runtime snapshot for CLI and status sur
307 });
308 });
309
// Wiring check: starting a ConductorRuntime must create `<logsDir>/timed-jobs`
// and write the framework's "started" entry there.
test("ConductorRuntime initializes timed-jobs logging during startup", async () => {
  const stateDir = mkdtempSync(join(tmpdir(), "baa-conductor-runtime-timed-jobs-state-"));
  const logsDir = mkdtempSync(join(tmpdir(), "baa-conductor-runtime-timed-jobs-logs-"));
  const runtime = new ConductorRuntime(
    {
      nodeId: "mini-main",
      host: "mini",
      role: "primary",
      controlApiBase: "https://control.example.test",
      // Port 0 — presumably lets the OS pick a free port; confirm against the local API server.
      localApiBase: "http://127.0.0.1:0",
      sharedToken: "replace-me",
      paths: {
        logsDir,
        runsDir: "/tmp/runs",
        stateDir
      }
    },
    {
      autoStartLoops: false,
      now: () => 100
    }
  );

  try {
    await runtime.start();

    const timedJobsLogDir = join(logsDir, "timed-jobs");
    assert.equal(existsSync(timedJobsLogDir), true);

    const entries = readJsonlEntries(timedJobsLogDir);
    assert.ok(
      entries.find(
        (entry) => entry.runner === "timed-jobs.framework" && entry.stage === "started"
      )
    );
  } finally {
    await runtime.stop();
    // Remove both temp directories regardless of the test outcome.
    rmSync(logsDir, {
      force: true,
      recursive: true
    });
    rmSync(stateDir, {
      force: true,
      recursive: true
    });
  }
});
357+
358 function createStubRuntimeDaemon(overrides = {}) {
359 return {
360 start: async () => createStubRuntimeDaemonSnapshot(),
+120,
-1
1@@ -49,6 +49,7 @@ import {
2 ConductorLocalControlPlane,
3 resolveDefaultConductorStateDir
4 } from "./local-control-plane.js";
5+import { ConductorTimedJobs } from "./timed-jobs/index.js";
6
7 export type { ConductorHttpRequest, ConductorHttpResponse } from "./http-types.js";
8 export {
9@@ -71,6 +72,7 @@ export {
10 export { handleConductorHttpRequest } from "./local-api.js";
11 export * from "./artifacts/index.js";
12 export * from "./instructions/index.js";
13+export * from "./timed-jobs/index.js";
14
15 export type ConductorRole = "primary" | "standby";
16 export type ConductorLeadershipRole = "leader" | "standby";
17@@ -85,6 +87,10 @@ const DEFAULT_LEASE_RENEW_INTERVAL_MS = 5_000;
18 const DEFAULT_LEASE_TTL_SEC = 30;
19 const DEFAULT_RENEW_FAILURE_THRESHOLD = 2;
20 const DEFAULT_CODE_ROOT_DIR = "/Users/george/code/";
21+const DEFAULT_TIMED_JOBS_INTERVAL_MS = 10_000;
22+const DEFAULT_TIMED_JOBS_MAX_MESSAGES_PER_TICK = 10;
23+const DEFAULT_TIMED_JOBS_MAX_TASKS_PER_TICK = 10;
24+const DEFAULT_TIMED_JOBS_SETTLE_DELAY_MS = 10_000;
25
26 const STARTUP_CHECKLIST: StartupChecklistItem[] = [
27 { key: "register-controller", description: "注册 controller 并写入初始 heartbeat" },
28@@ -171,6 +177,10 @@ export interface ConductorRuntimeConfig extends ConductorConfig {
29 localApiBase?: string | null;
30 paths?: Partial<ConductorRuntimePaths>;
31 sharedToken?: string | null;
32+ timedJobsIntervalMs?: number;
33+ timedJobsMaxMessagesPerTick?: number;
34+ timedJobsMaxTasksPerTick?: number;
35+ timedJobsSettleDelayMs?: number;
36 }
37
38 export interface ResolvedConductorRuntimeConfig
39@@ -192,6 +202,10 @@ export interface ResolvedConductorRuntimeConfig
40 publicApiBase: string;
41 renewFailureThreshold: number;
42 sharedToken: string | null;
43+ timedJobsIntervalMs: number;
44+ timedJobsMaxMessagesPerTick: number;
45+ timedJobsMaxTasksPerTick: number;
46+ timedJobsSettleDelayMs: number;
47 version: string | null;
48 }
49
50@@ -389,6 +403,10 @@ interface CliValueOverrides {
51 sharedToken?: string;
52 stateDir?: string;
53 tmpDir?: string;
54+ timedJobsIntervalMs?: string;
55+ timedJobsMaxMessagesPerTick?: string;
56+ timedJobsMaxTasksPerTick?: string;
57+ timedJobsSettleDelayMs?: string;
58 version?: string;
59 worktreesDir?: string;
60 }
61@@ -1623,6 +1641,10 @@ function resolvePluginDiagnosticLogDir(logsDir: string | null): string | null {
62 return resolveLogSubdir(logsDir, "baa-plugin", "baa-plugin-log");
63 }
64
/**
 * Resolves the log directory used by the timed-jobs framework.
 *
 * Delegates to `resolveLogSubdir` with the `"timed-jobs"` sub-directory name
 * and the `"timed-jobs-log"` tag.
 *
 * @param logsDir root logs directory, or `null` when none is configured
 * @returns whatever `resolveLogSubdir` yields for that sub-directory
 */
function resolveTimedJobsLogDir(logsDir: string | null): string | null {
  const timedJobsLogDir = resolveLogSubdir(logsDir, "timed-jobs", "timed-jobs-log");

  return timedJobsLogDir;
}
68+
69 function resolveLogSubdir(
70 logsDir: string | null,
71 subdir: string,
72@@ -1694,6 +1716,13 @@ export function resolveConductorRuntimeConfig(
73 const renewFailureThreshold = config.renewFailureThreshold ?? DEFAULT_RENEW_FAILURE_THRESHOLD;
74 const artifactInlineThreshold = config.artifactInlineThreshold ?? DEFAULT_BAA_DELIVERY_INLINE_THRESHOLD;
75 const artifactSummaryLength = config.artifactSummaryLength ?? DEFAULT_SUMMARY_LENGTH;
76+ const timedJobsIntervalMs = config.timedJobsIntervalMs ?? DEFAULT_TIMED_JOBS_INTERVAL_MS;
77+ const timedJobsMaxMessagesPerTick =
78+ config.timedJobsMaxMessagesPerTick ?? DEFAULT_TIMED_JOBS_MAX_MESSAGES_PER_TICK;
79+ const timedJobsMaxTasksPerTick =
80+ config.timedJobsMaxTasksPerTick ?? DEFAULT_TIMED_JOBS_MAX_TASKS_PER_TICK;
81+ const timedJobsSettleDelayMs =
82+ config.timedJobsSettleDelayMs ?? DEFAULT_TIMED_JOBS_SETTLE_DELAY_MS;
83 const priority = config.priority ?? (config.role === "primary" ? 100 : 50);
84
85 if (heartbeatIntervalMs <= 0) {
86@@ -1720,6 +1749,22 @@ export function resolveConductorRuntimeConfig(
87 throw new Error("Conductor artifactSummaryLength must be a positive integer.");
88 }
89
90+ if (!Number.isInteger(timedJobsIntervalMs) || timedJobsIntervalMs <= 0) {
91+ throw new Error("Conductor timedJobsIntervalMs must be a positive integer.");
92+ }
93+
94+ if (!Number.isInteger(timedJobsMaxMessagesPerTick) || timedJobsMaxMessagesPerTick <= 0) {
95+ throw new Error("Conductor timedJobsMaxMessagesPerTick must be a positive integer.");
96+ }
97+
98+ if (!Number.isInteger(timedJobsMaxTasksPerTick) || timedJobsMaxTasksPerTick <= 0) {
99+ throw new Error("Conductor timedJobsMaxTasksPerTick must be a positive integer.");
100+ }
101+
102+ if (!Number.isInteger(timedJobsSettleDelayMs) || timedJobsSettleDelayMs < 0) {
103+ throw new Error("Conductor timedJobsSettleDelayMs must be a non-negative integer.");
104+ }
105+
106 return {
107 ...config,
108 artifactInlineThreshold,
109@@ -1742,6 +1787,10 @@ export function resolveConductorRuntimeConfig(
110 publicApiBase: normalizeBaseUrl(publicApiBase),
111 renewFailureThreshold,
112 sharedToken: normalizeOptionalString(config.sharedToken),
113+ timedJobsIntervalMs,
114+ timedJobsMaxMessagesPerTick,
115+ timedJobsMaxTasksPerTick,
116+ timedJobsSettleDelayMs,
117 version: normalizeOptionalString(config.version)
118 };
119 }
120@@ -1824,6 +1873,26 @@ function resolveRuntimeConfigFromSources(
121 overrides.renewFailureThreshold ?? env.BAA_CONDUCTOR_RENEW_FAILURE_THRESHOLD,
122 { minimum: 1 }
123 ),
124+ timedJobsIntervalMs: parseIntegerValue(
125+ "Conductor timed jobs interval",
126+ overrides.timedJobsIntervalMs ?? env.BAA_TIMED_JOBS_INTERVAL_MS,
127+ { minimum: 1 }
128+ ),
129+ timedJobsMaxMessagesPerTick: parseIntegerValue(
130+ "Conductor timed jobs max messages per tick",
131+ overrides.timedJobsMaxMessagesPerTick ?? env.BAA_TIMED_JOBS_MAX_MESSAGES_PER_TICK,
132+ { minimum: 1 }
133+ ),
134+ timedJobsMaxTasksPerTick: parseIntegerValue(
135+ "Conductor timed jobs max tasks per tick",
136+ overrides.timedJobsMaxTasksPerTick ?? env.BAA_TIMED_JOBS_MAX_TASKS_PER_TICK,
137+ { minimum: 1 }
138+ ),
139+ timedJobsSettleDelayMs: parseIntegerValue(
140+ "Conductor timed jobs settle delay",
141+ overrides.timedJobsSettleDelayMs ?? env.BAA_TIMED_JOBS_SETTLE_DELAY_MS,
142+ { minimum: 0 }
143+ ),
144 claudeCodedLocalApiBase: normalizeOptionalString(
145 overrides.claudeCodedLocalApiBase ?? env.BAA_CLAUDE_CODED_LOCAL_API_BASE
146 ),
147@@ -1954,6 +2023,22 @@ export function parseConductorCliRequest(
148 overrides.renewFailureThreshold = readOptionValue(tokens, token, index);
149 index += 1;
150 break;
151+ case "--timed-jobs-interval-ms":
152+ overrides.timedJobsIntervalMs = readOptionValue(tokens, token, index);
153+ index += 1;
154+ break;
155+ case "--timed-jobs-max-messages-per-tick":
156+ overrides.timedJobsMaxMessagesPerTick = readOptionValue(tokens, token, index);
157+ index += 1;
158+ break;
159+ case "--timed-jobs-max-tasks-per-tick":
160+ overrides.timedJobsMaxTasksPerTick = readOptionValue(tokens, token, index);
161+ index += 1;
162+ break;
163+ case "--timed-jobs-settle-delay-ms":
164+ overrides.timedJobsSettleDelayMs = readOptionValue(tokens, token, index);
165+ index += 1;
166+ break;
167 case "--runs-dir":
168 overrides.runsDir = readOptionValue(tokens, token, index);
169 index += 1;
170@@ -2075,6 +2160,10 @@ function formatConfigText(config: ResolvedConductorRuntimeConfig): string {
171 `lease_renew_interval_ms: ${config.leaseRenewIntervalMs}`,
172 `lease_ttl_sec: ${config.leaseTtlSec}`,
173 `renew_failure_threshold: ${config.renewFailureThreshold}`,
174+ `timed_jobs_interval_ms: ${config.timedJobsIntervalMs}`,
175+ `timed_jobs_max_messages_per_tick: ${config.timedJobsMaxMessagesPerTick}`,
176+ `timed_jobs_max_tasks_per_tick: ${config.timedJobsMaxTasksPerTick}`,
177+ `timed_jobs_settle_delay_ms: ${config.timedJobsSettleDelayMs}`,
178 `shared_token_configured: ${String(config.sharedToken != null)}`,
179 `runs_dir: ${config.paths.runsDir ?? "not-configured"}`,
180 `worktrees_dir: ${config.paths.worktreesDir ?? "not-configured"}`,
181@@ -2123,6 +2212,10 @@ function getUsageText(): string {
182 " --lease-renew-interval-ms <integer>",
183 " --lease-ttl-sec <integer>",
184 " --renew-failure-threshold <integer>",
185+ " --timed-jobs-interval-ms <integer>",
186+ " --timed-jobs-max-messages-per-tick <integer>",
187+ " --timed-jobs-max-tasks-per-tick <integer>",
188+ " --timed-jobs-settle-delay-ms <integer>",
189 " --runs-dir <path>",
190 " --worktrees-dir <path>",
191 " --logs-dir <path>",
192@@ -2152,6 +2245,10 @@ function getUsageText(): string {
193 " BAA_CONDUCTOR_LEASE_RENEW_INTERVAL_MS",
194 " BAA_CONDUCTOR_LEASE_TTL_SEC",
195 " BAA_CONDUCTOR_RENEW_FAILURE_THRESHOLD",
196+ " BAA_TIMED_JOBS_INTERVAL_MS",
197+ " BAA_TIMED_JOBS_MAX_MESSAGES_PER_TICK",
198+ " BAA_TIMED_JOBS_MAX_TASKS_PER_TICK",
199+ " BAA_TIMED_JOBS_SETTLE_DELAY_MS",
200 " BAA_RUNS_DIR",
201 " BAA_WORKTREES_DIR",
202 " BAA_LOGS_DIR",
203@@ -2173,6 +2270,7 @@ export class ConductorRuntime {
204 private readonly localApiServer: ConductorLocalHttpServer | null;
205 private localControlPlaneInitialized = false;
206 private readonly now: () => number;
207+ private readonly timedJobs: ConductorTimedJobs;
208 private started = false;
209
210 constructor(config: ConductorRuntimeConfig, options: ConductorRuntimeOptions = {}) {
211@@ -2202,6 +2300,23 @@ export class ConductorRuntime {
212 });
213 const ingestLogDir = resolveIngestLogDir(this.config.paths.logsDir);
214 const pluginDiagnosticLogDir = resolvePluginDiagnosticLogDir(this.config.paths.logsDir);
215+ const timedJobsLogDir = resolveTimedJobsLogDir(this.config.paths.logsDir);
216+ this.timedJobs = new ConductorTimedJobs(
217+ {
218+ intervalMs: this.config.timedJobsIntervalMs,
219+ maxMessagesPerTick: this.config.timedJobsMaxMessagesPerTick,
220+ maxTasksPerTick: this.config.timedJobsMaxTasksPerTick,
221+ settleDelayMs: this.config.timedJobsSettleDelayMs
222+ },
223+ {
224+ artifactStore: this.artifactStore,
225+ autoStart: options.autoStartLoops ?? true,
226+ clearIntervalImpl: options.clearIntervalImpl,
227+ logDir: timedJobsLogDir,
228+ schedule: (work) => this.daemon.runSchedulerPass(work),
229+ setIntervalImpl: options.setIntervalImpl
230+ }
231+ );
232 this.localApiServer =
233 this.config.localApiBase == null
234 ? null
235@@ -2245,12 +2360,15 @@ export class ConductorRuntime {
236 }
237
238 await this.daemon.start();
239- this.started = true;
240
241 try {
242 await this.localApiServer?.start();
243+ await this.timedJobs.start();
244+ this.started = true;
245 } catch (error) {
246 this.started = false;
247+ await this.timedJobs.stop();
248+ await this.localApiServer?.stop();
249 await this.daemon.stop();
250 throw error;
251 }
252@@ -2264,6 +2382,7 @@ export class ConductorRuntime {
253 async stop(): Promise<ConductorRuntimeSnapshot> {
254 this.started = false;
255 this.d1SyncWorker?.stop();
256+ await this.timedJobs.stop();
257 await this.daemon.stop();
258 await this.localApiServer?.stop();
259
1@@ -0,0 +1,15 @@
2+export {
3+ ConductorTimedJobs,
4+ type ConductorTimedJobsOptions,
5+ type TimedJobLogInput,
6+ type TimedJobRunner,
7+ type TimedJobRunnerResult,
8+ type TimedJobScheduleContext,
9+ type TimedJobTickContext,
10+ type TimedJobsConfig,
11+ type TimedJobsIntervalHandle,
12+ type TimedJobsSchedule,
13+ type TimedJobsTickDecision,
14+ type TimedJobsTickResult,
15+ type TimedJobsTickTrigger
16+} from "./runtime.js";
1@@ -0,0 +1,483 @@
2+import { appendFileSync } from "node:fs";
3+import { join } from "node:path";
4+
5+import type { ArtifactStore } from "../../../../packages/artifact-db/dist/index.js";
6+
/** Handle type produced by `setInterval` and accepted by `clearInterval`. */
export type TimedJobsIntervalHandle = ReturnType<typeof globalThis.setInterval>;

/** What caused a tick: the interval loop, or an explicit `runTick()` call. */
export type TimedJobsTickTrigger = "interval" | "manual";

/**
 * Outcome of one tick.
 *
 * - `scheduled` / `skipped_not_leader`: value returned by the schedule callback.
 * - `skipped_busy`: another tick was still in flight.
 * - `skipped_no_runners`: no runner is registered.
 * - `failed`: the schedule callback itself threw.
 */
export type TimedJobsTickDecision =
  | "failed"
  | "scheduled"
  | "skipped_busy"
  | "skipped_no_runners"
  | "skipped_not_leader";

/** Tuning values for the timed-jobs loop. */
export interface TimedJobsConfig {
  /** Delay passed to `setInterval` for the automatic loop. */
  intervalMs: number;
  /** Forwarded to runners through the tick context. */
  maxMessagesPerTick: number;
  /** Forwarded to runners through the tick context. */
  maxTasksPerTick: number;
  /** Forwarded to runners through the tick context. */
  settleDelayMs: number;
}

/** Identity values the schedule callback hands to the scheduled work. */
export interface TimedJobScheduleContext {
  controllerId: string;
  host: string;
  term: number;
}

/** Input for one log line written on behalf of a runner or the framework. */
export interface TimedJobLogInput {
  /** Extra fields; each key is written as a top-level field of the log entry. */
  details?: Record<string, unknown>;
  /** Written as `duration_ms`; `null` when omitted. */
  durationMs?: number | null;
  /** Converted to a string message (or `null`) before being written. */
  error?: unknown;
  /** Written as `result`; `null` when omitted. */
  result?: string | null;
  stage: string;
}

/** Optional value a runner may resolve with; it feeds the `runner_completed` log entry. */
export interface TimedJobRunnerResult {
  details?: Record<string, unknown>;
  /** Logged as the entry's `result`; `"ok"` is used when omitted. */
  result?: string;
}

/** Everything a runner receives for one tick. */
export interface TimedJobTickContext extends TimedJobScheduleContext {
  artifactStore: ArtifactStore | null;
  /** Identifier shared by every log entry of this tick. */
  batchId: string;
  /** Copy of the framework configuration. */
  config: TimedJobsConfig;
  /** Writes a log entry attributed to this runner and this batch. */
  log: (input: TimedJobLogInput) => void;
  maxMessagesPerTick: number;
  maxTasksPerTick: number;
  settleDelayMs: number;
  trigger: TimedJobsTickTrigger;
}

/** A unit of periodic work registered with `ConductorTimedJobs`. */
export interface TimedJobRunner {
  description?: string;
  /** Registration key; trimmed on registration and required to be unique. */
  name: string;
  /** A rejection is logged as `runner_failed`; the remaining runners of the tick still run. */
  run: (context: TimedJobTickContext) => Promise<TimedJobRunnerResult | void>;
}

/** Summary returned by `runTick()`. */
export interface TimedJobsTickResult {
  batchId: string;
  decision: TimedJobsTickDecision;
  /** Number of runners registered when the tick was evaluated. */
  runnerCount: number;
}

/**
 * Gate that decides whether the given work runs.
 * Resolves to `"scheduled"` or `"skipped_not_leader"`.
 */
export type TimedJobsSchedule = (
  work: (context: TimedJobScheduleContext) => Promise<void>
) => Promise<"scheduled" | "skipped_not_leader">;

/** Construction options for `ConductorTimedJobs`. */
export interface ConductorTimedJobsOptions {
  /** Passed to runners as-is; `null` when omitted. */
  artifactStore?: ArtifactStore | null;
  /** When `false`, `start()` does not create the interval loop. Defaults to `true`. */
  autoStart?: boolean;
  /** Replacement for `globalThis.clearInterval` (used by tests). */
  clearIntervalImpl?: (handle: TimedJobsIntervalHandle) => void;
  /** Directory receiving `<date>.jsonl` files; logging is disabled when `null` or omitted. */
  logDir?: string | null;
  schedule: TimedJobsSchedule;
  /** Replacement for `globalThis.setInterval` (used by tests). */
  setIntervalImpl?: (
    handler: () => void,
    intervalMs: number
  ) => TimedJobsIntervalHandle;
}

/** Shape of one line in the JSONL log; `details` keys appear as extra fields. */
interface TimedJobsLogEntry {
  batch_id: string;
  duration_ms: number | null;
  error: string | null;
  result: string | null;
  runner: string;
  stage: string;
  /** ISO-8601 timestamp taken when the entry is built. */
  ts: string;
  [key: string]: unknown;
}

/** `runner` value used for log entries emitted by the framework itself. */
const FRAMEWORK_RUNNER_NAME = "timed-jobs.framework";
93+
/**
 * Periodic job framework gated by an injected schedule callback.
 *
 * Runners are registered by name and executed one after another, in
 * registration order, each time a tick is granted by `schedule`. At most one
 * tick runs at a time; a tick requested while another is in flight is reported
 * as `skipped_busy`. Every step is recorded as one JSON line in
 * `<logDir>/<date>.jsonl` when a log directory is configured.
 */
export class ConductorTimedJobs {
  private readonly artifactStore: ArtifactStore | null;
  private readonly autoStart: boolean;
  private readonly clearIntervalImpl: (handle: TimedJobsIntervalHandle) => void;
  private readonly config: TimedJobsConfig;
  private inFlightTick: Promise<TimedJobsTickResult> | null = null;
  private intervalHandle: TimedJobsIntervalHandle | null = null;
  private readonly logDir: string | null;
  private readonly runners = new Map<string, TimedJobRunner>();
  private readonly schedule: TimedJobsSchedule;
  private readonly setIntervalImpl: (
    handler: () => void,
    intervalMs: number
  ) => TimedJobsIntervalHandle;
  private started = false;
  private batchSequence = 0;

  /**
   * @param config tuning values; copied field by field, so later changes to
   *   the caller's object have no effect
   * @param options collaborators and test seams; timer functions default to
   *   the global `setInterval` / `clearInterval`
   */
  constructor(config: TimedJobsConfig, options: ConductorTimedJobsOptions) {
    this.artifactStore = options.artifactStore ?? null;
    this.autoStart = options.autoStart ?? true;
    this.clearIntervalImpl =
      options.clearIntervalImpl ?? ((handle) => globalThis.clearInterval(handle));
    this.config = {
      intervalMs: config.intervalMs,
      maxMessagesPerTick: config.maxMessagesPerTick,
      maxTasksPerTick: config.maxTasksPerTick,
      settleDelayMs: config.settleDelayMs
    };
    this.logDir = options.logDir ?? null;
    this.schedule = options.schedule;
    this.setIntervalImpl =
      options.setIntervalImpl ?? ((handler, intervalMs) => globalThis.setInterval(handler, intervalMs));
  }

  /**
   * Registers a runner under its trimmed name.
   *
   * @throws Error when the trimmed name is empty or already registered
   */
  registerRunner(runner: TimedJobRunner): void {
    const name = normalizeRunnerName(runner.name);

    if (!name) {
      throw new Error("Timed job runner name must be non-empty.");
    }

    if (this.runners.has(name)) {
      throw new Error(`Timed job runner "${name}" is already registered.`);
    }

    this.runners.set(name, {
      ...runner,
      name
    });
  }

  /** Returns a copy of the configuration. */
  getConfig(): TimedJobsConfig {
    return { ...this.config };
  }

  /** Returns the registered runner names in registration order. */
  getRegisteredRunnerNames(): string[] {
    return [...this.runners.keys()];
  }

  /** Whether `start()` has been called without a subsequent `stop()`. */
  isStarted(): boolean {
    return this.started;
  }

  /**
   * Marks the framework as started, logs a `started` entry and, unless
   * `autoStart` is `false`, creates the interval loop. Calling it again while
   * started is a no-op.
   */
  async start(): Promise<void> {
    if (this.started) {
      return;
    }

    this.started = true;
    this.writeFrameworkLog({
      stage: "started",
      result: this.autoStart ? "loop_enabled" : "loop_disabled"
    });

    if (!this.autoStart) {
      return;
    }

    this.intervalHandle = this.setIntervalImpl(() => {
      // Fire-and-forget: runTick reports failures through its result and the log.
      void this.runTick("interval");
    }, this.config.intervalMs);
  }

  /**
   * Clears the interval loop, waits for an in-flight tick to finish and logs a
   * `stopped` entry. A no-op when there is nothing to stop, which makes
   * repeated calls safe.
   */
  async stop(): Promise<void> {
    if (!this.started && this.intervalHandle == null && this.inFlightTick == null) {
      return;
    }

    if (this.intervalHandle != null) {
      this.clearIntervalImpl(this.intervalHandle);
      this.intervalHandle = null;
    }

    const pendingTick = this.inFlightTick;
    this.started = false;

    if (pendingTick != null) {
      try {
        await pendingTick;
      } catch {
        // best-effort drain; runner failures are already logged
      }
    }

    this.writeFrameworkLog({
      stage: "stopped",
      result: "stopped"
    });
  }

  /**
   * Runs one tick unless another one is still in flight.
   *
   * @param trigger recorded in the log entries of this tick
   * @returns the tick summary; `decision` is `skipped_busy` when a tick was
   *   already running
   */
  async runTick(trigger: TimedJobsTickTrigger = "manual"): Promise<TimedJobsTickResult> {
    if (this.inFlightTick != null) {
      const batchId = this.buildBatchId();
      const runnerCount = this.runners.size;

      this.writeFrameworkLog({
        batchId,
        details: {
          trigger
        },
        result: "skipped_busy",
        stage: "tick_skipped_busy"
      });

      return {
        batchId,
        decision: "skipped_busy",
        runnerCount
      };
    }

    const tickPromise = this.executeTick(trigger);
    this.inFlightTick = tickPromise;

    try {
      return await tickPromise;
    } finally {
      // Only release the slot if it still belongs to this tick.
      if (this.inFlightTick === tickPromise) {
        this.inFlightTick = null;
      }
    }
  }

  /**
   * Performs one tick: snapshots the registered runners, asks `schedule` to
   * run them and logs the outcome. Never rejects; a throwing `schedule` is
   * reported as decision `failed`.
   */
  private async executeTick(trigger: TimedJobsTickTrigger): Promise<TimedJobsTickResult> {
    const batchId = this.buildBatchId();
    const runners = [...this.runners.values()];

    if (runners.length === 0) {
      this.writeFrameworkLog({
        batchId,
        details: {
          trigger
        },
        result: "skipped_no_runners",
        stage: "tick_skipped_no_runners"
      });

      return {
        batchId,
        decision: "skipped_no_runners",
        runnerCount: 0
      };
    }

    const tickStartedAt = Date.now();

    this.writeFrameworkLog({
      batchId,
      details: {
        runner_count: runners.length,
        trigger
      },
      result: "running",
      stage: "tick_started"
    });

    let runnerFailures = 0;

    try {
      const decision = await this.schedule(async (scheduleContext) => {
        // Sequential on purpose: one runner finishes before the next starts.
        for (const runner of runners) {
          const ok = await this.runRunner(runner, batchId, trigger, scheduleContext);

          if (!ok) {
            runnerFailures += 1;
          }
        }
      });

      if (decision === "skipped_not_leader") {
        // Leave one skip entry per runner so each runner's log history is complete.
        for (const runner of runners) {
          this.writeRunnerLog(runner.name, {
            batchId,
            details: {
              trigger
            },
            durationMs: 0,
            result: "skipped_not_leader",
            stage: "runner_skipped"
          });
        }

        this.writeFrameworkLog({
          batchId,
          details: {
            runner_count: runners.length,
            trigger
          },
          durationMs: Date.now() - tickStartedAt,
          result: "skipped_not_leader",
          stage: "tick_completed"
        });

        return {
          batchId,
          decision,
          runnerCount: runners.length
        };
      }

      this.writeFrameworkLog({
        batchId,
        details: {
          failed_runner_count: runnerFailures,
          runner_count: runners.length,
          successful_runner_count: runners.length - runnerFailures,
          trigger
        },
        durationMs: Date.now() - tickStartedAt,
        result: runnerFailures > 0 ? "completed_with_failures" : "scheduled",
        stage: "tick_completed"
      });

      return {
        batchId,
        decision,
        runnerCount: runners.length
      };
    } catch (error) {
      this.writeFrameworkLog({
        batchId,
        details: {
          runner_count: runners.length,
          trigger
        },
        durationMs: Date.now() - tickStartedAt,
        error,
        result: "failed",
        stage: "tick_failed"
      });

      return {
        batchId,
        decision: "failed",
        runnerCount: runners.length
      };
    }
  }

  /**
   * Runs a single runner and logs its start and its outcome.
   *
   * @returns `true` when the runner resolved, `false` when it rejected
   */
  private async runRunner(
    runner: TimedJobRunner,
    batchId: string,
    trigger: TimedJobsTickTrigger,
    scheduleContext: TimedJobScheduleContext
  ): Promise<boolean> {
    const startedAt = Date.now();

    this.writeRunnerLog(runner.name, {
      batchId,
      details: {
        controller_id: scheduleContext.controllerId,
        host: scheduleContext.host,
        max_messages_per_tick: this.config.maxMessagesPerTick,
        max_tasks_per_tick: this.config.maxTasksPerTick,
        settle_delay_ms: this.config.settleDelayMs,
        term: scheduleContext.term,
        trigger
      },
      result: "running",
      stage: "runner_started"
    });

    try {
      const result = await runner.run({
        ...scheduleContext,
        artifactStore: this.artifactStore,
        batchId,
        config: this.getConfig(),
        log: (input) => {
          // batchId is applied last so a runner cannot re-label its entries.
          this.writeRunnerLog(runner.name, {
            ...input,
            batchId
          });
        },
        maxMessagesPerTick: this.config.maxMessagesPerTick,
        maxTasksPerTick: this.config.maxTasksPerTick,
        settleDelayMs: this.config.settleDelayMs,
        trigger
      });

      this.writeRunnerLog(runner.name, {
        batchId,
        details: result?.details,
        durationMs: Date.now() - startedAt,
        result: result?.result ?? "ok",
        stage: "runner_completed"
      });

      return true;
    } catch (error) {
      this.writeRunnerLog(runner.name, {
        batchId,
        durationMs: Date.now() - startedAt,
        error,
        result: "failed",
        stage: "runner_failed"
      });

      return false;
    }
  }

  /** Builds the next batch id from the current time and a per-instance counter. */
  private buildBatchId(): string {
    this.batchSequence += 1;
    return `timed-jobs-${Date.now()}-${this.batchSequence}`;
  }

  /** Writes a framework-level entry; a fresh batch id is generated when none is given. */
  private writeFrameworkLog(
    input: Omit<TimedJobLogInput, "stage"> & { batchId?: string; stage: string }
  ): void {
    this.writeLogEntry(FRAMEWORK_RUNNER_NAME, input.batchId ?? this.buildBatchId(), input);
  }

  /** Writes an entry attributed to the named runner. */
  private writeRunnerLog(
    runner: string,
    input: Omit<TimedJobLogInput, "stage"> & { batchId: string; stage: string }
  ): void {
    this.writeLogEntry(runner, input.batchId, input);
  }

  /**
   * Appends one JSON line to `<logDir>/<date>.jsonl`; does nothing when no log
   * directory is configured.
   *
   * `details` keys become top-level fields of the entry. A key that collides
   * with a framework-owned field (`batch_id`, `duration_ms`, `error`,
   * `result`, `runner`, `stage`, `ts`) is written as `detail_<key>` instead,
   * so runner-supplied data can never overwrite the fields used to correlate
   * entries.
   *
   * Building and writing the entry are both best-effort: any failure is
   * reported on stderr and never propagates to the caller.
   */
  private writeLogEntry(
    runner: string,
    batchId: string,
    input: TimedJobLogInput
  ): void {
    if (this.logDir == null) {
      return;
    }

    try {
      const entry: TimedJobsLogEntry = {
        batch_id: batchId,
        duration_ms: input.durationMs ?? null,
        error: normalizeErrorMessage(input.error),
        result: input.result ?? null,
        runner,
        stage: input.stage,
        ts: new Date().toISOString()
      };

      if (input.details) {
        // Snapshot the framework-owned keys before any detail is merged in.
        const reservedKeys = new Set(Object.keys(entry));

        for (const [key, value] of Object.entries(input.details)) {
          entry[reservedKeys.has(key) ? `detail_${key}` : key] = value;
        }
      }

      // The first ten characters of the ISO timestamp are the date, which names the file.
      const date = entry.ts.slice(0, 10);
      const filePath = join(this.logDir, `${date}.jsonl`);
      appendFileSync(filePath, `${JSON.stringify(entry)}\n`);
    } catch (error) {
      console.error(`[timed-jobs-log] write failed: ${String(error)}`);
    }
  }
}
468+
/**
 * Converts an arbitrary thrown value into the string stored in a log entry.
 *
 * - `null` / `undefined` → `null`
 * - `Error` with a non-empty message → that message
 * - plain objects that would otherwise print as `[object Object]` → their
 *   JSON form, when they can be serialized
 * - anything else → `String(error)`, with the empty string mapped to `null`
 *
 * Never throws: values that cannot be converted to a string at all (for
 * example objects created with a `null` prototype) fall back to JSON and then
 * to a fixed placeholder, so logging an error cannot itself raise one.
 *
 * @param error value caught from a runner or the scheduler
 * @returns a printable message, or `null` when there is nothing to report
 */
function normalizeErrorMessage(error: unknown): string | null {
  if (error == null) {
    return null;
  }

  if (error instanceof Error && typeof error.message === "string" && error.message.length > 0) {
    return error.message;
  }

  let text: string | null;

  try {
    text = String(error);
  } catch {
    // No usable toString / Symbol.toPrimitive on the value.
    text = null;
  }

  if (typeof error === "object" && (text === null || text === "[object Object]")) {
    try {
      const json: unknown = JSON.stringify(error);

      if (typeof json === "string" && json.length > 0) {
        text = json;
      }
    } catch {
      // Cyclic or otherwise unserializable: keep whatever String() produced.
    }
  }

  if (text === null) {
    return "[unprintable error]";
  }

  return text === "" ? null : text;
}
481+
/**
 * Produces the canonical form of a runner name: the value coerced to a string
 * with surrounding whitespace removed. `null` and `undefined` yield `""`.
 *
 * @param name raw name as supplied by the caller
 * @returns the trimmed name, possibly empty
 */
function normalizeRunnerName(name: string): string {
  const text = name == null ? "" : String(name);

  return text.trim();
}
+19,
-6
1@@ -2,7 +2,7 @@
2
3 ## 状态
4
5-- 当前状态:`待开始`
6+- 当前状态:`已完成`
7 - 规模预估:`M`
8 - 依赖任务:`T-S055`
9 - 建议执行者:`Codex`
10@@ -149,22 +149,35 @@
11
12 ### 开始执行
13
14-- 执行者:
15-- 开始时间:
16+- 执行者:`Codex`
17+- 开始时间:`2026-03-30 15:39:12 CST`
18 - 状态变更:`待开始` → `进行中`
19
20 ### 完成摘要
21
22-- 完成时间:
23+- 完成时间:`2026-03-30 15:55:05 CST`
24 - 状态变更:`进行中` → `已完成`
25 - 修改了哪些文件:
26+ - `apps/conductor-daemon/src/timed-jobs/runtime.ts`
27+ - `apps/conductor-daemon/src/timed-jobs/index.ts`
28+ - `apps/conductor-daemon/src/index.ts`
29+ - `apps/conductor-daemon/src/index.test.js`
30+ - `tasks/T-S057.md`
31 - 核心实现思路:
32+ - 新建 `timed-jobs/` 独立模块,提供 runner 注册、leader-only 调度门禁、统一 tick context、start/stop 生命周期和防重入 tick 执行
33+ - 为 timed-jobs 增加可配置的扫描周期、消息/任务批量限制、settle delay,并接入 `conductor` 现有 CLI/env 配置解析与日志目录初始化
34+ - 增加外部 JSONL 日志写入能力,统一记录 framework/runner 的 batch、阶段、结果、耗时和错误,日志失败只做 stderr 告警不影响主流程
35+ - 在 `ConductorRuntime` 中做最小接线,启动时创建 timed-jobs 日志目录并初始化模块,关闭时先停 timed-jobs 再停 daemon/local API,避免定时器泄漏
36+ - 用测试覆盖 leader 执行、standby 跳过、日志落盘、定时器清理、CLI 配置解析和 runtime 启动初始化
37 - 跑了哪些测试:
38+ - `cd /Users/george/code/baa-conductor-timed-jobs-framework && pnpm install`
39+ - `cd /Users/george/code/baa-conductor-timed-jobs-framework && pnpm -C apps/conductor-daemon test`
40+ - `cd /Users/george/code/baa-conductor-timed-jobs-framework && pnpm build`
41
42 ### 执行过程中遇到的问题
43
44--
45+- 新 worktree 初始没有安装依赖,第一次运行 `pnpm -C apps/conductor-daemon test` 失败,原因是 `pnpm exec tsc` 找不到 `tsc` 命令;补跑一次 `pnpm install` 后,构建和测试流程恢复正常
46
47 ### 剩余风险
48
49--
50+- 当前框架只提供基础调度和共享日志能力,还没有默认挂载 projector / dispatcher runner;后续 `T-S058` / `T-S059` 需要在这个模块里注册具体业务 runner