diff --git a/ts/package.json b/ts/package.json
index 35100fa..8b7e1fe 100644
--- a/ts/package.json
+++ b/ts/package.json
@@ -4,7 +4,8 @@
   "type": "module",
   "private": true,
   "devDependencies": {
-    "@types/bun": "latest",
+    "@types/bun": "^1.3.11",
+    "@types/node": "^25.5.0",
     "@types/uuid": "^11.0.0"
   },
   "scripts": {
diff --git a/ts/scripts/dapt-corpus-prep.ts b/ts/scripts/dapt-corpus-prep.ts
new file mode 100644
index 0000000..d38c847
--- /dev/null
+++ b/ts/scripts/dapt-corpus-prep.ts
@@ -0,0 +1,192 @@
+/**
+ * DAPT corpus preparation: strip HTML from all cached 10-K filings
+ * and write clean plain text as sharded JSONL to data/dapt-corpus/.
+ *
+ * Usage: bun run ts/scripts/dapt-corpus-prep.ts
+ *
+ * Spawns N worker subprocesses (one per CPU core) for parallel regex processing.
+ * Each worker handles a slice of the file list, writes to a temp file.
+ * Main process merges results into final shards.
+ *
+ * Input: data/raw/html/*.html (14K+ cached 10-K filing HTML files)
+ * Output: data/dapt-corpus/shard-{00..N}.jsonl
+ */
+import { readdirSync, readFileSync, writeFileSync, mkdirSync, unlinkSync, createReadStream } from "node:fs";
+import { createInterface } from "node:readline";
+import { cpus } from "node:os";
+import { stripHtml } from "../src/extract/html-cleaner.ts";
+
+const HTML_CACHE_DIR = "data/raw/html";
+const OUTPUT_DIR = "data/dapt-corpus";
+const DOCS_PER_SHARD = 1000;
+const MIN_CHARS = 500;
+
+// ─── Post-HTML cleaning for DAPT corpus ───
+
+// Removes pagination artifacts, XBRL metadata, SEC boilerplate, and
+// repeated running headers from HTML-stripped 10-K text.
+function cleanForDapt(raw: string): string {
+  const lines = raw.split("\n");
+  const cleaned: string[] = [];
+
+  // First pass: count short-line occurrences to detect repeated headers.
+  const shortLineCounts = new Map<string, number>();
+  for (const line of lines) {
+    const trimmed = line.trim();
+    if (trimmed.length > 5 && trimmed.length < 80) {
+      shortLineCounts.set(trimmed, (shortLineCounts.get(trimmed) ?? 0) + 1);
+    }
+  }
+
+  for (const line of lines) {
+    const trimmed = line.trim();
+
+    if (trimmed.length === 0) { cleaned.push(""); continue; }
+
+    // Bare page numbers / pagination markers / ToC lines
+    if (/^\d{1,3}$/.test(trimmed)) continue;
+    if (/^(page\s+\d+|[-–—]\s*\d+\s*[-–—])$/i.test(trimmed)) continue;
+    if (/^table\s+of\s+contents?\s*$/i.test(trimmed)) continue;
+
+    // XBRL metadata
+    if (/^(0000\d{6}\s|xbrli:|iso4217:|http:\/\/fasb\.org|http:\/\/xbrl\.)/.test(trimmed)) continue;
+    if (/^[a-z]{1,5}-\d{8}\s/.test(trimmed)) continue;
+    if (/http:\/\/fasb\.org\/us-gaap/.test(trimmed) && trimmed.length > 100) continue;
+    if (/^(FY|CY)\d{4,}/.test(trimmed) && /http:/.test(trimmed)) continue;
+    if (trimmed.length > 20) {
+      // Drop lines that are mostly XBRL tokens (tag soup from inline XBRL).
+      const tokens = trimmed.split(/\s+/);
+      const xbrlCount = tokens.filter(t =>
+        /^(0000\d{6}|us-gaap:|dei:|srt:|[a-z]+:\s*[A-Z][a-zA-Z]+Member$|iso4217:|xbrli:|P\d+Y$|\d{4}-\d{2}-\d{2}$)/.test(t)
+      ).length;
+      if (tokens.length > 3 && xbrlCount / tokens.length > 0.5) continue;
+    }
+
+    // SEC boilerplate / filenames
+    if (/^(10-K|10-Q|8-K)\s*$/i.test(trimmed)) continue;
+    if (/generated by sec publisher/i.test(trimmed)) continue;
+    if (/^\S+\.(htm|html)\s*$/i.test(trimmed)) continue;
+    if (/^\S+\.(htm|html)\s+-\s+Generated/i.test(trimmed)) continue;
+
+    // Repeated headers (appear 5+ times across the document)
+    if (trimmed.length > 5 && trimmed.length < 80) {
+      if ((shortLineCounts.get(trimmed) ?? 0) >= 5) continue;
+    }
+
+    if (/^part\s+[iv]+\s*$/i.test(trimmed) && trimmed.length < 15) continue;
+    if (/^\(?\s*back\s+to\s+(index|top|toc)\s*\)?$/i.test(trimmed)) continue;
+    if (/^index$/i.test(trimmed)) continue;
+
+    cleaned.push(line);
+  }
+
+  return cleaned.join("\n").replace(/\n{3,}/g, "\n\n").trim();
+}
+
+// ─── Worker mode: process a slice of files ───
+
+const args = process.argv.slice(2);
+if (args[0] === "--worker") {
+  const startIdx = parseInt(args[1]!);
+  const endIdx = parseInt(args[2]!);
+  const outFile = args[3]!;
+
+  // Re-list and sort so worker slice indices match the main process's view.
+  const htmlFiles = readdirSync(HTML_CACHE_DIR)
+    .filter((f: string) => f.endsWith(".html"))
+    .sort()
+    .slice(startIdx, endIdx);
+
+  const records: string[] = [];
+  for (const file of htmlFiles) {
+    const accession = file.replace(".html", "");
+    const html = readFileSync(`${HTML_CACHE_DIR}/${file}`, "utf-8");
+    const text = cleanForDapt(stripHtml(html));
+    if (text.length < MIN_CHARS) continue;
+    const words = text.split(/\s+/).length;
+    records.push(JSON.stringify({ text, accession, chars: text.length, words }));
+  }
+
+  writeFileSync(outFile, records.join("\n") + (records.length > 0 ? "\n" : ""));
+  process.exit(0);
+}
+
+// ─── Main mode: orchestrate workers ───
+
+const start = Date.now();
+mkdirSync(OUTPUT_DIR, { recursive: true });
+
+const htmlFiles = readdirSync(HTML_CACHE_DIR)
+  .filter((f: string) => f.endsWith(".html"))
+  .sort();
+
+const nproc = cpus().length;
+const chunkSize = Math.ceil(htmlFiles.length / nproc);
+
+process.stderr.write(`  ${htmlFiles.length} HTML files, ${nproc} workers\n\n`);
+
+// Spawn workers
+const tmpFiles: string[] = [];
+const workers: ReturnType<typeof Bun.spawn>[] = [];
+
+for (let i = 0; i < nproc; i++) {
+  const startIdx = i * chunkSize;
+  const endIdx = Math.min(startIdx + chunkSize, htmlFiles.length);
+  if (startIdx >= htmlFiles.length) break;
+
+  const tmpFile = `${OUTPUT_DIR}/.tmp-${i}.jsonl`;
+  tmpFiles.push(tmpFile);
+
+  workers.push(
+    Bun.spawn(
+      ["bun", "run", import.meta.filename, "--worker", String(startIdx), String(endIdx), tmpFile],
+      { stderr: "inherit" },
+    )
+  );
+}
+
+// Wait for all workers
+for (const worker of workers) {
+  await worker.exited;
+}
+
+process.stderr.write(`  Workers done, merging shards...\n`);
+
+// Merge tmp files into sorted shards
+type DocRecord = { text: string; accession: string; chars: number; words: number };
+const allRecords: DocRecord[] = [];
+
+for (const tmpFile of tmpFiles) {
+  const rl = createInterface({ input: createReadStream(tmpFile) });
+  for await (const line of rl) {
+    if (line.trim()) allRecords.push(JSON.parse(line));
+  }
+}
+
+allRecords.sort((a, b) => a.accession.localeCompare(b.accession));
+
+// Write final shards
+let shardIdx = 0;
+let totalChars = 0;
+let totalWords = 0;
+
+for (let i = 0; i < allRecords.length; i += DOCS_PER_SHARD) {
+  const shard = allRecords.slice(i, i + DOCS_PER_SHARD);
+  const name = `shard-${String(shardIdx).padStart(3, "0")}.jsonl`;
+  writeFileSync(`${OUTPUT_DIR}/${name}`, shard.map(r => JSON.stringify(r)).join("\n") + "\n");
+  shardIdx++;
+
+  for (const r of shard) {
+    totalChars += r.chars;
+    totalWords += r.words;
+  }
+}
+
+// Cleanup tmp files
+for (const tmpFile of tmpFiles) {
+  try { unlinkSync(tmpFile); } catch {}
+}
+
+const elapsed = ((Date.now() - start) / 1000).toFixed(1);
+const skipped = htmlFiles.length - allRecords.length;
+const estTokens = (totalChars / 4 / 1e6).toFixed(0);
+process.stderr.write(
+  `\n  Done in ${elapsed}s\n` +
+  `  ${allRecords.length} filings -> ${shardIdx} shards\n` +
+  `  ${skipped} skipped (< ${MIN_CHARS} chars)\n` +
+  `  ${(totalChars / 1e6).toFixed(1)}M chars | ${(totalWords / 1e6).toFixed(1)}M words | ~${estTokens}M tokens (est)\n`,
+);