import { appendFile, readFile, writeFile, mkdir } from "node:fs/promises";
import { dirname } from "node:path";
import { existsSync } from "node:fs";
import type { z } from "zod";

/**
 * Create the parent directory of `path`, including any missing ancestors.
 *
 * `mkdir` with `recursive: true` resolves without error when the directory
 * already exists, so no prior existence check is needed.
 */
async function ensureParentDir(path: string): Promise<void> {
  await mkdir(dirname(path), { recursive: true });
}

/**
 * Read the file at `path` as UTF-8 and return its non-blank lines.
 *
 * A missing file yields an empty array. A leading UTF-8 byte-order mark is
 * removed so that it does not make `JSON.parse` reject the first record.
 */
async function readNonBlankLines(path: string): Promise<string[]> {
  if (!existsSync(path)) return [];
  const text = await readFile(path, "utf-8");
  const body = text.charCodeAt(0) === 0xfeff ? text.slice(1) : text;
  return body.split("\n").filter((line) => line.trim().length > 0);
}

/**
 * Append a single record to a JSONL file, creating parent directories if
 * needed.
 *
 * The record is serialized with `JSON.stringify` and written, followed by a
 * newline, in one `appendFile` call; the original author notes this is atomic
 * on Linux for reasonable line sizes.
 *
 * Note: a value `JSON.stringify` cannot represent (e.g. `undefined`) is
 * written as the literal line `undefined`, which the readers count as skipped.
 *
 * @param path - Destination JSONL file.
 * @param record - Value to serialize as one line.
 */
export async function appendJsonl<T>(path: string, record: T): Promise<void> {
  await ensureParentDir(path);
  await appendFile(path, JSON.stringify(record) + "\n");
}

/**
 * Read a JSONL file and validate every line against a Zod schema.
 *
 * Blank lines are ignored. A line is counted in `skipped` when it fails
 * `JSON.parse` (e.g. truncated by a crash), when the schema rejects it, or
 * when validation throws. A missing file yields an empty result.
 *
 * @param path - JSONL file to read.
 * @param schema - Zod schema applied to each parsed line via `safeParse`.
 * @returns The validated records plus the number of skipped lines, so callers
 *   can log corruption.
 */
export async function readJsonl<S extends z.ZodTypeAny>(
  path: string,
  schema: S,
): Promise<{ records: z.infer<S>[]; skipped: number }> {
  const records: z.infer<S>[] = [];
  let skipped = 0;

  for (const line of await readNonBlankLines(path)) {
    try {
      const result = schema.safeParse(JSON.parse(line));
      if (result.success) {
        records.push(result.data);
      } else {
        skipped++;
      }
    } catch {
      // Malformed JSON, or a schema that threw: treat the line as corrupt.
      skipped++;
    }
  }

  return { records, skipped };
}

/**
 * Read raw JSON values from a JSONL file without schema validation.
 * Useful for checkpoint recovery where only IDs are needed.
 *
 * Blank lines are ignored; lines that fail `JSON.parse` are counted in
 * `skipped`. A missing file yields an empty result.
 *
 * @param path - JSONL file to read.
 * @returns The parsed values plus the number of unparseable lines.
 */
export async function readJsonlRaw(
  path: string,
): Promise<{ records: unknown[]; skipped: number }> {
  const records: unknown[] = [];
  let skipped = 0;

  for (const line of await readNonBlankLines(path)) {
    try {
      records.push(JSON.parse(line));
    } catch {
      skipped++;
    }
  }

  return { records, skipped };
}

/**
 * Write an array of records as JSONL, overwriting any existing file and
 * creating parent directories if needed.
 *
 * Each record becomes one newline-terminated line. An empty array produces an
 * empty file rather than a file containing a single blank line.
 *
 * @param path - Destination JSONL file.
 * @param records - Values to serialize, one per line.
 */
export async function writeJsonl<T>(
  path: string,
  records: readonly T[],
): Promise<void> {
  await ensureParentDir(path);
  const content =
    records.length === 0
      ? ""
      : records.map((r) => JSON.stringify(r)).join("\n") + "\n";
  await writeFile(path, content);
}