codex@macbookpro
·
2026-04-01
runtime.ts
1import { writeFile } from "node:fs/promises";
2import { join } from "node:path";
3
4import type { ArtifactStore } from "../../../../packages/artifact-db/dist/index.js";
5
/** Timer handle type produced by `globalThis.setInterval` in the current runtime. */
export type TimedJobsIntervalHandle = ReturnType<typeof globalThis.setInterval>;

/**
 * What started a tick: `"interval"` is used by the internal timer,
 * `"manual"` is the default for direct `runTick()` calls.
 */
export type TimedJobsTickTrigger = "interval" | "manual";

/**
 * Outcome of a single tick.
 *
 * - `"failed"`: the schedule call rejected.
 * - `"scheduled"` / `"skipped_not_leader"` / `"skipped_system_paused"`:
 *   forwarded unchanged from the schedule function.
 * - `"skipped_busy"`: another tick was still in flight.
 * - `"skipped_no_runners"`: no runner was registered.
 */
export type TimedJobsTickDecision =
  | "failed"
  | "scheduled"
  | "skipped_busy"
  | "skipped_no_runners"
  | "skipped_not_leader"
  | "skipped_system_paused";
15
/**
 * Tuning values for the timed-jobs loop.
 *
 * Only `intervalMs` is consumed by the framework itself (as the timer period);
 * the remaining values are handed to every runner through its tick context.
 */
export interface TimedJobsConfig {
  /** Period, in milliseconds, passed to the interval timer. */
  intervalMs: number;
  /** Forwarded to runners; presumably a per-tick message budget (confirm with the runners). */
  maxMessagesPerTick: number;
  /** Forwarded to runners; presumably a per-tick task budget (confirm with the runners). */
  maxTasksPerTick: number;
  /** Forwarded to runners; its exact meaning is defined by the runners, not by this file. */
  settleDelayMs: number;
}
22
/**
 * Values the schedule function supplies to the work it runs.
 * They are spread into each runner's tick context and written to the
 * `runner_started` log line as `controller_id`, `host` and `term`.
 */
export interface TimedJobScheduleContext {
  controllerId: string;
  host: string;
  /** NOTE(review): looks like a leadership term number; confirm with the scheduler. */
  term: number;
}
28
/** Payload for one log line; only `stage` is mandatory. */
export interface TimedJobLogInput {
  /** Extra fields copied onto the log entry as top-level keys. */
  details?: Record<string, unknown>;
  /** Written as `duration_ms`; absent or `null` is logged as `null`. */
  durationMs?: number | null;
  /** Any thrown value; it is reduced to a message string before being logged. */
  error?: unknown;
  /** Short outcome label; absent or `null` is logged as `null`. */
  result?: string | null;
  /** Name of the lifecycle step being recorded, e.g. `"runner_started"`. */
  stage: string;
}
36
/**
 * Optional summary a runner may resolve with.
 * Both fields end up on the `runner_completed` log line.
 */
export interface TimedJobRunnerResult {
  /** Extra fields for the completion log line. */
  details?: Record<string, unknown>;
  /** Outcome label; the framework logs `"ok"` when it is omitted. */
  result?: string;
}
41
/**
 * Everything a runner receives for one tick: the schedule context
 * (`controllerId`, `host`, `term`) plus the fields below.
 */
export interface TimedJobTickContext extends TimedJobScheduleContext {
  /** Store passed in at construction time, or `null` when none was provided. */
  artifactStore: ArtifactStore | null;
  /** Identifier shared by every log line of the current tick. */
  batchId: string;
  /** Copy of the framework configuration. */
  config: TimedJobsConfig;
  /** Directory the JSONL logs go to, or `null` when logging is disabled. */
  logDir: string | null;
  /** Writes a log line attributed to this runner and the current batch. */
  log: (input: TimedJobLogInput) => void;
  /** Same value as `config.maxMessagesPerTick`. */
  maxMessagesPerTick: number;
  /** Same value as `config.maxTasksPerTick`. */
  maxTasksPerTick: number;
  /** Same value as `config.settleDelayMs`. */
  settleDelayMs: number;
  /** What started this tick. */
  trigger: TimedJobsTickTrigger;
}
53
/** A unit of periodic work that can be registered with the timed-jobs framework. */
export interface TimedJobRunner {
  /** Free-form description; not read by the framework code in this file. */
  description?: string;
  /** Registration key; trimmed on registration and must be non-empty and unique. */
  name: string;
  /**
   * Performs the work for one tick. A rejection is logged as `runner_failed`
   * and does not prevent the remaining runners from running.
   */
  run: (context: TimedJobTickContext) => Promise<TimedJobRunnerResult | void>;
}
59
/** Summary returned to whoever triggered a tick. */
export interface TimedJobsTickResult {
  /** Batch identifier used in the tick's log lines. */
  batchId: string;
  /** How the tick ended. */
  decision: TimedJobsTickDecision;
  /** Number of runners registered when the tick was evaluated. */
  runnerCount: number;
}
65
/**
 * Gate supplied by the host that decides whether a tick's work runs.
 *
 * It receives the work callback and resolves to `"scheduled"` or to the reason
 * the tick was skipped; a rejection is reported by the framework as a failed tick.
 * Presumably implementations invoke `work` only when they resolve to
 * `"scheduled"`; confirm against the concrete scheduler.
 */
export type TimedJobsSchedule = (
  work: (context: TimedJobScheduleContext) => Promise<void>
) => Promise<"scheduled" | "skipped_not_leader" | "skipped_system_paused">;
69
/** Appends `data` to the file at `filePath`; injectable so tests can avoid real disk IO. */
type TimedJobsAppendFile = (filePath: string, data: string) => Promise<void>;
71
/** Collaborators and switches for `ConductorTimedJobs`; only `schedule` is required. */
export interface ConductorTimedJobsOptions {
  /** File appender for log lines; defaults to `writeFile` with the append flag. */
  appendFileImpl?: TimedJobsAppendFile;
  /** Store handed to runners; defaults to `null`. */
  artifactStore?: ArtifactStore | null;
  /** Whether `start()` installs the interval timer; defaults to `true`. */
  autoStart?: boolean;
  /** Timer cancel function; defaults to `globalThis.clearInterval`. */
  clearIntervalImpl?: (handle: TimedJobsIntervalHandle) => void;
  /** Directory for the daily JSONL logs; `null` or omitted disables logging. */
  logDir?: string | null;
  /** Gate that decides whether each tick's work is executed. */
  schedule: TimedJobsSchedule;
  /** Timer factory; defaults to `globalThis.setInterval`. */
  setIntervalImpl?: (
    handler: () => void,
    intervalMs: number
  ) => TimedJobsIntervalHandle;
}
84
/**
 * Shape of one JSONL log line. The named fields are always present; the index
 * signature admits the caller-supplied detail keys added next to them.
 */
interface TimedJobsLogEntry {
  batch_id: string;
  duration_ms: number | null;
  error: string | null;
  result: string | null;
  /** Runner the line belongs to, or the framework's own pseudo-runner name. */
  runner: string;
  stage: string;
  /** ISO-8601 timestamp of when the entry was built. */
  ts: string;
  [key: string]: unknown;
}
95
/** Value of the `runner` field on log lines written by the framework itself. */
const FRAMEWORK_RUNNER_NAME = "timed-jobs.framework";
97
/**
 * Coordinates periodic execution of registered timed-job runners.
 *
 * Runners are registered by name and executed one after another, in
 * registration order, once per "tick". A tick is started by the internal
 * interval timer (after `start()`, when `autoStart` is enabled) or explicitly
 * through `runTick()`. The injected `schedule` function decides whether the
 * work of a tick actually runs. Only one tick is in flight at a time; a tick
 * requested while another is running is reported as `"skipped_busy"`.
 *
 * When `logDir` is set, every step is appended as one JSON line to
 * `<logDir>/<UTC date>.jsonl`. Log writes are queued so they stay ordered
 * without blocking tick execution, and logging never throws into callers.
 */
export class ConductorTimedJobs {
  private readonly appendFileImpl: TimedJobsAppendFile;
  private readonly artifactStore: ArtifactStore | null;
  private readonly autoStart: boolean;
  private readonly clearIntervalImpl: (handle: TimedJobsIntervalHandle) => void;
  private readonly config: TimedJobsConfig;
  /** Promise of the tick currently executing, or `null` when idle. */
  private inFlightTick: Promise<TimedJobsTickResult> | null = null;
  private intervalHandle: TimedJobsIntervalHandle | null = null;
  private readonly logDir: string | null;
  /** Tail of the serialized log-write chain; never rejects. */
  private pendingLogWrite: Promise<void> = Promise.resolve();
  private readonly runners = new Map<string, TimedJobRunner>();
  private readonly schedule: TimedJobsSchedule;
  private readonly setIntervalImpl: (
    handler: () => void,
    intervalMs: number
  ) => TimedJobsIntervalHandle;
  private started = false;
  /** Monotonic counter that keeps batch ids unique within one millisecond. */
  private batchSequence = 0;

  /**
   * @param config Loop configuration; copied field by field so later changes
   *   to the caller's object have no effect.
   * @param options Collaborators and switches; every option except `schedule`
   *   has a default.
   */
  constructor(config: TimedJobsConfig, options: ConductorTimedJobsOptions) {
    this.appendFileImpl =
      options.appendFileImpl ?? ((filePath, data) => writeFile(filePath, data, { flag: "a" }));
    this.artifactStore = options.artifactStore ?? null;
    this.autoStart = options.autoStart ?? true;
    this.clearIntervalImpl =
      options.clearIntervalImpl ?? ((handle) => globalThis.clearInterval(handle));
    this.config = {
      intervalMs: config.intervalMs,
      maxMessagesPerTick: config.maxMessagesPerTick,
      maxTasksPerTick: config.maxTasksPerTick,
      settleDelayMs: config.settleDelayMs
    };
    this.logDir = options.logDir ?? null;
    this.schedule = options.schedule;
    this.setIntervalImpl =
      options.setIntervalImpl ?? ((handler, intervalMs) => globalThis.setInterval(handler, intervalMs));
  }

  /**
   * Adds a runner under its trimmed name.
   *
   * @throws Error when the trimmed name is empty or already registered.
   */
  registerRunner(runner: TimedJobRunner): void {
    const name = normalizeRunnerName(runner.name);

    if (!name) {
      throw new Error("Timed job runner name must be non-empty.");
    }

    if (this.runners.has(name)) {
      throw new Error(`Timed job runner "${name}" is already registered.`);
    }

    this.runners.set(name, {
      ...runner,
      name
    });
  }

  /** Returns a copy of the configuration. */
  getConfig(): TimedJobsConfig {
    return { ...this.config };
  }

  /** Returns the registered runner names in registration order. */
  getRegisteredRunnerNames(): string[] {
    return [...this.runners.keys()];
  }

  /** Whether `start()` has been called without a subsequent `stop()`. */
  isStarted(): boolean {
    return this.started;
  }

  /**
   * Marks the framework as started and, when `autoStart` is enabled, installs
   * the interval timer. Calling it again while started is a no-op.
   */
  async start(): Promise<void> {
    if (this.started) {
      return;
    }

    this.started = true;
    this.writeFrameworkLog({
      stage: "started",
      result: this.autoStart ? "loop_enabled" : "loop_disabled"
    });

    if (!this.autoStart) {
      return;
    }

    this.intervalHandle = this.setIntervalImpl(() => {
      // Fire-and-forget: tick failures are captured and logged inside runTick.
      void this.runTick("interval");
    }, this.config.intervalMs);
  }

  /**
   * Cancels the timer, waits for an in-flight tick to finish, writes the
   * `stopped` log line and waits for queued log writes to complete.
   * Does nothing when there is nothing to stop.
   */
  async stop(): Promise<void> {
    if (!this.started && this.intervalHandle == null && this.inFlightTick == null) {
      return;
    }

    if (this.intervalHandle != null) {
      this.clearIntervalImpl(this.intervalHandle);
      this.intervalHandle = null;
    }

    const pendingTick = this.inFlightTick;
    this.started = false;

    if (pendingTick != null) {
      try {
        await pendingTick;
      } catch {
        // best-effort drain; runner failures are already logged
      }
    }

    this.writeFrameworkLog({
      stage: "stopped",
      result: "stopped"
    });
    await this.flushLogWrites();
  }

  /**
   * Runs one tick unless another one is still in flight.
   *
   * @param trigger What requested the tick; defaults to `"manual"`.
   * @returns The tick summary. A busy skip gets its own batch id and the
   *   decision `"skipped_busy"`.
   */
  async runTick(trigger: TimedJobsTickTrigger = "manual"): Promise<TimedJobsTickResult> {
    if (this.inFlightTick != null) {
      const batchId = this.buildBatchId();
      const runnerCount = this.runners.size;

      this.writeFrameworkLog({
        batchId,
        details: {
          trigger
        },
        result: "skipped_busy",
        stage: "tick_skipped_busy"
      });

      return {
        batchId,
        decision: "skipped_busy",
        runnerCount
      };
    }

    const tickPromise = this.executeTick(trigger);
    this.inFlightTick = tickPromise;

    try {
      return await tickPromise;
    } finally {
      // Only clear the slot if it still belongs to this tick.
      if (this.inFlightTick === tickPromise) {
        this.inFlightTick = null;
      }
    }
  }

  /**
   * Executes one tick: snapshots the registered runners, asks the schedule
   * function to run them sequentially, and logs the outcome. Errors thrown by
   * the schedule call are converted into a `"failed"` decision.
   */
  private async executeTick(trigger: TimedJobsTickTrigger): Promise<TimedJobsTickResult> {
    const batchId = this.buildBatchId();
    // Snapshot so runners registered mid-tick do not join this batch.
    const runners = [...this.runners.values()];

    if (runners.length === 0) {
      this.writeFrameworkLog({
        batchId,
        details: {
          trigger
        },
        result: "skipped_no_runners",
        stage: "tick_skipped_no_runners"
      });

      return {
        batchId,
        decision: "skipped_no_runners",
        runnerCount: 0
      };
    }

    const tickStartedAt = Date.now();

    this.writeFrameworkLog({
      batchId,
      details: {
        runner_count: runners.length,
        trigger
      },
      result: "running",
      stage: "tick_started"
    });

    let runnerFailures = 0;

    try {
      const decision = await this.schedule(async (scheduleContext) => {
        for (const runner of runners) {
          const ok = await this.runRunner(runner, batchId, trigger, scheduleContext);

          if (!ok) {
            runnerFailures += 1;
          }
        }
      });

      if (decision === "skipped_not_leader" || decision === "skipped_system_paused") {
        // Record the skip against every runner so per-runner logs stay complete.
        for (const runner of runners) {
          this.writeRunnerLog(runner.name, {
            batchId,
            details: {
              trigger
            },
            durationMs: 0,
            result: decision,
            stage: "runner_skipped"
          });
        }

        this.writeFrameworkLog({
          batchId,
          details: {
            runner_count: runners.length,
            trigger
          },
          durationMs: Date.now() - tickStartedAt,
          result: decision,
          stage: "tick_completed"
        });

        return {
          batchId,
          decision,
          runnerCount: runners.length
        };
      }

      this.writeFrameworkLog({
        batchId,
        details: {
          failed_runner_count: runnerFailures,
          runner_count: runners.length,
          successful_runner_count: runners.length - runnerFailures,
          trigger
        },
        durationMs: Date.now() - tickStartedAt,
        result: runnerFailures > 0 ? "completed_with_failures" : "scheduled",
        stage: "tick_completed"
      });

      return {
        batchId,
        decision,
        runnerCount: runners.length
      };
    } catch (error) {
      this.writeFrameworkLog({
        batchId,
        details: {
          runner_count: runners.length,
          trigger
        },
        durationMs: Date.now() - tickStartedAt,
        error,
        result: "failed",
        stage: "tick_failed"
      });

      return {
        batchId,
        decision: "failed",
        runnerCount: runners.length
      };
    }
  }

  /**
   * Runs a single runner and logs its start and its completion or failure.
   *
   * @returns `true` when the runner resolved, `false` when it threw or rejected.
   */
  private async runRunner(
    runner: TimedJobRunner,
    batchId: string,
    trigger: TimedJobsTickTrigger,
    scheduleContext: TimedJobScheduleContext
  ): Promise<boolean> {
    const startedAt = Date.now();

    this.writeRunnerLog(runner.name, {
      batchId,
      details: {
        controller_id: scheduleContext.controllerId,
        host: scheduleContext.host,
        max_messages_per_tick: this.config.maxMessagesPerTick,
        max_tasks_per_tick: this.config.maxTasksPerTick,
        settle_delay_ms: this.config.settleDelayMs,
        term: scheduleContext.term,
        trigger
      },
      result: "running",
      stage: "runner_started"
    });

    try {
      const result = await runner.run({
        ...scheduleContext,
        artifactStore: this.artifactStore,
        batchId,
        config: this.getConfig(),
        logDir: this.logDir,
        log: (input) => {
          // Force the batch id so runner lines always correlate with the tick.
          this.writeRunnerLog(runner.name, {
            ...input,
            batchId
          });
        },
        maxMessagesPerTick: this.config.maxMessagesPerTick,
        maxTasksPerTick: this.config.maxTasksPerTick,
        settleDelayMs: this.config.settleDelayMs,
        trigger
      });

      this.writeRunnerLog(runner.name, {
        batchId,
        details: result?.details,
        durationMs: Date.now() - startedAt,
        result: result?.result ?? "ok",
        stage: "runner_completed"
      });

      return true;
    } catch (error) {
      this.writeRunnerLog(runner.name, {
        batchId,
        durationMs: Date.now() - startedAt,
        error,
        result: "failed",
        stage: "runner_failed"
      });

      return false;
    }
  }

  /** Builds a new batch id from the current time and an incrementing sequence. */
  private buildBatchId(): string {
    this.batchSequence += 1;
    return `timed-jobs-${Date.now()}-${this.batchSequence}`;
  }

  /** Logs a line attributed to the framework; allocates a batch id when none is given. */
  private writeFrameworkLog(
    input: Omit<TimedJobLogInput, "stage"> & { batchId?: string; stage: string }
  ): void {
    this.writeLogEntry(FRAMEWORK_RUNNER_NAME, input.batchId ?? this.buildBatchId(), input);
  }

  /** Logs a line attributed to the named runner. */
  private writeRunnerLog(
    runner: string,
    input: Omit<TimedJobLogInput, "stage"> & { batchId: string; stage: string }
  ): void {
    this.writeLogEntry(runner, input.batchId, input);
  }

  /** Waits until every log write queued so far has settled. */
  private async flushLogWrites(): Promise<void> {
    try {
      await this.pendingLogWrite;
    } catch {
      // individual write failures are already reported
    }
  }

  /** Chains one append onto the write queue; failures are reported, not thrown. */
  private queueLogWrite(filePath: string, data: string): void {
    // Keep JSONL entries ordered without blocking tick execution on filesystem IO.
    this.pendingLogWrite = this.pendingLogWrite
      .catch(() => {})
      .then(() => this.appendFileImpl(filePath, data))
      .catch((error) => {
        console.error(`[timed-jobs-log] write failed: ${String(error)}`);
      });
  }

  /**
   * Builds one JSONL entry and queues it for appending to the daily log file.
   * Does nothing when logging is disabled (`logDir` is `null`).
   *
   * Logging is best-effort and must never influence job execution, so this
   * method never throws: a failure to build the entry is reported on stderr.
   */
  private writeLogEntry(
    runner: string,
    batchId: string,
    input: TimedJobLogInput
  ): void {
    const logDir = this.logDir;

    if (logDir == null) {
      return;
    }

    try {
      const timestamp = new Date().toISOString();
      const core: TimedJobsLogEntry = {
        batch_id: batchId,
        duration_ms: input.durationMs ?? null,
        error: normalizeErrorMessage(input.error),
        result: input.result ?? null,
        runner,
        stage: input.stage,
        ts: timestamp
      };

      // The file name comes from the framework timestamp, never from caller data.
      const filePath = join(logDir, `${timestamp.slice(0, 10)}.jsonl`);
      const line = ConductorTimedJobs.serializeLogEntry(core, input.details);

      this.queueLogWrite(filePath, `${line}\n`);
    } catch (error) {
      console.error(`[timed-jobs-log] entry dropped: ${String(error)}`);
    }
  }

  /**
   * Serializes the core fields together with caller-supplied details.
   *
   * Detail keys that collide with a core field are ignored so a runner cannot
   * overwrite `ts`, `runner`, `batch_id` and friends. If the details cannot be
   * serialized (for example a circular structure or a BigInt), the core fields
   * are still written, with the reason recorded under `details_error`.
   */
  private static serializeLogEntry(
    core: TimedJobsLogEntry,
    details: Record<string, unknown> | undefined
  ): string {
    if (!details) {
      return JSON.stringify(core);
    }

    const entry: TimedJobsLogEntry = { ...core };

    for (const [key, value] of Object.entries(details)) {
      if (!Object.prototype.hasOwnProperty.call(core, key)) {
        entry[key] = value;
      }
    }

    try {
      return JSON.stringify(entry);
    } catch (error) {
      return JSON.stringify({ ...core, details_error: String(error) });
    }
  }
}
492
/**
 * Reduces an arbitrary thrown value to a loggable message.
 *
 * - `null` / `undefined` yield `null`.
 * - An `Error` with a non-empty message yields that message.
 * - Anything else is coerced with `String()`. Plain objects, which would
 *   otherwise collapse to `"[object Object]"`, are rendered as JSON when they
 *   can be serialized.
 *
 * The function never throws, even for values that cannot be converted to a
 * primitive (such as objects created with a `null` prototype).
 *
 * @returns The message, or `null` when there is nothing meaningful to report.
 */
function normalizeErrorMessage(error: unknown): string | null {
  if (error == null) {
    return null;
  }

  if (error instanceof Error && typeof error.message === "string" && error.message.length > 0) {
    return error.message;
  }

  let text: string;

  try {
    text = String(error);
  } catch {
    // String() throws when the value has no usable toString/valueOf.
    text = Object.prototype.toString.call(error);
  }

  if (typeof error === "object" && text === "[object Object]") {
    try {
      const serialized: unknown = JSON.stringify(error);

      if (typeof serialized === "string") {
        text = serialized;
      }
    } catch {
      // Not serializable (e.g. circular); keep the generic tag.
    }
  }

  return text === "" ? null : text;
}
505
/**
 * Produces the canonical form of a runner name: the value coerced to a string
 * with surrounding whitespace removed. A missing name (`null` / `undefined`
 * slipping past the type system) becomes the empty string.
 */
function normalizeRunnerName(name: string): string {
  const raw: unknown = name;

  if (raw == null) {
    return "";
  }

  return String(raw).trim();
}