baa-conductor

git clone 

baa-conductor / packages / d1-client / src
im_wower  ·  2026-03-29

client.ts

  1import type {
  2  D1ApiResponse,
  3  D1ClientConfig,
  4  D1PreparedStatement
  5} from "./types.js";
  6import { DEFAULT_TIMEOUT_MS } from "./types.js";
  7
  8/**
  9 * Async Cloudflare D1 HTTP API client.
 10 *
 11 * Returns `null` from {@link createD1Client} when environment variables are
 12 * missing so that callers can treat D1 as optional without error handling.
 13 */
 14export class D1Client {
 15  private readonly url: string;
 16  private readonly apiToken: string;
 17  private readonly fetchImpl: typeof globalThis.fetch;
 18  private readonly timeoutMs: number;
 19
 20  constructor(config: D1ClientConfig) {
 21    this.url = `https://api.cloudflare.com/client/v4/accounts/${config.accountId}/d1/database/${config.databaseId}/query`;
 22    this.apiToken = config.apiToken;
 23    this.fetchImpl = config.fetchImpl ?? globalThis.fetch;
 24    this.timeoutMs = config.timeoutMs ?? DEFAULT_TIMEOUT_MS;
 25  }
 26
 27  /**
 28   * Low-level query — sends a single SQL statement with optional params.
 29   */
 30  private async query<T = Record<string, unknown>>(
 31    sql: string,
 32    params: unknown[] = []
 33  ): Promise<{ results: T[]; meta: { changes: number } }> {
 34    const controller = new AbortController();
 35    const timer = setTimeout(() => controller.abort(), this.timeoutMs);
 36
 37    let response: Response;
 38    try {
 39      response = await this.fetchImpl(this.url, {
 40        method: "POST",
 41        headers: {
 42          Authorization: `Bearer ${this.apiToken}`,
 43          "Content-Type": "application/json"
 44        },
 45        body: JSON.stringify({ sql, params }),
 46        signal: controller.signal
 47      });
 48    } catch (error) {
 49      throw new Error(
 50        `D1 network error: ${error instanceof Error ? error.message : String(error)}`
 51      );
 52    } finally {
 53      clearTimeout(timer);
 54    }
 55
 56    let data: D1ApiResponse<T>;
 57    try {
 58      data = (await response.json()) as D1ApiResponse<T>;
 59    } catch {
 60      throw new Error(
 61        `D1 response parse error: HTTP ${response.status} ${response.statusText}`
 62      );
 63    }
 64
 65    if (!data.success) {
 66      const errMsg = (data.errors ?? []).map((e) => e.message).join("; ");
 67      // DDL "already exists" errors are silently ignored (matches IF NOT EXISTS semantics).
 68      if (/already exists/i.test(errMsg)) {
 69        return { results: [], meta: { changes: 0 } };
 70      }
 71      throw new Error(`D1 query error: ${errMsg}`);
 72    }
 73
 74    const first = data.result[0];
 75    return {
 76      results: first?.results ?? [],
 77      meta: { changes: first?.meta?.changes ?? 0 }
 78    };
 79  }
 80
 81  /**
 82   * Execute one or more SQL statements (separated by `;`).
 83   * DDL "already exists" errors are silently ignored.
 84   */
 85  async exec(sql: string): Promise<void> {
 86    const statements = sql
 87      .split(";")
 88      .map((s) => s.trim())
 89      .filter((s) => s.length > 0);
 90
 91    for (const stmt of statements) {
 92      try {
 93        await this.query(stmt);
 94      } catch (error) {
 95        if (
 96          error instanceof Error &&
 97          /already exists|duplicate column/i.test(error.message)
 98        ) {
 99          continue;
100        }
101        throw error;
102      }
103    }
104  }
105
106  /**
107   * Prepare a parameterised statement.
108   * Returns an object with `run`, `get`, and `all` — all returning Promises.
109   */
110  prepare(sql: string): D1PreparedStatement {
111    return {
112      run: async (...params: unknown[]): Promise<{ changes: number }> => {
113        const result = await this.query(sql, params);
114        return { changes: result.meta.changes };
115      },
116
117      get: async <T = Record<string, unknown>>(
118        ...params: unknown[]
119      ): Promise<T | undefined> => {
120        const result = await this.query<T>(sql, params);
121        return result.results[0] ?? undefined;
122      },
123
124      all: async <T = Record<string, unknown>>(
125        ...params: unknown[]
126      ): Promise<T[]> => {
127        const result = await this.query<T>(sql, params);
128        return result.results;
129      }
130    };
131  }
132}
133
134// ---------------------------------------------------------------------------
135// Factory — returns null when D1 is not configured
136// ---------------------------------------------------------------------------
137
138export interface D1EnvVars {
139  D1_DATABASE_ID?: string;
140  D1_ACCOUNT_ID?: string;
141  CLOUDFLARE_API_TOKEN?: string;
142}
143
144/**
145 * Create a D1Client if all required environment variables are present.
146 * Returns `null` (without throwing) when D1 is not configured.
147 */
148export function createD1Client(
149  env: D1EnvVars,
150  options?: { fetchImpl?: typeof globalThis.fetch; timeoutMs?: number }
151): D1Client | null {
152  const accountId = env.D1_ACCOUNT_ID?.trim();
153  const databaseId = env.D1_DATABASE_ID?.trim();
154  const apiToken = env.CLOUDFLARE_API_TOKEN?.trim();
155
156  if (!accountId || !databaseId || !apiToken) {
157    return null;
158  }
159
160  return new D1Client({
161    accountId,
162    databaseId,
163    apiToken,
164    fetchImpl: options?.fetchImpl,
165    timeoutMs: options?.timeoutMs
166  });
167}