- commit
- 45920f3
- parent
- 23f6f6c
- author
- im_wower
- date
- 2026-03-21 23:51:54 +0800 CST
Merge branch 'feat/T-004-conductor-lease'
5 files changed,
+902,
-15
+178,
-0
1@@ -0,0 +1,178 @@
2+import assert from "node:assert/strict";
3+import test from "node:test";
4+
5+import { ConductorDaemon } from "./index.ts";
6+
7+function createLeaseResult({
8+ holderId,
9+ holderHost = "mini",
10+ term,
11+ leaseExpiresAt,
12+ renewedAt,
13+ isLeader,
14+ operation
15+}) {
16+ return {
17+ holderId,
18+ holderHost,
19+ term,
20+ leaseExpiresAt,
21+ renewedAt,
22+ isLeader,
23+ operation,
24+ lease: {
25+ leaseName: "global",
26+ holderId,
27+ holderHost,
28+ term,
29+ leaseExpiresAt,
30+ renewedAt,
31+ preferredHolderId: holderId,
32+ metadataJson: null
33+ }
34+ };
35+}
36+
37+test("start enters leader state and allows scheduler work only for the lease holder", async () => {
38+ const heartbeatRequests = [];
39+ const leaseRequests = [];
40+ let currentNow = 100;
41+ const daemon = new ConductorDaemon(
42+ {
43+ nodeId: "mini-main",
44+ host: "mini",
45+ role: "primary",
46+ controlApiBase: "https://control.example.test"
47+ },
48+ {
49+ autoStartLoops: false,
50+ client: {
51+ async acquireLeaderLease(request) {
52+ leaseRequests.push(request);
53+
54+ return createLeaseResult({
55+ holderId: "mini-main",
56+ term: 4,
57+ leaseExpiresAt: 130,
58+ renewedAt: 100,
59+ isLeader: true,
60+ operation: "acquire"
61+ });
62+ },
63+ async sendControllerHeartbeat(request) {
64+ heartbeatRequests.push(request);
65+ }
66+ },
67+ now: () => currentNow
68+ }
69+ );
70+
71+ const state = await daemon.start();
72+ assert.equal(state, "leader");
73+ assert.equal(heartbeatRequests.length, 1);
74+ assert.equal(leaseRequests.length, 1);
75+ assert.equal(daemon.canSchedule(), true);
76+ assert.equal(daemon.getNextLeaseOperation(), "renew");
77+
78+ let executed = false;
79+ const decision = await daemon.runSchedulerPass(async () => {
80+ executed = true;
81+ });
82+
83+ assert.equal(decision, "scheduled");
84+ assert.equal(executed, true);
85+
86+ currentNow = 131;
87+ assert.equal(daemon.canSchedule(), false);
88+});
89+
90+test("standby responses keep scheduler disabled", async () => {
91+ const daemon = new ConductorDaemon(
92+ {
93+ nodeId: "mac-standby",
94+ host: "mac",
95+ role: "standby",
96+ controlApiBase: "https://control.example.test"
97+ },
98+ {
99+ autoStartLoops: false,
100+ client: {
101+ async acquireLeaderLease() {
102+ return createLeaseResult({
103+ holderId: "mini-main",
104+ term: 7,
105+ leaseExpiresAt: 230,
106+ renewedAt: 200,
107+ isLeader: false,
108+ operation: "acquire"
109+ });
110+ },
111+ async sendControllerHeartbeat() {}
112+ },
113+ now: () => 200
114+ }
115+ );
116+
117+ const state = await daemon.start();
118+ assert.equal(state, "standby");
119+ assert.equal(daemon.canSchedule(), false);
120+ assert.equal(daemon.getStatusSnapshot().schedulerEnabled, false);
121+
122+ let executed = false;
123+ const decision = await daemon.runSchedulerPass(async () => {
124+ executed = true;
125+ });
126+
127+ assert.equal(decision, "skipped_not_leader");
128+ assert.equal(executed, false);
129+});
130+
131+test("repeated renew failures degrade the daemon after the configured threshold", async () => {
132+ const calls = [];
133+ let currentNow = 100;
134+ const daemon = new ConductorDaemon(
135+ {
136+ nodeId: "mini-main",
137+ host: "mini",
138+ role: "primary",
139+ controlApiBase: "https://control.example.test",
140+ renewFailureThreshold: 2
141+ },
142+ {
143+ autoStartLoops: false,
144+ client: {
145+ async acquireLeaderLease(request) {
146+ calls.push(request);
147+
148+ if (calls.length === 1) {
149+ return createLeaseResult({
150+ holderId: "mini-main",
151+ term: 1,
152+ leaseExpiresAt: 130,
153+ renewedAt: 100,
154+ isLeader: true,
155+ operation: "acquire"
156+ });
157+ }
158+
159+ throw new Error("lease endpoint timeout");
160+ },
161+ async sendControllerHeartbeat() {}
162+ },
163+ now: () => currentNow
164+ }
165+ );
166+
167+ await daemon.start();
168+ assert.equal(daemon.getStatusSnapshot().leaseState, "leader");
169+
170+ currentNow = 110;
171+ await assert.rejects(() => daemon.runLeaseCycle(), /lease endpoint timeout/);
172+ assert.equal(daemon.getStatusSnapshot().leaseState, "leader");
173+ assert.equal(daemon.getStatusSnapshot().consecutiveRenewFailures, 1);
174+
175+ currentNow = 115;
176+ await assert.rejects(() => daemon.runLeaseCycle(), /lease endpoint timeout/);
177+ assert.equal(daemon.getStatusSnapshot().leaseState, "degraded");
178+ assert.equal(daemon.getStatusSnapshot().consecutiveRenewFailures, 2);
179+});
+411,
-6
1@@ -1,11 +1,69 @@
2 export type ConductorRole = "primary" | "standby";
3 export type LeaseState = "leader" | "standby" | "degraded";
4+export type SchedulerDecision = "scheduled" | "skipped_not_leader";
5+export type TimerHandle = ReturnType<typeof globalThis.setInterval>;
6+export type LeaderLeaseOperation = "acquire" | "renew";
7+
8+const DEFAULT_HEARTBEAT_INTERVAL_MS = 5_000;
9+const DEFAULT_LEASE_RENEW_INTERVAL_MS = 5_000;
10+const DEFAULT_LEASE_TTL_SEC = 30;
11+const DEFAULT_RENEW_FAILURE_THRESHOLD = 2;
12+
13+export interface ControllerHeartbeatInput {
14+ controllerId: string;
15+ host: string;
16+ role: string;
17+ priority: number;
18+ status: string;
19+ version?: string | null;
20+ heartbeatAt?: number;
21+ startedAt?: number | null;
22+}
23+
24+export interface LeaderLeaseRecord {
25+ leaseName: string;
26+ holderId: string;
27+ holderHost: string;
28+ term: number;
29+ leaseExpiresAt: number;
30+ renewedAt: number;
31+ preferredHolderId: string | null;
32+ metadataJson: string | null;
33+}
34+
35+export interface LeaderLeaseAcquireInput {
36+ controllerId: string;
37+ host: string;
38+ ttlSec: number;
39+ preferred?: boolean;
40+ metadataJson?: string | null;
41+ now?: number;
42+}
43+
44+export interface LeaderLeaseAcquireResult {
45+ holderId: string;
46+ holderHost: string;
47+ term: number;
48+ leaseExpiresAt: number;
49+ renewedAt: number;
50+ isLeader: boolean;
51+ operation: LeaderLeaseOperation;
52+ lease: LeaderLeaseRecord;
53+}
54
55 export interface ConductorConfig {
56 nodeId: string;
57 host: string;
58 role: ConductorRole;
59 controlApiBase: string;
60+ priority?: number;
61+ version?: string | null;
62+ preferred?: boolean;
63+ heartbeatIntervalMs?: number;
64+ leaseRenewIntervalMs?: number;
65+ leaseTtlSec?: number;
66+ renewFailureThreshold?: number;
67+ startedAt?: number;
68 }
69
70 export interface StartupChecklistItem {
71@@ -13,15 +71,160 @@ export interface StartupChecklistItem {
72 description: string;
73 }
74
75+export interface ConductorStatusSnapshot {
76+ nodeId: string;
77+ host: string;
78+ role: ConductorRole;
79+ leaseState: LeaseState;
80+ schedulerEnabled: boolean;
81+ currentLeaderId: string | null;
82+ currentTerm: number | null;
83+ leaseExpiresAt: number | null;
84+ lastHeartbeatAt: number | null;
85+ lastLeaseOperation: LeaderLeaseOperation | null;
86+ nextLeaseOperation: LeaderLeaseOperation;
87+ consecutiveRenewFailures: number;
88+ lastError: string | null;
89+}
90+
91+export interface SchedulerContext {
92+ controllerId: string;
93+ host: string;
94+ term: number;
95+}
96+
97+export interface ConductorControlApiClient {
98+ acquireLeaderLease(input: LeaderLeaseAcquireInput): Promise<LeaderLeaseAcquireResult>;
99+ sendControllerHeartbeat(input: ControllerHeartbeatInput): Promise<void>;
100+}
101+
102+export interface ConductorDaemonHooks {
103+ loadLocalRuns?: () => Promise<void>;
104+ onLeaseStateChange?: (snapshot: ConductorStatusSnapshot) => Promise<void> | void;
105+ reconcileLocalRuns?: () => Promise<void>;
106+}
107+
108+export interface ConductorDaemonOptions {
109+ autoStartLoops?: boolean;
110+ clearIntervalImpl?: (handle: TimerHandle) => void;
111+ client?: ConductorControlApiClient;
112+ fetchImpl?: typeof fetch;
113+ hooks?: ConductorDaemonHooks;
114+ now?: () => number;
115+ setIntervalImpl?: (handler: () => void, intervalMs: number) => TimerHandle;
116+}
117+
118+interface ConductorRuntimeState {
119+ consecutiveRenewFailures: number;
120+ currentLeaderId: string | null;
121+ currentTerm: number | null;
122+ lastError: string | null;
123+ lastHeartbeatAt: number | null;
124+ lastLeaseOperation: LeaderLeaseOperation | null;
125+ leaseExpiresAt: number | null;
126+ leaseState: LeaseState;
127+}
128+
129+function normalizeBaseUrl(baseUrl: string): string {
130+ return baseUrl.endsWith("/") ? baseUrl.slice(0, -1) : baseUrl;
131+}
132+
133+function toErrorMessage(error: unknown): string {
134+ if (error instanceof Error) {
135+ return error.message;
136+ }
137+
138+ return String(error);
139+}
140+
141+async function postJson<T>(
142+ fetchImpl: typeof fetch,
143+ baseUrl: string,
144+ path: string,
145+ body: unknown
146+): Promise<T> {
147+ const response = await fetchImpl(`${normalizeBaseUrl(baseUrl)}${path}`, {
148+ method: "POST",
149+ headers: {
150+ "content-type": "application/json"
151+ },
152+ body: JSON.stringify(body)
153+ });
154+ const text = await response.text();
155+
156+ if (!response.ok) {
157+ const detail = text === "" ? response.statusText : text;
158+ throw new Error(`Control API ${response.status} ${path}: ${detail}`);
159+ }
160+
161+ if (text === "") {
162+ return undefined as T;
163+ }
164+
165+ return JSON.parse(text) as T;
166+}
167+
168+export function createFetchControlApiClient(
169+ baseUrl: string,
170+ fetchImpl: typeof fetch = globalThis.fetch
171+): ConductorControlApiClient {
172+ return {
173+ async acquireLeaderLease(input: LeaderLeaseAcquireInput): Promise<LeaderLeaseAcquireResult> {
174+ return postJson<LeaderLeaseAcquireResult>(fetchImpl, baseUrl, "/v1/leader/acquire", input);
175+ },
176+ async sendControllerHeartbeat(input: ControllerHeartbeatInput): Promise<void> {
177+ await postJson(fetchImpl, baseUrl, "/v1/controllers/heartbeat", input);
178+ }
179+ };
180+}
181+
182 export class ConductorDaemon {
183- constructor(private readonly config: ConductorConfig) {}
184+ private readonly autoStartLoops: boolean;
185+ private readonly clearIntervalImpl: (handle: TimerHandle) => void;
186+ private readonly client: ConductorControlApiClient;
187+ private readonly config: ConductorConfig;
188+ private heartbeatTimer: TimerHandle | null = null;
189+ private readonly hooks?: ConductorDaemonHooks;
190+ private leaseTimer: TimerHandle | null = null;
191+ private readonly now: () => number;
192+ private readonly setIntervalImpl: (handler: () => void, intervalMs: number) => TimerHandle;
193+ private readonly startedAt: number;
194+ private readonly state: ConductorRuntimeState = {
195+ consecutiveRenewFailures: 0,
196+ currentLeaderId: null,
197+ currentTerm: null,
198+ lastError: null,
199+ lastHeartbeatAt: null,
200+ lastLeaseOperation: null,
201+ leaseExpiresAt: null,
202+ leaseState: "standby"
203+ };
204+
205+ constructor(
206+ config: ConductorConfig,
207+ options: ConductorDaemonOptions = {}
208+ ) {
209+ this.config = config;
210+ this.autoStartLoops = options.autoStartLoops ?? true;
211+ this.clearIntervalImpl =
212+ options.clearIntervalImpl ?? ((handle) => globalThis.clearInterval(handle));
213+ this.client =
214+ options.client ?? createFetchControlApiClient(config.controlApiBase, options.fetchImpl);
215+ this.hooks = options.hooks;
216+ this.now = options.now ?? (() => Math.floor(Date.now() / 1000));
217+ this.setIntervalImpl =
218+ options.setIntervalImpl ?? ((handler, intervalMs) => globalThis.setInterval(handler, intervalMs));
219+ this.startedAt = config.startedAt ?? this.now();
220+ }
221
222 getStartupChecklist(): StartupChecklistItem[] {
223 return [
224- { key: "register-controller", description: "注册 controller 心跳" },
225+ { key: "register-controller", description: "注册 controller 并写入初始 heartbeat" },
226+ { key: "start-heartbeat-loop", description: "启动 controller heartbeat loop" },
227 { key: "acquire-lease", description: "尝试获取或续租 leader lease" },
228- { key: "load-runs", description: "扫描本地 run 目录并恢复状态" },
229- { key: "start-scheduler", description: "启动 scheduler loop" }
230+ { key: "load-runs", description: "扫描本地未完成 runs" },
231+ { key: "reconcile-runs", description: "对账本地过期 runs 与控制平面状态" },
232+ { key: "start-scheduler", description: "仅 leader 进入 scheduler loop,standby 拒绝调度" }
233 ];
234 }
235
236@@ -29,8 +232,210 @@ export class ConductorDaemon {
237 return `${this.config.nodeId}@${this.config.host}(${this.config.role})`;
238 }
239
240+ getStatusSnapshot(now: number = this.now()): ConductorStatusSnapshot {
241+ return {
242+ nodeId: this.config.nodeId,
243+ host: this.config.host,
244+ role: this.config.role,
245+ leaseState: this.state.leaseState,
246+ schedulerEnabled: this.canSchedule(now),
247+ currentLeaderId: this.state.currentLeaderId,
248+ currentTerm: this.state.currentTerm,
249+ leaseExpiresAt: this.state.leaseExpiresAt,
250+ lastHeartbeatAt: this.state.lastHeartbeatAt,
251+ lastLeaseOperation: this.state.lastLeaseOperation,
252+ nextLeaseOperation: this.getNextLeaseOperation(now),
253+ consecutiveRenewFailures: this.state.consecutiveRenewFailures,
254+ lastError: this.state.lastError
255+ };
256+ }
257+
258+ getNextLeaseOperation(now: number = this.now()): LeaderLeaseOperation {
259+ return this.state.currentLeaderId === this.config.nodeId &&
260+ this.state.leaseExpiresAt != null &&
261+ this.state.leaseExpiresAt > now
262+ ? "renew"
263+ : "acquire";
264+ }
265+
266+ canSchedule(now: number = this.now()): boolean {
267+ return (
268+ this.state.leaseState === "leader" &&
269+ this.state.currentLeaderId === this.config.nodeId &&
270+ this.state.leaseExpiresAt != null &&
271+ this.state.leaseExpiresAt > now
272+ );
273+ }
274+
275 async start(): Promise<LeaseState> {
276- return "standby";
277+ try {
278+ await this.sendHeartbeat();
279+ } catch (error) {
280+ this.state.lastError = toErrorMessage(error);
281+ await this.transitionTo("degraded");
282+ }
283+
284+ if (this.autoStartLoops) {
285+ this.startLoops();
286+ }
287+
288+ try {
289+ await this.runLeaseCycle();
290+ } catch {
291+ // Keep startup non-throwing so the process can surface degraded state.
292+ }
293+
294+ await this.hooks?.loadLocalRuns?.();
295+ await this.hooks?.reconcileLocalRuns?.();
296+
297+ return this.state.leaseState;
298+ }
299+
300+ stop(): void {
301+ if (this.heartbeatTimer != null) {
302+ this.clearIntervalImpl(this.heartbeatTimer);
303+ this.heartbeatTimer = null;
304+ }
305+
306+ if (this.leaseTimer != null) {
307+ this.clearIntervalImpl(this.leaseTimer);
308+ this.leaseTimer = null;
309+ }
310+ }
311+
312+ async sendHeartbeat(): Promise<void> {
313+ const request = this.buildHeartbeatRequest();
314+ await this.client.sendControllerHeartbeat(request);
315+ this.state.lastHeartbeatAt = request.heartbeatAt ?? this.now();
316+ this.state.lastError = null;
317+ }
318+
319+ async runLeaseCycle(): Promise<LeaseState> {
320+ const requestedOperation = this.getNextLeaseOperation();
321+ this.state.lastLeaseOperation = requestedOperation;
322+
323+ try {
324+ const result = await this.client.acquireLeaderLease(this.buildLeaseRequest());
325+ this.applyLeaseResult(result);
326+ this.state.consecutiveRenewFailures = 0;
327+ this.state.lastError = null;
328+ await this.transitionTo(result.isLeader ? "leader" : "standby");
329+ return this.state.leaseState;
330+ } catch (error) {
331+ this.state.lastError = toErrorMessage(error);
332+
333+ if (requestedOperation === "renew" && this.state.leaseState === "leader") {
334+ this.state.consecutiveRenewFailures += 1;
335+
336+ if (this.state.consecutiveRenewFailures >= this.getRenewFailureThreshold()) {
337+ await this.transitionTo("degraded");
338+ }
339+ } else {
340+ await this.transitionTo("degraded");
341+ }
342+
343+ throw error;
344+ }
345+ }
346+
347+ async runSchedulerPass(
348+ work: (context: SchedulerContext) => Promise<void>
349+ ): Promise<SchedulerDecision> {
350+ if (!this.canSchedule()) {
351+ return "skipped_not_leader";
352+ }
353+
354+ if (this.state.currentTerm == null) {
355+ return "skipped_not_leader";
356+ }
357+
358+ await work({
359+ controllerId: this.config.nodeId,
360+ host: this.config.host,
361+ term: this.state.currentTerm
362+ });
363+
364+ return "scheduled";
365+ }
366+
367+ private applyLeaseResult(result: LeaderLeaseAcquireResult): void {
368+ this.state.currentLeaderId = result.holderId;
369+ this.state.currentTerm = result.term;
370+ this.state.lastLeaseOperation = result.operation;
371+ this.state.leaseExpiresAt = result.leaseExpiresAt;
372+ }
373+
374+ private buildHeartbeatRequest(): ControllerHeartbeatInput {
375+ return {
376+ controllerId: this.config.nodeId,
377+ host: this.config.host,
378+ role: this.config.role,
379+ priority: this.config.priority ?? (this.config.role === "primary" ? 100 : 50),
380+ status: this.state.leaseState === "degraded" ? "degraded" : "alive",
381+ version: this.config.version ?? null,
382+ heartbeatAt: this.now(),
383+ startedAt: this.startedAt
384+ };
385+ }
386+
387+ private buildLeaseRequest(): LeaderLeaseAcquireInput {
388+ return {
389+ controllerId: this.config.nodeId,
390+ host: this.config.host,
391+ ttlSec: this.config.leaseTtlSec ?? DEFAULT_LEASE_TTL_SEC,
392+ preferred: this.config.preferred ?? this.config.role === "primary",
393+ now: this.now()
394+ };
395+ }
396+
397+ private getHeartbeatIntervalMs(): number {
398+ return this.config.heartbeatIntervalMs ?? DEFAULT_HEARTBEAT_INTERVAL_MS;
399 }
400-}
401
402+ private getLeaseRenewIntervalMs(): number {
403+ return this.config.leaseRenewIntervalMs ?? DEFAULT_LEASE_RENEW_INTERVAL_MS;
404+ }
405+
406+ private getRenewFailureThreshold(): number {
407+ return this.config.renewFailureThreshold ?? DEFAULT_RENEW_FAILURE_THRESHOLD;
408+ }
409+
410+ private startLoops(): void {
411+ if (this.heartbeatTimer == null) {
412+ this.heartbeatTimer = this.setIntervalImpl(() => {
413+ void this.safeHeartbeatTick();
414+ }, this.getHeartbeatIntervalMs());
415+ }
416+
417+ if (this.leaseTimer == null) {
418+ this.leaseTimer = this.setIntervalImpl(() => {
419+ void this.safeLeaseTick();
420+ }, this.getLeaseRenewIntervalMs());
421+ }
422+ }
423+
424+ private async safeHeartbeatTick(): Promise<void> {
425+ try {
426+ await this.sendHeartbeat();
427+ } catch (error) {
428+ this.state.lastError = toErrorMessage(error);
429+ }
430+ }
431+
432+ private async safeLeaseTick(): Promise<void> {
433+ try {
434+ await this.runLeaseCycle();
435+ } catch {
436+ // Background loop keeps the last known state and waits for the next cycle.
437+ }
438+ }
439+
440+ private async transitionTo(nextState: LeaseState): Promise<void> {
441+ if (this.state.leaseState === nextState) {
442+ return;
443+ }
444+
445+ this.state.leaseState = nextState;
446+ await this.hooks?.onLeaseStateChange?.(this.getStatusSnapshot());
447+ }
448+}
1@@ -1,10 +1,10 @@
2 ---
3 task_id: T-004
4 title: Conductor Lease 与 Heartbeat
5-status: todo
6+status: review
7 branch: feat/T-004-conductor-lease
8 repo: /Users/george/code/baa-conductor
9-base_ref: main@28829de
10+base_ref: main@56e9a4f
11 depends_on:
12 - T-002
13 - T-003
14@@ -22,7 +22,7 @@ updated_at: 2026-03-21
15
16 ## 统一开工要求
17
18-- 必须从 `main@28829de` 切出该分支
19+- 必须从 `main@56e9a4f` 切出该分支
20 - 新 worktree 进入后先执行 `npx --yes pnpm install`
21 - 不允许从其他任务分支切分支
22
23@@ -58,23 +58,33 @@ updated_at: 2026-03-21
24
25 ## files_changed
26
27-- 待填写
28+- `apps/conductor-daemon/src/index.ts`
29+- `apps/conductor-daemon/src/index.test.js`
30+- `packages/db/src/index.ts`
31+- `packages/db/src/index.test.js`
32
33 ## commands_run
34
35-- 待填写
36+- `npx --yes pnpm install`
37+- `npx --yes pnpm --filter @baa-conductor/db typecheck`
38+- `npx --yes pnpm --filter @baa-conductor/conductor-daemon typecheck`
39+- `node --test --experimental-strip-types packages/db/src/index.test.js`
40+- `node --test --experimental-strip-types apps/conductor-daemon/src/index.test.js`
41
42 ## result
43
44-- 待填写
45+- `packages/db` 新增 controller heartbeat helper、lease acquire/renew 数据模型,以及基于 `leader_lease` 的最小抢占/续租仓储方法。
46+- `apps/conductor-daemon` 新增 startup checklist、control API client、heartbeat/lease loop、leader-only scheduler gate,并补了针对性的状态机测试。
47
48 ## risks
49
50-- 待填写
51+- `apps/conductor-daemon` 当前内置了一份 lease/heartbeat payload 类型,后续适合和 `T-003` 的 control API 契约收敛到共享定义。
52+- 当前 lease 时间字段按 epoch seconds 处理,后续接入真实 control API 时需要和响应约定保持一致。
53
54 ## next_handoff
55
56-- 给后续 scheduler 和 status API 提供主备状态基线
57+- `T-003` 可直接把 `/v1/controllers/heartbeat` 与 `/v1/leader/acquire` 接到 `packages/db` 新增的 helper 与仓储方法。
58+- 后续 scheduler 与 status API 可直接复用 `ConductorDaemon.getStatusSnapshot()`、`canSchedule()`、`runSchedulerPass()` 作为主备门禁基线。
59
60 ## notes
61
+98,
-0
1@@ -0,0 +1,98 @@
2+import assert from "node:assert/strict";
3+import test from "node:test";
4+
5+import {
6+ GLOBAL_LEASE_NAME,
7+ buildControllerHeartbeatRecord,
8+ buildLeaderLeaseAcquireResult,
9+ buildLeaderLeaseRecord,
10+ getLeaderLeaseOperation,
11+ isLeaderLeaseExpired
12+} from "./index.ts";
13+
14+test("buildControllerHeartbeatRecord preserves heartbeat and startup timestamps", () => {
15+ const record = buildControllerHeartbeatRecord({
16+ controllerId: "mini-main",
17+ host: "mini",
18+ role: "primary",
19+ priority: 100,
20+ status: "alive",
21+ version: "0.1.0",
22+ heartbeatAt: 120,
23+ startedAt: 90,
24+ metadataJson: "{\"slot\":\"main\"}"
25+ });
26+
27+ assert.deepEqual(record, {
28+ controllerId: "mini-main",
29+ host: "mini",
30+ role: "primary",
31+ priority: 100,
32+ status: "alive",
33+ version: "0.1.0",
34+ lastHeartbeatAt: 120,
35+ lastStartedAt: 90,
36+ metadataJson: "{\"slot\":\"main\"}"
37+ });
38+});
39+
40+test("getLeaderLeaseOperation returns renew only for the active holder", () => {
41+ const currentLease = {
42+ leaseName: GLOBAL_LEASE_NAME,
43+ holderId: "mini-main",
44+ holderHost: "mini",
45+ term: 7,
46+ leaseExpiresAt: 150,
47+ renewedAt: 120,
48+ preferredHolderId: "mini-main",
49+ metadataJson: null
50+ };
51+
52+ assert.equal(getLeaderLeaseOperation(currentLease, "mini-main", 140), "renew");
53+ assert.equal(getLeaderLeaseOperation(currentLease, "mac-standby", 140), "acquire");
54+ assert.equal(getLeaderLeaseOperation(currentLease, "mini-main", 151), "acquire");
55+ assert.equal(isLeaderLeaseExpired(currentLease, 151), true);
56+});
57+
58+test("buildLeaderLeaseRecord increments term for takeover and marks standby responses", () => {
59+ const previousLease = {
60+ leaseName: GLOBAL_LEASE_NAME,
61+ holderId: "mini-main",
62+ holderHost: "mini",
63+ term: 3,
64+ leaseExpiresAt: 110,
65+ renewedAt: 80,
66+ preferredHolderId: "mini-main",
67+ metadataJson: "{\"role\":\"primary\"}"
68+ };
69+
70+ const desiredLease = buildLeaderLeaseRecord(previousLease, {
71+ controllerId: "mac-standby",
72+ host: "mac",
73+ ttlSec: 30,
74+ preferred: false,
75+ now: 111
76+ });
77+
78+ assert.deepEqual(desiredLease, {
79+ leaseName: GLOBAL_LEASE_NAME,
80+ holderId: "mac-standby",
81+ holderHost: "mac",
82+ term: 4,
83+ leaseExpiresAt: 141,
84+ renewedAt: 111,
85+ preferredHolderId: "mini-main",
86+ metadataJson: "{\"role\":\"primary\"}"
87+ });
88+
89+ const response = buildLeaderLeaseAcquireResult(previousLease, previousLease, {
90+ controllerId: "mac-standby",
91+ host: "mac",
92+ ttlSec: 30,
93+ now: 100
94+ });
95+
96+ assert.equal(response.isLeader, false);
97+ assert.equal(response.operation, "acquire");
98+ assert.equal(response.holderId, "mini-main");
99+});
+197,
-1
1@@ -16,6 +16,9 @@ export type D1TableName = (typeof D1_TABLES)[number];
2 export const GLOBAL_LEASE_NAME = "global";
3 export const AUTOMATION_STATE_KEY = "automation";
4 export const DEFAULT_AUTOMATION_MODE = "running";
5+export const DEFAULT_LEASE_TTL_SEC = 30;
6+export const DEFAULT_LEASE_RENEW_INTERVAL_SEC = 5;
7+export const DEFAULT_LEASE_RENEW_FAILURE_THRESHOLD = 2;
8
9 export const AUTOMATION_MODE_VALUES = ["running", "draining", "paused"] as const;
10 export const TASK_STATUS_VALUES = [
11@@ -107,6 +110,40 @@ export interface ControllerRecord {
12 metadataJson: string | null;
13 }
14
15+export type LeaderLeaseOperation = "acquire" | "renew";
16+
17+export interface ControllerHeartbeatInput {
18+ controllerId: string;
19+ host: string;
20+ role: string;
21+ priority: number;
22+ status: string;
23+ version?: string | null;
24+ heartbeatAt?: number;
25+ startedAt?: number | null;
26+ metadataJson?: string | null;
27+}
28+
29+export interface LeaderLeaseAcquireInput {
30+ controllerId: string;
31+ host: string;
32+ ttlSec: number;
33+ preferred?: boolean;
34+ metadataJson?: string | null;
35+ now?: number;
36+}
37+
38+export interface LeaderLeaseAcquireResult {
39+ holderId: string;
40+ holderHost: string;
41+ term: number;
42+ leaseExpiresAt: number;
43+ renewedAt: number;
44+ isLeader: boolean;
45+ operation: LeaderLeaseOperation;
46+ lease: LeaderLeaseRecord;
47+}
48+
49 export interface WorkerRecord {
50 workerId: string;
51 controllerId: string;
52@@ -258,7 +295,9 @@ export interface TaskArtifactRecord {
53
54 export interface ControlPlaneRepository {
55 appendTaskLog(record: NewTaskLogRecord): Promise<number | null>;
56+ heartbeatController(input: ControllerHeartbeatInput): Promise<ControllerRecord>;
57 ensureAutomationState(mode?: AutomationMode): Promise<void>;
58+ acquireLeaderLease(input: LeaderLeaseAcquireInput): Promise<LeaderLeaseAcquireResult>;
59 getAutomationState(): Promise<AutomationStateRecord | null>;
60 getCurrentLease(): Promise<LeaderLeaseRecord | null>;
61 getSystemState(stateKey: string): Promise<SystemStateRecord | null>;
62@@ -322,6 +361,88 @@ export function buildAutomationStateValue(mode: AutomationMode): string {
63 return JSON.stringify({ mode });
64 }
65
66+export function isLeaderLeaseExpired(
67+ lease: Pick<LeaderLeaseRecord, "leaseExpiresAt">,
68+ now: number = nowUnixSeconds()
69+): boolean {
70+ return lease.leaseExpiresAt <= now;
71+}
72+
73+export function canAcquireLeaderLease(
74+ lease: LeaderLeaseRecord | null,
75+ controllerId: string,
76+ now: number = nowUnixSeconds()
77+): boolean {
78+ return lease == null || lease.holderId === controllerId || isLeaderLeaseExpired(lease, now);
79+}
80+
81+export function getLeaderLeaseOperation(
82+ lease: LeaderLeaseRecord | null,
83+ controllerId: string,
84+ now: number = nowUnixSeconds()
85+): LeaderLeaseOperation {
86+ return lease != null && lease.holderId === controllerId && !isLeaderLeaseExpired(lease, now)
87+ ? "renew"
88+ : "acquire";
89+}
90+
91+export function buildControllerHeartbeatRecord(input: ControllerHeartbeatInput): ControllerRecord {
92+ const heartbeatAt = input.heartbeatAt ?? nowUnixSeconds();
93+
94+ return {
95+ controllerId: input.controllerId,
96+ host: input.host,
97+ role: input.role,
98+ priority: input.priority,
99+ status: input.status,
100+ version: input.version ?? null,
101+ lastHeartbeatAt: heartbeatAt,
102+ lastStartedAt: input.startedAt ?? heartbeatAt,
103+ metadataJson: input.metadataJson ?? null
104+ };
105+}
106+
107+export function buildLeaderLeaseRecord(
108+ currentLease: LeaderLeaseRecord | null,
109+ input: LeaderLeaseAcquireInput
110+): LeaderLeaseRecord {
111+ const now = input.now ?? nowUnixSeconds();
112+ const operation = getLeaderLeaseOperation(currentLease, input.controllerId, now);
113+
114+ return {
115+ leaseName: GLOBAL_LEASE_NAME,
116+ holderId: input.controllerId,
117+ holderHost: input.host,
118+ term: operation === "renew" ? currentLease!.term : (currentLease?.term ?? 0) + 1,
119+ leaseExpiresAt: now + input.ttlSec,
120+ renewedAt: now,
121+ preferredHolderId: input.preferred ? input.controllerId : currentLease?.preferredHolderId ?? null,
122+ metadataJson: input.metadataJson ?? currentLease?.metadataJson ?? null
123+ };
124+}
125+
126+export function buildLeaderLeaseAcquireResult(
127+ previousLease: LeaderLeaseRecord | null,
128+ currentLease: LeaderLeaseRecord,
129+ input: LeaderLeaseAcquireInput
130+): LeaderLeaseAcquireResult {
131+ const now = input.now ?? currentLease.renewedAt;
132+
133+ return {
134+ holderId: currentLease.holderId,
135+ holderHost: currentLease.holderHost,
136+ term: currentLease.term,
137+ leaseExpiresAt: currentLease.leaseExpiresAt,
138+ renewedAt: currentLease.renewedAt,
139+ isLeader: currentLease.holderId === input.controllerId,
140+ operation:
141+ currentLease.holderId === input.controllerId
142+ ? getLeaderLeaseOperation(previousLease, input.controllerId, now)
143+ : "acquire",
144+ lease: currentLease
145+ };
146+}
147+
148 function toD1Bindable(value: D1Bindable | undefined): D1Bindable {
149 return value ?? null;
150 }
151@@ -645,6 +766,50 @@ export const UPSERT_LEADER_LEASE_SQL = `
152 metadata_json = excluded.metadata_json
153 `;
154
155+export const ACQUIRE_LEADER_LEASE_SQL = `
156+ INSERT INTO leader_lease (
157+ lease_name,
158+ holder_id,
159+ holder_host,
160+ term,
161+ lease_expires_at,
162+ renewed_at,
163+ preferred_holder_id,
164+ metadata_json
165+ )
166+ VALUES (?, ?, ?, ?, ?, ?, ?, ?)
167+ ON CONFLICT(lease_name) DO UPDATE SET
168+ holder_id = CASE
169+ WHEN leader_lease.holder_id = excluded.holder_id OR leader_lease.lease_expires_at <= ? THEN excluded.holder_id
170+ ELSE leader_lease.holder_id
171+ END,
172+ holder_host = CASE
173+ WHEN leader_lease.holder_id = excluded.holder_id OR leader_lease.lease_expires_at <= ? THEN excluded.holder_host
174+ ELSE leader_lease.holder_host
175+ END,
176+ term = CASE
177+ WHEN leader_lease.holder_id = excluded.holder_id THEN leader_lease.term
178+ WHEN leader_lease.lease_expires_at <= ? THEN leader_lease.term + 1
179+ ELSE leader_lease.term
180+ END,
181+ lease_expires_at = CASE
182+ WHEN leader_lease.holder_id = excluded.holder_id OR leader_lease.lease_expires_at <= ? THEN excluded.lease_expires_at
183+ ELSE leader_lease.lease_expires_at
184+ END,
185+ renewed_at = CASE
186+ WHEN leader_lease.holder_id = excluded.holder_id OR leader_lease.lease_expires_at <= ? THEN excluded.renewed_at
187+ ELSE leader_lease.renewed_at
188+ END,
189+ preferred_holder_id = CASE
190+ WHEN leader_lease.holder_id = excluded.holder_id OR leader_lease.lease_expires_at <= ? THEN excluded.preferred_holder_id
191+ ELSE leader_lease.preferred_holder_id
192+ END,
193+ metadata_json = CASE
194+ WHEN leader_lease.holder_id = excluded.holder_id OR leader_lease.lease_expires_at <= ? THEN excluded.metadata_json
195+ ELSE leader_lease.metadata_json
196+ END
197+`;
198+
199 export const UPSERT_CONTROLLER_SQL = `
200 INSERT INTO controllers (
201 controller_id,
202@@ -928,6 +1093,10 @@ function leaderLeaseParams(record: LeaderLeaseRecord): D1Bindable[] {
203 ];
204 }
205
206+function acquireLeaderLeaseParams(record: LeaderLeaseRecord, now: number): D1Bindable[] {
207+ return [...leaderLeaseParams(record), now, now, now, now, now, now, now];
208+}
209+
210 function controllerParams(record: ControllerRecord): D1Bindable[] {
211 return [
212 record.controllerId,
213@@ -1087,7 +1256,11 @@ function taskArtifactParams(record: TaskArtifactRecord): D1Bindable[] {
214 }
215
216 export class D1ControlPlaneRepository implements ControlPlaneRepository {
217- constructor(private readonly db: D1DatabaseLike) {}
218+ private readonly db: D1DatabaseLike;
219+
220+ constructor(db: D1DatabaseLike) {
221+ this.db = db;
222+ }
223
224 async ensureAutomationState(mode: AutomationMode = DEFAULT_AUTOMATION_MODE): Promise<void> {
225 await this.run(ENSURE_AUTOMATION_STATE_SQL, [
226@@ -1097,6 +1270,29 @@ export class D1ControlPlaneRepository implements ControlPlaneRepository {
227 ]);
228 }
229
230+ async heartbeatController(input: ControllerHeartbeatInput): Promise<ControllerRecord> {
231+ const record = buildControllerHeartbeatRecord(input);
232+ await this.upsertController(record);
233+ return record;
234+ }
235+
236+ async acquireLeaderLease(input: LeaderLeaseAcquireInput): Promise<LeaderLeaseAcquireResult> {
237+ const now = input.now ?? nowUnixSeconds();
238+ const normalizedInput = { ...input, now };
239+ const currentLease = await this.getCurrentLease();
240+ const desiredLease = buildLeaderLeaseRecord(currentLease, normalizedInput);
241+
242+ await this.run(ACQUIRE_LEADER_LEASE_SQL, acquireLeaderLeaseParams(desiredLease, now));
243+
244+ const updatedLease = await this.getCurrentLease();
245+
246+ if (updatedLease == null) {
247+ throw new Error("leader_lease row was not available after acquire attempt.");
248+ }
249+
250+ return buildLeaderLeaseAcquireResult(currentLease, updatedLease, normalizedInput);
251+ }
252+
253 async getAutomationState(): Promise<AutomationStateRecord | null> {
254 const row = await this.fetchFirst(SELECT_SYSTEM_STATE_SQL, [AUTOMATION_STATE_KEY]);
255 return row == null ? null : mapAutomationStateRow(row);