// SEC-cyBERT/ts/src/cli.ts
// Labeling-pipeline CLI: extract / annotate / consensus / judge / golden / cost.
// Last touched: 2026-03-30
import { readFile } from "node:fs/promises";
import { v4 as uuidv4 } from "uuid";
import { Annotation } from "@sec-cybert/schemas/annotation.ts";
import { Paragraph } from "@sec-cybert/schemas/paragraph.ts";
import { extract10K, extract8K, reparse10K, reparse8K, mergeTrainingData } from "./extract/pipeline.ts";
import { judgeParagraph } from "./label/annotate.ts";
import { runBatch } from "./label/batch.ts";
import { computeConsensus } from "./label/consensus.ts";
import { runGoldenBatch } from "./label/golden.ts";
import { PROMPT_VERSION } from "./label/prompts.ts";
import { appendJsonl, readJsonl, readJsonlRaw } from "./lib/jsonl.ts";
import { STAGE1_MODELS } from "./lib/openrouter.ts";
// Root of the shared on-disk data tree, relative to the ts/ package directory.
const DATA = "../data";
/**
 * Print CLI usage to stderr and terminate with exit code 1.
 * Declared `never` so callers can use `usage()` as a terminating branch.
 */
function usage(): never {
console.error(`Usage: bun sec <command> [options]
Commands:
extract:10k [--start-date YYYY-MM-DD] [--end-date YYYY-MM-DD] [--limit N]
extract:8k [--start-date YYYY-MM-DD] [--end-date YYYY-MM-DD] [--limit N]
extract:reparse Re-parse cached 10-K HTML files with current parser (no network)
extract:reparse-8k Re-parse cached 8-K HTML files with current parser (no network)
extract:merge Merge 10-K + 8-K, remove truncated filings, dedup → training.jsonl
label:annotate --model <id> [--limit N] [--concurrency N]
label:annotate-all [--limit N] [--concurrency N]
label:consensus
label:judge [--concurrency N]
label:golden [--paragraphs <path>] [--limit N] [--delay N] (Opus via Agent SDK)
label:cost`);
process.exit(1);
}
// CLI entry point: first token is the subcommand, remaining tokens are flag args.
const [command, ...rest] = process.argv.slice(2);
// No subcommand given → print usage and exit(1).
if (!command) usage();
/**
 * Look up the value that follows `--name` in the remaining CLI arguments.
 * Returns undefined when the flag is absent, or when it is the final token.
 */
function flag(name: string): string | undefined {
  const marker = `--${name}`;
  const position = rest.indexOf(marker);
  return position === -1 ? undefined : rest[position + 1];
}
/**
 * Read an integer-valued `--name` flag, falling back to `fallback` when absent.
 * Rejects non-numeric values outright: the old `parseInt` path returned NaN for
 * input like `--limit abc`, silently poisoning every numeric option downstream.
 */
function flagInt(name: string, fallback: number): number {
  const v = flag(name);
  if (v === undefined) return fallback;
  const parsed = parseInt(v, 10);
  if (Number.isNaN(parsed)) {
    console.error(`--${name} expects an integer, got "${v}"`);
    process.exit(1);
  }
  return parsed;
}
// Canonical JSONL locations under the shared data tree.
// 10-K Item 1C paragraphs (written by extract:10k, read by label:* commands).
const PARAGRAPHS_PATH = `${DATA}/paragraphs/paragraphs.jsonl`;
// 8-K Item 1.05 paragraphs (written by extract:8k).
const PARAGRAPHS_8K_PATH = `${DATA}/paragraphs/paragraphs-8k.jsonl`;
// Session metadata log handed to runBatch for each annotation run.
const SESSIONS_PATH = `${DATA}/metadata/sessions.jsonl`;
/**
 * Load the full 10-K paragraph corpus from disk.
 * Warns about skipped (invalid) lines; aborts the whole CLI when none parse.
 */
async function loadParagraphs(): Promise<Paragraph[]> {
  const { records, skipped } = await readJsonl(PARAGRAPHS_PATH, Paragraph);
  if (skipped > 0) {
    process.stderr.write(` ⚠ Skipped ${skipped} invalid paragraph lines\n`);
  }
  if (records.length > 0) {
    process.stderr.write(` Loaded ${records.length} paragraphs\n`);
    return records;
  }
  process.stderr.write(` ✖ No paragraphs found at ${PARAGRAPHS_PATH}\n`);
  process.exit(1);
}
/**
 * label:annotate — run one Stage 1 model over every paragraph.
 * Requires --model <provider/model>; output files are keyed by the model's short name.
 */
async function cmdAnnotate(): Promise<void> {
  const modelId = flag("model");
  if (!modelId) {
    console.error("--model is required");
    process.exit(1);
  }
  const paragraphs = await loadParagraphs();
  // "provider/model" → "model". Reject ids without a provider prefix: the old
  // `split("/")[1]!` yielded undefined and silently wrote to "undefined.jsonl".
  const modelShort = modelId.split("/")[1];
  if (!modelShort) {
    console.error(`--model must be of the form "provider/model", got "${modelId}"`);
    process.exit(1);
  }
  await runBatch(paragraphs, {
    modelId,
    stage: "stage1",
    outputPath: `${DATA}/annotations/stage1/${modelShort}.jsonl`,
    errorsPath: `${DATA}/annotations/stage1/${modelShort}-errors.jsonl`,
    sessionsPath: SESSIONS_PATH,
    concurrency: flagInt("concurrency", 12),
    limit: flag("limit") !== undefined ? flagInt("limit", 50) : undefined,
  });
}
/**
 * label:annotate-all — run Stage 1 labeling sequentially for every configured model.
 * Shares one paragraph load and one set of CLI options across all models.
 */
async function cmdAnnotateAll(): Promise<void> {
  const paragraphs = await loadParagraphs();
  const concurrency = flagInt("concurrency", 12);
  const limit = flag("limit") !== undefined ? flagInt("limit", 50) : undefined;
  const stage1Dir = `${DATA}/annotations/stage1`;
  for (const modelId of STAGE1_MODELS) {
    const shortName = modelId.split("/")[1]!;
    process.stderr.write(`\n ═══ ${modelId} ═══\n`);
    await runBatch(paragraphs, {
      modelId,
      stage: "stage1",
      outputPath: `${stage1Dir}/${shortName}.jsonl`,
      errorsPath: `${stage1Dir}/${shortName}-errors.jsonl`,
      sessionsPath: SESSIONS_PATH,
      concurrency,
      limit,
    });
  }
}
/**
 * label:consensus — combine the 3 Stage 1 model labels per paragraph.
 * Appends one consensus record per fully-annotated paragraph and reports how
 * many still need the Stage 2 judge.
 */
async function cmdConsensus(): Promise<void> {
  // Load all Stage 1 annotations, grouped by paragraph id.
  const allAnnotations: Map<string, Annotation[]> = new Map();
  for (const modelId of STAGE1_MODELS) {
    const modelShort = modelId.split("/")[1]!;
    const path = `${DATA}/annotations/stage1/${modelShort}.jsonl`;
    const { records } = await readJsonl(path, Annotation);
    process.stderr.write(` Loaded ${records.length} annotations from ${modelShort}\n`);
    for (const ann of records) {
      const existing = allAnnotations.get(ann.paragraphId) ?? [];
      existing.push(ann);
      allAnnotations.set(ann.paragraphId, existing);
    }
  }
  // Only paragraphs labeled by all 3 models are eligible for consensus.
  let consensus = 0;
  let needsJudge = 0;
  const outputPath = `${DATA}/annotations/consensus.jsonl`;
  for (const [paragraphId, anns] of allAnnotations) {
    if (anns.length !== 3) continue;
    const { result, needsJudge: needs } = computeConsensus(paragraphId, anns);
    await appendJsonl(outputPath, result);
    if (needs) needsJudge++;
    else consensus++;
  }
  const total = consensus + needsJudge;
  // Guard the divide: with zero eligible paragraphs the old code printed "NaN%".
  const agreedPct = total > 0 ? ((consensus / total) * 100).toFixed(1) : "0.0";
  process.stderr.write(
    `\n ✓ Consensus: ${consensus}/${total} (${agreedPct}%) agreed\n` +
    ` ${needsJudge} paragraphs need Stage 2 judge\n`,
  );
}
/**
 * label:judge — resolve Stage 1 disagreements with the Stage 2 judge model.
 * Processes only consensus rows marked "unresolved" that have not already been
 * judged; judge failures are logged per paragraph and do not abort the run.
 */
async function cmdJudge(): Promise<void> {
  const paragraphs = await loadParagraphs();
  const byId = new Map(paragraphs.map((p) => [p.id, p]));

  const { records: consensusRows } = await readJsonlRaw(`${DATA}/annotations/consensus.jsonl`);

  // Index every Stage 1 annotation by paragraph id so the judge sees prior labels.
  const stage1ByParagraph = new Map<string, Annotation[]>();
  for (const modelId of STAGE1_MODELS) {
    const shortName = modelId.split("/")[1]!;
    const { records } = await readJsonl(`${DATA}/annotations/stage1/${shortName}.jsonl`, Annotation);
    for (const ann of records) {
      const bucket = stage1ByParagraph.get(ann.paragraphId);
      if (bucket) bucket.push(ann);
      else stage1ByParagraph.set(ann.paragraphId, [ann]);
    }
  }

  // Paragraph ids whose consensus method came back "unresolved".
  const unresolvedIds = consensusRows
    .map((raw) => raw as { paragraphId?: string; method?: string })
    .filter((r) => r.method === "unresolved" && r.paragraphId)
    .map((r) => r.paragraphId!);

  // Skip anything the judge has already produced a record for (resumable runs).
  const judgePath = `${DATA}/annotations/stage2/judge.jsonl`;
  const { records: priorJudgements } = await readJsonlRaw(judgePath);
  const alreadyJudged = new Set<string>();
  for (const r of priorJudgements) {
    if (!!r && typeof r === "object" && "paragraphId" in r) {
      alreadyJudged.add((r as { paragraphId: string }).paragraphId);
    }
  }

  const pending = unresolvedIds.filter((id) => !alreadyJudged.has(id));
  process.stderr.write(` ${pending.length} paragraphs to judge (${alreadyJudged.size} already done)\n`);

  const runId = uuidv4();
  let done = 0;
  for (const paragraphId of pending) {
    const paragraph = byId.get(paragraphId);
    if (!paragraph) continue;
    const anns = stage1ByParagraph.get(paragraphId);
    // Require the full trio of Stage 1 opinions before invoking the judge.
    if (!anns || anns.length < 3) continue;
    const priorLabels = anns.map((a) => ({
      content_category: a.label.content_category,
      specificity_level: a.label.specificity_level,
      reasoning: a.label.reasoning,
    }));
    try {
      const judged = await judgeParagraph(paragraph, priorLabels, {
        runId,
        promptVersion: PROMPT_VERSION,
      });
      await appendJsonl(judgePath, judged);
      done++;
      if (done % 10 === 0) {
        process.stderr.write(` Judged ${done}/${pending.length}\n`);
      }
    } catch (error) {
      process.stderr.write(
        ` ✖ Judge error for ${paragraphId}: ${error instanceof Error ? error.message : String(error)}\n`,
      );
    }
  }
  process.stderr.write(`\n ✓ Judged ${done} paragraphs\n`);
}
/**
 * label:golden — re-label the human-annotated sample with Opus via the Agent SDK.
 * Reads the sampled-id list produced by the labelapp, filters the (optionally
 * overridden) paragraph file down to that set, and runs the golden batch.
 */
async function cmdGolden(): Promise<void> {
  // Load the 1,200 human-labeled paragraph IDs from the labelapp sample.
  const sampledIdsPath = "../labelapp/.sampled-ids.json";
  // Validate the parsed JSON before trusting it: the old code fed an unchecked
  // `JSON.parse` result straight into `new Set<string>`.
  const parsed: unknown = JSON.parse(await readFile(sampledIdsPath, "utf-8"));
  if (!Array.isArray(parsed)) {
    process.stderr.write(` ✖ Expected a JSON array of paragraph ids in ${sampledIdsPath}\n`);
    process.exit(1);
  }
  const sampledIds = new Set<string>(parsed as string[]);
  process.stderr.write(` Loaded ${sampledIds.size} sampled IDs from ${sampledIdsPath}\n`);
  // Load patched paragraphs and filter to the human-labeled set.
  const paragraphsPath = flag("paragraphs") ?? `${DATA}/paragraphs/paragraphs-clean.patched.jsonl`;
  const { records: allParagraphs, skipped } = await readJsonl(paragraphsPath, Paragraph);
  if (skipped > 0) process.stderr.write(` ⚠ Skipped ${skipped} invalid paragraph lines\n`);
  const paragraphs = allParagraphs.filter((p) => sampledIds.has(p.id));
  process.stderr.write(` Matched ${paragraphs.length}/${sampledIds.size} paragraphs from ${paragraphsPath}\n`);
  if (paragraphs.length === 0) {
    process.stderr.write(" ✖ No matching paragraphs found\n");
    process.exit(1);
  }
  await runGoldenBatch(paragraphs, {
    outputPath: `${DATA}/annotations/golden/opus.jsonl`,
    errorsPath: `${DATA}/annotations/golden/opus-errors.jsonl`,
    limit: flag("limit") !== undefined ? flagInt("limit", 50) : undefined,
    // flagInt already falls back to 1000 when --delay is absent, so the old
    // `flag("delay") !== undefined ? flagInt("delay", 1000) : 1000` was redundant.
    delayMs: flagInt("delay", 1000),
  });
}
/**
 * label:cost — aggregate API spend per model and per pipeline stage from the
 * saved annotation files, then print a formatted summary to stdout.
 */
async function cmdCost(): Promise<void> {
  const modelCosts: Record<string, { cost: number; count: number }> = {};
  const stageCosts: Record<string, { cost: number; count: number }> = {};

  // Stage 1: one annotation file per model.
  let stage1Cost = 0;
  let stage1Count = 0;
  for (const modelId of STAGE1_MODELS) {
    const shortName = modelId.split("/")[1]!;
    const { records } = await readJsonl(`${DATA}/annotations/stage1/${shortName}.jsonl`, Annotation);
    let cost = 0;
    for (const ann of records) cost += ann.provenance.costUsd;
    modelCosts[modelId] = { cost, count: records.length };
    stage1Cost += cost;
    stage1Count += records.length;
  }
  stageCosts["stage1"] = { cost: stage1Cost, count: stage1Count };

  // Stage 2: single judge file.
  const { records: judgeRecords } = await readJsonl(`${DATA}/annotations/stage2/judge.jsonl`, Annotation);
  const judgeCost = judgeRecords.reduce((sum, a) => sum + a.provenance.costUsd, 0);
  modelCosts["anthropic/claude-sonnet-4.6 (judge)"] = {
    cost: judgeCost,
    count: judgeRecords.length,
  };
  stageCosts["stage2"] = { cost: judgeCost, count: judgeRecords.length };

  // Report.
  console.log("\n Cost Summary");
  console.log(" " + "─".repeat(60));
  console.log("\n By Model:");
  for (const [model, { cost, count }] of Object.entries(modelCosts)) {
    console.log(` ${model.padEnd(45)} $${cost.toFixed(4)} (${count} annotations)`);
  }
  console.log("\n By Stage:");
  let total = 0;
  for (const [stage, { cost, count }] of Object.entries(stageCosts)) {
    console.log(` ${stage.padEnd(45)} $${cost.toFixed(4)} (${count} annotations)`);
    total += cost;
  }
  console.log(`\n Total: $${total.toFixed(4)}`);
}
/**
 * extract:10k — pull Item 1C paragraphs from 10-K filings.
 * Date window and record limit are taken from CLI flags with sensible defaults.
 */
async function cmdExtract10K(): Promise<void> {
  const limitFlag = flag("limit");
  await extract10K({
    outputPath: PARAGRAPHS_PATH,
    errorsPath: `${DATA}/extracted/item1c/errors.jsonl`,
    filingType: "10-K",
    startDate: flag("start-date") ?? "2023-12-15",
    endDate: flag("end-date") ?? "2025-12-31",
    limit: limitFlag !== undefined ? flagInt("limit", 100) : undefined,
  });
}
/**
 * extract:8k — pull Item 1.05 paragraphs from 8-K filings.
 * Mirrors cmdExtract10K but with 8-K paths, filing type, and default dates.
 */
async function cmdExtract8K(): Promise<void> {
  const limitFlag = flag("limit");
  await extract8K({
    outputPath: PARAGRAPHS_8K_PATH,
    errorsPath: `${DATA}/extracted/item105/errors.jsonl`,
    filingType: "8-K",
    startDate: flag("start-date") ?? "2023-12-18",
    endDate: flag("end-date") ?? "2025-12-31",
    limit: limitFlag !== undefined ? flagInt("limit", 100) : undefined,
  });
}
/** extract:reparse-8k — re-run the 8-K parser over cached HTML (no network). */
async function cmdReparse8K(): Promise<void> {
  const options = { outputPath: PARAGRAPHS_8K_PATH };
  await reparse8K(options);
}
// Dispatch: map each CLI command name to its async handler, then run the one
// matching `command`. Unknown commands report an error and print usage.
const handlers: Record<string, (() => Promise<void>) | undefined> = {
  "extract:10k": cmdExtract10K,
  "extract:8k": cmdExtract8K,
  "extract:reparse": async () => {
    await reparse10K({ outputPath: PARAGRAPHS_PATH });
  },
  "extract:reparse-8k": cmdReparse8K,
  "extract:merge": async () => {
    await mergeTrainingData({
      tenKPath: PARAGRAPHS_PATH,
      eightKPath: PARAGRAPHS_8K_PATH,
      outputPath: `${DATA}/paragraphs/training.jsonl`,
    });
  },
  "label:annotate": cmdAnnotate,
  "label:annotate-all": cmdAnnotateAll,
  "label:consensus": cmdConsensus,
  "label:judge": cmdJudge,
  "label:golden": cmdGolden,
  "label:cost": cmdCost,
};
const handler = handlers[command];
if (handler) {
  await handler();
} else {
  console.error(`Unknown command: ${command}`);
  usage();
}