/**
 * DAPT corpus preparation: strip HTML from all cached 10-K filings
 * and write clean plain text as sharded JSONL to data/dapt-corpus/.
 *
 * Usage: bun run ts/scripts/dapt-corpus-prep.ts
 *
 * Spawns N worker subprocesses (one per CPU core) for parallel regex processing.
 * Each worker handles a slice of the file list, writes to a temp file.
 * Main process merges results into final shards. If any worker exits non-zero
 * the run aborts and shards from a previous run are left untouched.
 *
 * Input:  data/raw/html/*.html (14K+ cached 10-K filing HTML files)
 * Output: data/dapt-corpus/shard-{000..NNN}.jsonl, one JSON object per line:
 *         { text, accession, chars, words }
 */
import { readdirSync, readFileSync, writeFileSync, mkdirSync, unlinkSync, createReadStream } from "node:fs";
import { createInterface } from "node:readline";
import { cpus } from "node:os";
import { stripHtml } from "../src/extract/html-cleaner.ts";

const HTML_CACHE_DIR = "data/raw/html";
const OUTPUT_DIR = "data/dapt-corpus";
const DOCS_PER_SHARD = 1000;
const MIN_CHARS = 500;

// ─── Post-HTML cleaning for DAPT corpus ───

/** Exclusive length bounds for lines considered as running-header candidates. */
const HEADER_MIN_LEN = 5;
const HEADER_MAX_LEN = 80;
/** A short line seen at least this many times in one filing is dropped as a running header/footer. */
const HEADER_REPEAT_THRESHOLD = 5;

/**
 * Patterns tested against the trimmed line; any match marks the line as noise.
 * Hoisted so they are not re-created for every line of every filing.
 * None of these carry the `g`/`y` flag, so sharing them across `.test` calls is safe.
 */
const NOISE_LINE_PATTERNS: readonly RegExp[] = [
  // Page numbers: bare digits, "Page N", F-N financial page markers
  /^\d{1,3}$/,
  /^(page\s+\d+)$/i,
  /^F-\d{1,3}$/,
  /^table\s+of\s+contents?\s*$/i,
  // XBRL metadata lines
  /^(0000\d{6}\s|xbrli:|iso4217:|http:\/\/fasb\.org|http:\/\/xbrl\.)/,
  /^[a-z]{1,5}-\d{8}\s/,
  // Standalone URL lines
  /^https?:\/\/\S+$/,
  // SEC boilerplate / filenames
  /^(10-K|10-Q|8-K)\s*$/i,
  /generated by sec publisher/i,
  /^\S+\.(htm|html)\s*$/i,
  /^\S+\.(htm|html)\s+-\s+Generated/i,
  // Navigation links
  /^\(?\s*back\s+to\s+(index|top|toc)\s*\)?$/i,
  /^index$/i,
];

const FASB_US_GAAP_URL = /http:\/\/fasb\.org\/us-gaap/;
const FISCAL_PERIOD_PREFIX = /^(FY|CY)\d{4,}/;
const HTTP_SCHEME = /http:/;
const XBRL_WORD = /xbrl/i;
/** Lines mentioning XBRL are kept if they also look like governance/risk prose. */
const PROSE_TOPIC = /cyber|secur|risk|board|manage|disclos/i;
const PART_HEADING = /^part\s+[iv]+\s*$/i;
const XBRL_TOKEN =
  /^(0000\d{6}|us-gaap:|dei:|srt:|[a-z]+:\s*[A-Z][a-zA-Z]+Member$|iso4217:|xbrli:|P\d+Y$|\d{4}-\d{2}-\d{2}$)/;
/** Global flag is fine with String.prototype.replace, which resets lastIndex itself. */
const INLINE_URL = /https?:\/\/\S+/g;

/** True for lines short enough to be a running header/footer candidate. */
function isShortLine(trimmed: string): boolean {
  return trimmed.length > HEADER_MIN_LEN && trimmed.length < HEADER_MAX_LEN;
}

/** True when more than half of a (>20 char, >3 token) line's tokens are XBRL namespace tokens. */
function isXbrlTokenLine(trimmed: string): boolean {
  if (trimmed.length <= 20) return false;
  const tokens = trimmed.split(/\s+/);
  if (tokens.length <= 3) return false;
  const xbrlCount = tokens.filter((token) => XBRL_TOKEN.test(token)).length;
  return xbrlCount / tokens.length > 0.5;
}

/**
 * Decide whether a non-empty trimmed line is extraction noise (page markers,
 * XBRL residue, SEC boilerplate, repeated running headers, navigation links).
 *
 * @param trimmed - the line with surrounding whitespace removed (non-empty)
 * @param shortLineCounts - per-filing occurrence counts of short lines
 * @returns true if the line should be dropped from the corpus
 */
function isNoiseLine(trimmed: string, shortLineCounts: ReadonlyMap<string, number>): boolean {
  if (NOISE_LINE_PATTERNS.some((re) => re.test(trimmed))) return true;
  // Long lines that are dumps of us-gaap taxonomy URLs
  if (trimmed.length > 100 && FASB_US_GAAP_URL.test(trimmed)) return true;
  if (FISCAL_PERIOD_PREFIX.test(trimmed) && HTTP_SCHEME.test(trimmed)) return true;
  // XBRL exhibit listing lines (101.CAL, 101.DEF, cover page XBRL, etc.)
  if (XBRL_WORD.test(trimmed) && !PROSE_TOPIC.test(trimmed)) return true;
  if (isXbrlTokenLine(trimmed)) return true;
  // Repeated headers (running headers/footers)
  if (isShortLine(trimmed) && (shortLineCounts.get(trimmed) ?? 0) >= HEADER_REPEAT_THRESHOLD) return true;
  if (trimmed.length < 15 && PART_HEADING.test(trimmed)) return true;
  return false;
}

/**
 * Clean the plain text produced by stripHtml for DAPT training.
 *
 * Drops noise lines (see isNoiseLine), strips inline URLs from the remaining
 * prose, keeps blank lines as paragraph breaks, collapses runs of 3+ newlines
 * to a single blank line, and trims the result.
 */
function cleanForDapt(raw: string): string {
  const lines = raw.split("\n");

  // Pass 1: count short lines so running headers/footers can be recognised.
  const shortLineCounts = new Map<string, number>();
  for (const line of lines) {
    const trimmed = line.trim();
    if (isShortLine(trimmed)) {
      shortLineCounts.set(trimmed, (shortLineCounts.get(trimmed) ?? 0) + 1);
    }
  }

  // Pass 2: keep paragraph breaks, drop noise, strip inline URLs from prose.
  const cleaned: string[] = [];
  for (const line of lines) {
    const trimmed = line.trim();
    if (trimmed.length === 0) {
      cleaned.push("");
      continue;
    }
    if (isNoiseLine(trimmed, shortLineCounts)) continue;
    cleaned.push(line.replace(INLINE_URL, ""));
  }

  return cleaned.join("\n").replace(/\n{3,}/g, "\n\n").trim();
}

// ─── Shared helpers ───

/** Sorted cached-filing names; workers slice this by index, so main and workers must agree on order. */
function listHtmlFiles(): string[] {
  return readdirSync(HTML_CACHE_DIR)
    .filter((f: string) => f.endsWith(".html"))
    .sort();
}

/** Best-effort removal of worker temp files; a missing file is not an error. */
function removeTmpFiles(files: readonly string[]): void {
  for (const file of files) {
    try {
      unlinkSync(file);
    } catch {
      // already removed or never written — nothing to do
    }
  }
}

// ─── Worker mode: process a slice of files ───

const args = process.argv.slice(2);

if (args[0] === "--worker") {
  const [, startArg, endArg, outFile] = args;
  const startIdx = Number.parseInt(startArg ?? "", 10);
  const endIdx = Number.parseInt(endArg ?? "", 10);
  if (outFile === undefined || Number.isNaN(startIdx) || Number.isNaN(endIdx)) {
    process.stderr.write(`  usage: --worker <startIdx> <endIdx> <outFile>\n`);
    process.exit(2);
  }

  const records: string[] = [];
  for (const file of listHtmlFiles().slice(startIdx, endIdx)) {
    const accession = file.slice(0, -".html".length);
    try {
      const html = readFileSync(`${HTML_CACHE_DIR}/${file}`, "utf-8");
      const text = cleanForDapt(stripHtml(html));
      if (text.length < MIN_CHARS) continue;
      const words = text.split(/\s+/).length;
      records.push(JSON.stringify({ text, accession, chars: text.length, words }));
    } catch (err: unknown) {
      // One malformed filing should not discard the rest of this worker's slice.
      const message = err instanceof Error ? err.message : String(err);
      process.stderr.write(`  [worker] skipping ${file}: ${message}\n`);
    }
  }

  writeFileSync(outFile, records.join("\n") + (records.length > 0 ? "\n" : ""));
  process.exit(0);
}

// ─── Main mode: orchestrate workers ───

const start = Date.now();
mkdirSync(OUTPUT_DIR, { recursive: true });

const htmlFiles = listHtmlFiles();
// os.cpus() may return an empty array on some platforms; never divide by zero.
const nproc = Math.max(1, cpus().length);
const chunkSize = Math.ceil(htmlFiles.length / nproc);

// Spawn workers
type WorkerJob = { tmpFile: string; range: string; exited: Promise<number> };
const jobs: WorkerJob[] = [];
for (let i = 0; i < nproc; i++) {
  const startIdx = i * chunkSize;
  if (startIdx >= htmlFiles.length) break;
  const endIdx = Math.min(startIdx + chunkSize, htmlFiles.length);
  const tmpFile = `${OUTPUT_DIR}/.tmp-${i}.jsonl`;
  const proc = Bun.spawn(
    // process.execPath is the running bun binary, so workers don't depend on PATH.
    [process.execPath, "run", import.meta.filename, "--worker", String(startIdx), String(endIdx), tmpFile],
    { stdout: "inherit", stderr: "inherit" },
  );
  jobs.push({ tmpFile, range: `${startIdx}, ${endIdx}`, exited: proc.exited });
}

process.stderr.write(`  ${htmlFiles.length} HTML files, ${jobs.length} workers\n\n`);

// Wait for all workers and abort if any of them failed, rather than
// silently producing a partial corpus.
const exitCodes = await Promise.all(jobs.map((job) => job.exited));
const failed = jobs.flatMap((job, i) => (exitCodes[i] === 0 ? [] : [`[${job.range}) -> exit ${exitCodes[i]}`]));
if (failed.length > 0) {
  process.stderr.write(`  ${failed.length} worker(s) failed: ${failed.join(", ")}\n`);
  removeTmpFiles(jobs.map((job) => job.tmpFile));
  process.exit(1);
}

process.stderr.write(`  Workers done, merging shards...\n`);

// Merge tmp files into sorted shards
type DocRecord = { text: string; accession: string; chars: number; words: number };
const allRecords: DocRecord[] = [];
for (const { tmpFile } of jobs) {
  const rl = createInterface({ input: createReadStream(tmpFile) });
  for await (const line of rl) {
    // Written by our own workers above, so the shape is trusted.
    if (line.trim()) allRecords.push(JSON.parse(line) as DocRecord);
  }
}

// Code-unit comparison keeps shard order identical across machines/locales.
allRecords.sort((a, b) => (a.accession < b.accession ? -1 : a.accession > b.accession ? 1 : 0));

// Remove shards from earlier runs (only now that the new corpus is built) so a
// smaller rerun cannot leave stale shards behind for downstream globbing.
for (const existing of readdirSync(OUTPUT_DIR)) {
  if (/^shard-\d+\.jsonl$/.test(existing)) unlinkSync(`${OUTPUT_DIR}/${existing}`);
}

// Write final shards
let shardIdx = 0;
let totalChars = 0;
let totalWords = 0;
for (let i = 0; i < allRecords.length; i += DOCS_PER_SHARD) {
  const shard = allRecords.slice(i, i + DOCS_PER_SHARD);
  const name = `shard-${String(shardIdx).padStart(3, "0")}.jsonl`;
  writeFileSync(`${OUTPUT_DIR}/${name}`, shard.map((r) => JSON.stringify(r)).join("\n") + "\n");
  shardIdx++;
  for (const r of shard) {
    totalChars += r.chars;
    totalWords += r.words;
  }
}

// Cleanup tmp files
removeTmpFiles(jobs.map((job) => job.tmpFile));

const elapsed = ((Date.now() - start) / 1000).toFixed(1);
const skipped = htmlFiles.length - allRecords.length;
const estTokens = (totalChars / 4 / 1e6).toFixed(0);
process.stderr.write(
  `\n  Done in ${elapsed}s\n` +
    `  ${allRecords.length} filings -> ${shardIdx} shards\n` +
    `  ${skipped} skipped (< ${MIN_CHARS} chars or failed to process)\n` +
    `  ${(totalChars / 1e6).toFixed(1)}M chars | ${(totalWords / 1e6).toFixed(1)}M words | ~${estTokens}M tokens (est)\n`,
);