initial dapt prep
This commit is contained in:
parent
273a862cdf
commit
c4d7732c87
@ -4,7 +4,8 @@
|
||||
"type": "module",
|
||||
"private": true,
|
||||
"devDependencies": {
|
||||
"@types/bun": "latest",
|
||||
"@types/bun": "^1.3.11",
|
||||
"@types/node": "^25.5.0",
|
||||
"@types/uuid": "^11.0.0"
|
||||
},
|
||||
"scripts": {
|
||||
|
||||
192
ts/scripts/dapt-corpus-prep.ts
Normal file
192
ts/scripts/dapt-corpus-prep.ts
Normal file
@ -0,0 +1,192 @@
|
||||
/**
|
||||
* DAPT corpus preparation: strip HTML from all cached 10-K filings
|
||||
* and write clean plain text as sharded JSONL to data/dapt-corpus/.
|
||||
*
|
||||
* Usage: bun run ts/scripts/dapt-corpus-prep.ts
|
||||
*
|
||||
* Spawns N worker subprocesses (one per CPU core) for parallel regex processing.
|
||||
* Each worker handles a slice of the file list, writes to a temp file.
|
||||
* Main process merges results into final shards.
|
||||
*
|
||||
* Input: data/raw/html/*.html (14K+ cached 10-K filing HTML files)
|
||||
* Output: data/dapt-corpus/shard-{000..NNN}.jsonl
|
||||
*/
|
||||
import { readdirSync, readFileSync, writeFileSync, mkdirSync, unlinkSync, createReadStream } from "node:fs";
|
||||
import { createInterface } from "node:readline";
|
||||
import { cpus } from "node:os";
|
||||
import { stripHtml } from "../src/extract/html-cleaner.ts";
|
||||
|
||||
// Directory of cached 10-K filing HTML files (input).
const HTML_CACHE_DIR = "data/raw/html";
// Directory where the sharded JSONL corpus is written (output).
const OUTPUT_DIR = "data/dapt-corpus";
// Number of documents written to each output shard file.
const DOCS_PER_SHARD = 1000;
// Documents whose cleaned text is shorter than this are dropped.
const MIN_CHARS = 500;
|
||||
|
||||
// ─── Post-HTML cleaning for DAPT corpus ───
|
||||
|
||||
function cleanForDapt(raw: string): string {
|
||||
const lines = raw.split("\n");
|
||||
const cleaned: string[] = [];
|
||||
|
||||
const shortLineCounts = new Map<string, number>();
|
||||
for (const line of lines) {
|
||||
const trimmed = line.trim();
|
||||
if (trimmed.length > 5 && trimmed.length < 80) {
|
||||
shortLineCounts.set(trimmed, (shortLineCounts.get(trimmed) ?? 0) + 1);
|
||||
}
|
||||
}
|
||||
|
||||
for (const line of lines) {
|
||||
const trimmed = line.trim();
|
||||
|
||||
if (trimmed.length === 0) { cleaned.push(""); continue; }
|
||||
|
||||
if (/^\d{1,3}$/.test(trimmed)) continue;
|
||||
if (/^(page\s+\d+|[-–—]\s*\d+\s*[-–—])$/i.test(trimmed)) continue;
|
||||
if (/^table\s+of\s+contents?\s*$/i.test(trimmed)) continue;
|
||||
|
||||
// XBRL metadata
|
||||
if (/^(0000\d{6}\s|xbrli:|iso4217:|http:\/\/fasb\.org|http:\/\/xbrl\.)/.test(trimmed)) continue;
|
||||
if (/^[a-z]{1,5}-\d{8}\s/.test(trimmed)) continue;
|
||||
if (/http:\/\/fasb\.org\/us-gaap/.test(trimmed) && trimmed.length > 100) continue;
|
||||
if (/^(FY|CY)\d{4,}/.test(trimmed) && /http:/.test(trimmed)) continue;
|
||||
if (trimmed.length > 20) {
|
||||
const tokens = trimmed.split(/\s+/);
|
||||
const xbrlCount = tokens.filter(t =>
|
||||
/^(0000\d{6}|us-gaap:|dei:|srt:|[a-z]+:\s*[A-Z][a-zA-Z]+Member$|iso4217:|xbrli:|P\d+Y$|\d{4}-\d{2}-\d{2}$)/.test(t)
|
||||
).length;
|
||||
if (tokens.length > 3 && xbrlCount / tokens.length > 0.5) continue;
|
||||
}
|
||||
|
||||
// SEC boilerplate / filenames
|
||||
if (/^(10-K|10-Q|8-K)\s*$/i.test(trimmed)) continue;
|
||||
if (/generated by sec publisher/i.test(trimmed)) continue;
|
||||
if (/^\S+\.(htm|html)\s*$/i.test(trimmed)) continue;
|
||||
if (/^\S+\.(htm|html)\s+-\s+Generated/i.test(trimmed)) continue;
|
||||
|
||||
// Repeated headers
|
||||
if (trimmed.length > 5 && trimmed.length < 80) {
|
||||
if ((shortLineCounts.get(trimmed) ?? 0) >= 5) continue;
|
||||
}
|
||||
|
||||
if (/^part\s+[iv]+\s*$/i.test(trimmed) && trimmed.length < 15) continue;
|
||||
if (/^\(?\s*back\s+to\s+(index|top|toc)\s*\)?$/i.test(trimmed)) continue;
|
||||
if (/^index$/i.test(trimmed)) continue;
|
||||
|
||||
cleaned.push(line);
|
||||
}
|
||||
|
||||
return cleaned.join("\n").replace(/\n{3,}/g, "\n\n").trim();
|
||||
}
|
||||
|
||||
// ─── Worker mode: process a slice of files ───
|
||||
|
||||
const args = process.argv.slice(2);
|
||||
if (args[0] === "--worker") {
|
||||
const startIdx = parseInt(args[1]!);
|
||||
const endIdx = parseInt(args[2]!);
|
||||
const outFile = args[3]!;
|
||||
|
||||
const htmlFiles = readdirSync(HTML_CACHE_DIR)
|
||||
.filter((f: string) => f.endsWith(".html"))
|
||||
.sort()
|
||||
.slice(startIdx, endIdx);
|
||||
|
||||
const records: string[] = [];
|
||||
for (const file of htmlFiles) {
|
||||
const accession = file.replace(".html", "");
|
||||
const html = readFileSync(`${HTML_CACHE_DIR}/${file}`, "utf-8");
|
||||
const text = cleanForDapt(stripHtml(html));
|
||||
if (text.length < MIN_CHARS) continue;
|
||||
const words = text.split(/\s+/).length;
|
||||
records.push(JSON.stringify({ text, accession, chars: text.length, words }));
|
||||
}
|
||||
|
||||
writeFileSync(outFile, records.join("\n") + (records.length > 0 ? "\n" : ""));
|
||||
process.exit(0);
|
||||
}
|
||||
|
||||
// ─── Main mode: orchestrate workers ───

// Wall-clock start for the elapsed time printed in the final summary.
const start = Date.now();
mkdirSync(OUTPUT_DIR, { recursive: true });

// Sorted for a deterministic order; workers re-list and re-sort the same
// directory, so main and worker views of the list agree by index.
const htmlFiles = readdirSync(HTML_CACHE_DIR)
  .filter((f: string) => f.endsWith(".html"))
  .sort();

// One worker per CPU core, each handling a contiguous chunk of files.
const nproc = cpus().length;
const chunkSize = Math.ceil(htmlFiles.length / nproc);

process.stderr.write(` ${htmlFiles.length} HTML files, ${nproc} workers\n\n`);
|
||||
|
||||
// Spawn workers
|
||||
const tmpFiles: string[] = [];
|
||||
const workers: ReturnType<typeof Bun.spawn>[] = [];
|
||||
|
||||
for (let i = 0; i < nproc; i++) {
|
||||
const startIdx = i * chunkSize;
|
||||
const endIdx = Math.min(startIdx + chunkSize, htmlFiles.length);
|
||||
if (startIdx >= htmlFiles.length) break;
|
||||
|
||||
const tmpFile = `${OUTPUT_DIR}/.tmp-${i}.jsonl`;
|
||||
tmpFiles.push(tmpFile);
|
||||
|
||||
workers.push(
|
||||
Bun.spawn(
|
||||
["bun", "run", import.meta.filename, "--worker", String(startIdx), String(endIdx), tmpFile],
|
||||
{ stderr: "inherit" },
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
// Wait for all workers
|
||||
for (const worker of workers) {
|
||||
await worker.exited;
|
||||
}
|
||||
|
||||
process.stderr.write(` Workers done, merging shards...\n`);
|
||||
|
||||
// Merge tmp files into sorted shards
|
||||
type DocRecord = { text: string; accession: string; chars: number; words: number };
|
||||
const allRecords: DocRecord[] = [];
|
||||
|
||||
for (const tmpFile of tmpFiles) {
|
||||
const rl = createInterface({ input: createReadStream(tmpFile) });
|
||||
for await (const line of rl) {
|
||||
if (line.trim()) allRecords.push(JSON.parse(line));
|
||||
}
|
||||
}
|
||||
|
||||
allRecords.sort((a, b) => a.accession.localeCompare(b.accession));
|
||||
|
||||
// Write final shards
|
||||
let shardIdx = 0;
|
||||
let totalChars = 0;
|
||||
let totalWords = 0;
|
||||
|
||||
for (let i = 0; i < allRecords.length; i += DOCS_PER_SHARD) {
|
||||
const shard = allRecords.slice(i, i + DOCS_PER_SHARD);
|
||||
const name = `shard-${String(shardIdx).padStart(3, "0")}.jsonl`;
|
||||
writeFileSync(`${OUTPUT_DIR}/${name}`, shard.map(r => JSON.stringify(r)).join("\n") + "\n");
|
||||
shardIdx++;
|
||||
|
||||
for (const r of shard) {
|
||||
totalChars += r.chars;
|
||||
totalWords += r.words;
|
||||
}
|
||||
}
|
||||
|
||||
// Cleanup tmp files
|
||||
for (const tmpFile of tmpFiles) {
|
||||
try { unlinkSync(tmpFile); } catch {}
|
||||
}
|
||||
|
||||
// Final summary to stderr: timing, shard count, and corpus-size stats.
const elapsed = ((Date.now() - start) / 1000).toFixed(1);
// Inputs not represented in the output — per the message below, these are
// the filings whose cleaned text fell under MIN_CHARS.
const skipped = htmlFiles.length - allRecords.length;
// Rough token estimate using a ~4 chars/token heuristic (labelled "est").
const estTokens = (totalChars / 4 / 1e6).toFixed(0);
process.stderr.write(
  `\n Done in ${elapsed}s\n` +
  ` ${allRecords.length} filings -> ${shardIdx} shards\n` +
  ` ${skipped} skipped (< ${MIN_CHARS} chars)\n` +
  ` ${(totalChars / 1e6).toFixed(1)}M chars | ${(totalWords / 1e6).toFixed(1)}M words | ~${estTokens}M tokens (est)\n`,
);
|
||||
Loading…
x
Reference in New Issue
Block a user