baa-conductor


baa-conductor / apps / conductor-daemon / src / timed-jobs
codex@macbookpro  ·  2026-04-01

runtime.ts

  1import { writeFile } from "node:fs/promises";
  2import { join } from "node:path";
  3
  4import type { ArtifactStore } from "../../../../packages/artifact-db/dist/index.js";
  5
  6export type TimedJobsIntervalHandle = ReturnType<typeof globalThis.setInterval>;
  7export type TimedJobsTickTrigger = "interval" | "manual";
  8export type TimedJobsTickDecision =
  9  | "failed"
 10  | "scheduled"
 11  | "skipped_busy"
 12  | "skipped_no_runners"
 13  | "skipped_not_leader"
 14  | "skipped_system_paused";
 15
 16export interface TimedJobsConfig {
 17  intervalMs: number;
 18  maxMessagesPerTick: number;
 19  maxTasksPerTick: number;
 20  settleDelayMs: number;
 21}
 22
 23export interface TimedJobScheduleContext {
 24  controllerId: string;
 25  host: string;
 26  term: number;
 27}
 28
 29export interface TimedJobLogInput {
 30  details?: Record<string, unknown>;
 31  durationMs?: number | null;
 32  error?: unknown;
 33  result?: string | null;
 34  stage: string;
 35}
 36
 37export interface TimedJobRunnerResult {
 38  details?: Record<string, unknown>;
 39  result?: string;
 40}
 41
 42export interface TimedJobTickContext extends TimedJobScheduleContext {
 43  artifactStore: ArtifactStore | null;
 44  batchId: string;
 45  config: TimedJobsConfig;
 46  logDir: string | null;
 47  log: (input: TimedJobLogInput) => void;
 48  maxMessagesPerTick: number;
 49  maxTasksPerTick: number;
 50  settleDelayMs: number;
 51  trigger: TimedJobsTickTrigger;
 52}
 53
 54export interface TimedJobRunner {
 55  description?: string;
 56  name: string;
 57  run: (context: TimedJobTickContext) => Promise<TimedJobRunnerResult | void>;
 58}
 59
 60export interface TimedJobsTickResult {
 61  batchId: string;
 62  decision: TimedJobsTickDecision;
 63  runnerCount: number;
 64}
 65
 66export type TimedJobsSchedule = (
 67  work: (context: TimedJobScheduleContext) => Promise<void>
 68) => Promise<"scheduled" | "skipped_not_leader" | "skipped_system_paused">;
 69
 70type TimedJobsAppendFile = (filePath: string, data: string) => Promise<void>;
 71
 72export interface ConductorTimedJobsOptions {
 73  appendFileImpl?: TimedJobsAppendFile;
 74  artifactStore?: ArtifactStore | null;
 75  autoStart?: boolean;
 76  clearIntervalImpl?: (handle: TimedJobsIntervalHandle) => void;
 77  logDir?: string | null;
 78  schedule: TimedJobsSchedule;
 79  setIntervalImpl?: (
 80    handler: () => void,
 81    intervalMs: number
 82  ) => TimedJobsIntervalHandle;
 83}
 84
 85interface TimedJobsLogEntry {
 86  batch_id: string;
 87  duration_ms: number | null;
 88  error: string | null;
 89  result: string | null;
 90  runner: string;
 91  stage: string;
 92  ts: string;
 93  [key: string]: unknown;
 94}
 95
 96const FRAMEWORK_RUNNER_NAME = "timed-jobs.framework";
 97
 98export class ConductorTimedJobs {
 99  private readonly appendFileImpl: TimedJobsAppendFile;
100  private readonly artifactStore: ArtifactStore | null;
101  private readonly autoStart: boolean;
102  private readonly clearIntervalImpl: (handle: TimedJobsIntervalHandle) => void;
103  private readonly config: TimedJobsConfig;
104  private inFlightTick: Promise<TimedJobsTickResult> | null = null;
105  private intervalHandle: TimedJobsIntervalHandle | null = null;
106  private readonly logDir: string | null;
107  private pendingLogWrite: Promise<void> = Promise.resolve();
108  private readonly runners = new Map<string, TimedJobRunner>();
109  private readonly schedule: TimedJobsSchedule;
110  private readonly setIntervalImpl: (
111    handler: () => void,
112    intervalMs: number
113  ) => TimedJobsIntervalHandle;
114  private started = false;
115  private batchSequence = 0;
116
117  constructor(config: TimedJobsConfig, options: ConductorTimedJobsOptions) {
118    this.appendFileImpl =
119      options.appendFileImpl ?? ((filePath, data) => writeFile(filePath, data, { flag: "a" }));
120    this.artifactStore = options.artifactStore ?? null;
121    this.autoStart = options.autoStart ?? true;
122    this.clearIntervalImpl =
123      options.clearIntervalImpl ?? ((handle) => globalThis.clearInterval(handle));
124    this.config = {
125      intervalMs: config.intervalMs,
126      maxMessagesPerTick: config.maxMessagesPerTick,
127      maxTasksPerTick: config.maxTasksPerTick,
128      settleDelayMs: config.settleDelayMs
129    };
130    this.logDir = options.logDir ?? null;
131    this.schedule = options.schedule;
132    this.setIntervalImpl =
133      options.setIntervalImpl ?? ((handler, intervalMs) => globalThis.setInterval(handler, intervalMs));
134  }
135
136  registerRunner(runner: TimedJobRunner): void {
137    const name = normalizeRunnerName(runner.name);
138
139    if (!name) {
140      throw new Error("Timed job runner name must be non-empty.");
141    }
142
143    if (this.runners.has(name)) {
144      throw new Error(`Timed job runner "${name}" is already registered.`);
145    }
146
147    this.runners.set(name, {
148      ...runner,
149      name
150    });
151  }
152
153  getConfig(): TimedJobsConfig {
154    return { ...this.config };
155  }
156
157  getRegisteredRunnerNames(): string[] {
158    return [...this.runners.keys()];
159  }
160
161  isStarted(): boolean {
162    return this.started;
163  }
164
165  async start(): Promise<void> {
166    if (this.started) {
167      return;
168    }
169
170    this.started = true;
171    this.writeFrameworkLog({
172      stage: "started",
173      result: this.autoStart ? "loop_enabled" : "loop_disabled"
174    });
175
176    if (!this.autoStart) {
177      return;
178    }
179
180    this.intervalHandle = this.setIntervalImpl(() => {
181      void this.runTick("interval");
182    }, this.config.intervalMs);
183  }
184
185  async stop(): Promise<void> {
186    if (!this.started && this.intervalHandle == null && this.inFlightTick == null) {
187      return;
188    }
189
190    if (this.intervalHandle != null) {
191      this.clearIntervalImpl(this.intervalHandle);
192      this.intervalHandle = null;
193    }
194
195    const pendingTick = this.inFlightTick;
196    this.started = false;
197
198    if (pendingTick != null) {
199      try {
200        await pendingTick;
201      } catch {
202        // best-effort drain; runner failures are already logged
203      }
204    }
205
206    this.writeFrameworkLog({
207      stage: "stopped",
208      result: "stopped"
209    });
210    await this.flushLogWrites();
211  }
212
213  async runTick(trigger: TimedJobsTickTrigger = "manual"): Promise<TimedJobsTickResult> {
214    if (this.inFlightTick != null) {
215      const batchId = this.buildBatchId();
216      const runnerCount = this.runners.size;
217
218      this.writeFrameworkLog({
219        batchId,
220        details: {
221          trigger
222        },
223        result: "skipped_busy",
224        stage: "tick_skipped_busy"
225      });
226
227      return {
228        batchId,
229        decision: "skipped_busy",
230        runnerCount
231      };
232    }
233
234    const tickPromise = this.executeTick(trigger);
235    this.inFlightTick = tickPromise;
236
237    try {
238      return await tickPromise;
239    } finally {
240      if (this.inFlightTick === tickPromise) {
241        this.inFlightTick = null;
242      }
243    }
244  }
245
246  private async executeTick(trigger: TimedJobsTickTrigger): Promise<TimedJobsTickResult> {
247    const batchId = this.buildBatchId();
248    const runners = [...this.runners.values()];
249
250    if (runners.length === 0) {
251      this.writeFrameworkLog({
252        batchId,
253        details: {
254          trigger
255        },
256        result: "skipped_no_runners",
257        stage: "tick_skipped_no_runners"
258      });
259
260      return {
261        batchId,
262        decision: "skipped_no_runners",
263        runnerCount: 0
264      };
265    }
266
267    const tickStartedAt = Date.now();
268
269    this.writeFrameworkLog({
270      batchId,
271      details: {
272        runner_count: runners.length,
273        trigger
274      },
275      result: "running",
276      stage: "tick_started"
277    });
278
279    let runnerFailures = 0;
280
281    try {
282      const decision = await this.schedule(async (scheduleContext) => {
283        for (const runner of runners) {
284          const ok = await this.runRunner(runner, batchId, trigger, scheduleContext);
285
286          if (!ok) {
287            runnerFailures += 1;
288          }
289        }
290      });
291
292      if (decision === "skipped_not_leader" || decision === "skipped_system_paused") {
293        for (const runner of runners) {
294          this.writeRunnerLog(runner.name, {
295            batchId,
296            details: {
297              trigger
298            },
299            durationMs: 0,
300            result: decision,
301            stage: "runner_skipped"
302          });
303        }
304
305        this.writeFrameworkLog({
306          batchId,
307          details: {
308            runner_count: runners.length,
309            trigger
310          },
311          durationMs: Date.now() - tickStartedAt,
312          result: decision,
313          stage: "tick_completed"
314        });
315
316        return {
317          batchId,
318          decision,
319          runnerCount: runners.length
320        };
321      }
322
323      this.writeFrameworkLog({
324        batchId,
325        details: {
326          failed_runner_count: runnerFailures,
327          runner_count: runners.length,
328          successful_runner_count: runners.length - runnerFailures,
329          trigger
330        },
331        durationMs: Date.now() - tickStartedAt,
332        result: runnerFailures > 0 ? "completed_with_failures" : "scheduled",
333        stage: "tick_completed"
334      });
335
336      return {
337        batchId,
338        decision,
339        runnerCount: runners.length
340      };
341    } catch (error) {
342      this.writeFrameworkLog({
343        batchId,
344        details: {
345          runner_count: runners.length,
346          trigger
347        },
348        durationMs: Date.now() - tickStartedAt,
349        error,
350        result: "failed",
351        stage: "tick_failed"
352      });
353
354      return {
355        batchId,
356        decision: "failed",
357        runnerCount: runners.length
358      };
359    }
360  }
361
362  private async runRunner(
363    runner: TimedJobRunner,
364    batchId: string,
365    trigger: TimedJobsTickTrigger,
366    scheduleContext: TimedJobScheduleContext
367  ): Promise<boolean> {
368    const startedAt = Date.now();
369
370    this.writeRunnerLog(runner.name, {
371      batchId,
372      details: {
373        controller_id: scheduleContext.controllerId,
374        host: scheduleContext.host,
375        max_messages_per_tick: this.config.maxMessagesPerTick,
376        max_tasks_per_tick: this.config.maxTasksPerTick,
377        settle_delay_ms: this.config.settleDelayMs,
378        term: scheduleContext.term,
379        trigger
380      },
381      result: "running",
382      stage: "runner_started"
383    });
384
385    try {
386      const result = await runner.run({
387        ...scheduleContext,
388        artifactStore: this.artifactStore,
389        batchId,
390        config: this.getConfig(),
391        logDir: this.logDir,
392        log: (input) => {
393          this.writeRunnerLog(runner.name, {
394            ...input,
395            batchId
396          });
397        },
398        maxMessagesPerTick: this.config.maxMessagesPerTick,
399        maxTasksPerTick: this.config.maxTasksPerTick,
400        settleDelayMs: this.config.settleDelayMs,
401        trigger
402      });
403
404      this.writeRunnerLog(runner.name, {
405        batchId,
406        details: result?.details,
407        durationMs: Date.now() - startedAt,
408        result: result?.result ?? "ok",
409        stage: "runner_completed"
410      });
411
412      return true;
413    } catch (error) {
414      this.writeRunnerLog(runner.name, {
415        batchId,
416        durationMs: Date.now() - startedAt,
417        error,
418        result: "failed",
419        stage: "runner_failed"
420      });
421
422      return false;
423    }
424  }
425
426  private buildBatchId(): string {
427    this.batchSequence += 1;
428    return `timed-jobs-${Date.now()}-${this.batchSequence}`;
429  }
430
431  private writeFrameworkLog(
432    input: Omit<TimedJobLogInput, "stage"> & { batchId?: string; stage: string }
433  ): void {
434    this.writeLogEntry(FRAMEWORK_RUNNER_NAME, input.batchId ?? this.buildBatchId(), input);
435  }
436
437  private writeRunnerLog(
438    runner: string,
439    input: Omit<TimedJobLogInput, "stage"> & { batchId: string; stage: string }
440  ): void {
441    this.writeLogEntry(runner, input.batchId, input);
442  }
443
444  private async flushLogWrites(): Promise<void> {
445    try {
446      await this.pendingLogWrite;
447    } catch {
448      // individual write failures are already reported
449    }
450  }
451
452  private queueLogWrite(filePath: string, data: string): void {
453    // Keep JSONL entries ordered without blocking tick execution on filesystem IO.
454    this.pendingLogWrite = this.pendingLogWrite
455      .catch(() => {})
456      .then(() => this.appendFileImpl(filePath, data))
457      .catch((error) => {
458        console.error(`[timed-jobs-log] write failed: ${String(error)}`);
459      });
460  }
461
462  private writeLogEntry(
463    runner: string,
464    batchId: string,
465    input: TimedJobLogInput
466  ): void {
467    if (this.logDir == null) {
468      return;
469    }
470
471    const entry: TimedJobsLogEntry = {
472      batch_id: batchId,
473      duration_ms: input.durationMs ?? null,
474      error: normalizeErrorMessage(input.error),
475      result: input.result ?? null,
476      runner,
477      stage: input.stage,
478      ts: new Date().toISOString()
479    };
480
481    if (input.details) {
482      for (const [key, value] of Object.entries(input.details)) {
483        entry[key] = value;
484      }
485    }
486
487    const date = entry.ts.slice(0, 10);
488    const filePath = join(this.logDir, `${date}.jsonl`);
489    this.queueLogWrite(filePath, `${JSON.stringify(entry)}\n`);
490  }
491}
492
/**
 * Converts an arbitrary thrown value into the string stored in a log entry's
 * `error` field.
 *
 * @param error - Any value; it need not be an `Error`.
 * @returns The result depends on the input:
 *   - `null` for `null`/`undefined`, or when the value stringifies to "";
 *   - the message of an `Error` with a non-empty message;
 *   - otherwise the value's string form, with plain objects rendered as JSON
 *     instead of the uninformative "[object Object]".
 *   The function never throws, because it runs inside error-reporting paths
 *   where a second exception would escape the caller's catch block.
 */
function normalizeErrorMessage(error: unknown): string | null {
  if (error == null) {
    return null;
  }

  if (error instanceof Error && typeof error.message === "string" && error.message.length > 0) {
    return error.message;
  }

  let text: string;
  try {
    text = String(error);
  } catch {
    // e.g. Object.create(null): no toString/valueOf, so String() throws a TypeError.
    text = Object.prototype.toString.call(error);
  }

  if (text === "[object Object]") {
    try {
      const json = JSON.stringify(error);
      if (typeof json === "string" && json !== "{}") {
        text = json;
      }
    } catch {
      // circular or BigInt-bearing objects: keep the generic text
    }
  }

  return text === "" ? null : text;
}
505
/**
 * Canonical form of a runner name: surrounding whitespace is removed, and a
 * `null`/`undefined` value (possible from untyped callers) is treated as "".
 */
function normalizeRunnerName(name: string): string {
  const raw = name == null ? "" : String(name);
  return raw.trim();
}