""" Quality audit of the SEC-cyBERT DAPT training corpus. Reads sharded JSONL files and performs qualitative checks on document content. READ-ONLY — does not modify any files. """ import json import os import random import re import sys from pathlib import Path CORPUS_DIR = Path(__file__).resolve().parent.parent / "data" / "dapt-corpus" SHARDS = sorted(CORPUS_DIR.glob("shard-*.jsonl")) random.seed(42) def load_all_docs() -> list[dict]: """Load all documents from all shards.""" docs = [] for shard in SHARDS: with open(shard) as f: for line in f: line = line.strip() if line: docs.append(json.loads(line)) return docs def separator(title: str) -> None: print("\n" + "=" * 80) print(f" {title}") print("=" * 80 + "\n") def audit_smallest(docs: list[dict]) -> None: separator("1. SMALLEST 20 DOCUMENTS (by chars)") sorted_docs = sorted(docs, key=lambda d: d["chars"]) for i, doc in enumerate(sorted_docs[:20], 1): text = doc["text"] print(f"--- #{i} | accession={doc['accession']} | chars={doc['chars']} | words={doc['words']} ---") # Show full text for tiny docs, cap at 2000 chars display = text if len(text) <= 2000 else text[:2000] + "\n... [TRUNCATED]" print(display) print() def audit_largest(docs: list[dict]) -> None: separator("2. LARGEST 5 DOCUMENTS (first/last 500 chars)") sorted_docs = sorted(docs, key=lambda d: d["chars"], reverse=True) for i, doc in enumerate(sorted_docs[:5], 1): text = doc["text"] print(f"--- #{i} | accession={doc['accession']} | chars={doc['chars']} | words={doc['words']} ---") print("FIRST 500 CHARS:") print(text[:500]) print("\n... [GAP] ...\n") print("LAST 500 CHARS:") print(text[-500:]) print() def audit_mid_samples(docs: list[dict]) -> None: separator("3. RANDOM MID-DOCUMENT SAMPLES (10 docs, 500 chars from 50% point)") sample = random.sample(docs, 10) for i, doc in enumerate(sample, 1): text = doc["text"] mid = len(text) // 2 start = max(0, mid - 250) end = min(len(text), mid + 250) print(f"--- #{i} | accession={doc['accession']} | chars={doc['chars']} ---") print(text[start:end]) print() def audit_xbrl_contamination(docs: list[dict]) -> None: separator("4. XBRL-CONTAMINATED STARTS (first 200 chars with XBRL patterns)") xbrl_pattern = re.compile( r"(0000\d{6}|xbrli:|fasb\.org|us-gaap:|dei:|srt:|^\d{4}-\d{2}-\d{2}\s*$)", re.MULTILINE, ) found = [] for doc in docs: first200 = doc["text"][:200] if xbrl_pattern.search(first200): found.append(doc) if len(found) >= 10: break if not found: print("No XBRL-contaminated documents found in initial scan.") print("Trying broader pattern...") # Try a broader search broad_pattern = re.compile(r"(xmlns|xbrl|0001\d{6})", re.IGNORECASE) for doc in docs: first200 = doc["text"][:200] if broad_pattern.search(first200): found.append(doc) if len(found) >= 10: break for i, doc in enumerate(found[:10], 1): text = doc["text"] print(f"--- #{i} | accession={doc['accession']} | chars={doc['chars']} ---") print("FIRST 500 CHARS:") print(text[:500]) # Find where XBRL junk ends and real text begins # Look for "UNITED STATES" or "FORM 10-K" as transition marker for marker in ["UNITED STATES", "FORM 10-K", "FORM 10-k", "ANNUAL REPORT"]: idx = text.find(marker) if idx > 0 and idx < 5000: print(f"\n >> Transition to real text at char {idx} (marker: '{marker}')") break print() def audit_short_lines(docs: list[dict]) -> None: separator("5. DOCS WITH MOST SHORT LINES (<10 chars, excluding empty)") scored = [] for doc in docs: lines = doc["text"].split("\n") non_empty = [l for l in lines if l.strip()] short = [l for l in non_empty if 0 < len(l.strip()) < 10] if non_empty: ratio = len(short) / len(non_empty) scored.append((ratio, len(short), len(non_empty), doc, short)) scored.sort(key=lambda x: x[0], reverse=True) for i, (ratio, n_short, n_total, doc, short_lines) in enumerate(scored[:10], 1): print( f"--- #{i} | accession={doc['accession']} | ratio={ratio:.2%} " f"| {n_short}/{n_total} short lines ---" ) # Show 20 short lines with surrounding context text = doc["text"] lines = text.split("\n") shown = 0 for j, line in enumerate(lines): stripped = line.strip() if 0 < len(stripped) < 10 and shown < 20: # Show line with 1 line of context on each side ctx_start = max(0, j - 1) ctx_end = min(len(lines), j + 2) for k in range(ctx_start, ctx_end): prefix = ">>>" if k == j else " " print(f" {prefix} L{k+1}: {lines[k][:100]}") print() shown += 1 print() def audit_transitions(docs: list[dict]) -> None: separator("6. TRANSITION ZONES (SEC cover page -> company content)") # Find docs that have the SEC header candidates = [d for d in docs if "SECURITIES AND EXCHANGE COMMISSION" in d["text"][:2000]] sample = random.sample(candidates, min(5, len(candidates))) for i, doc in enumerate(sample, 1): text = doc["text"] idx = text.find("SECURITIES AND EXCHANGE COMMISSION") if idx < 0: continue # Find end of cover page area — look for company-specific content markers # like "Item 1" or "PART I" or "Table of Contents" transition_markers = ["Item 1", "ITEM 1", "PART I", "TABLE OF CONTENTS", "Table of Contents"] transition_idx = -1 for marker in transition_markers: t = text.find(marker, idx + 100) if t > 0 and (transition_idx < 0 or t < transition_idx): transition_idx = t if transition_idx > 0: start = max(0, transition_idx - 250) end = min(len(text), transition_idx + 250) print(f"--- #{i} | accession={doc['accession']} ---") print(f"Cover page at char {idx}, transition at char {transition_idx}") print(f"SHOWING chars {start}-{end}:") print(text[start:end]) else: # Just show around the SEC header start = max(0, idx - 50) end = min(len(text), idx + 450) print(f"--- #{i} | accession={doc['accession']} ---") print(f"Cover page at char {idx}, no clear transition marker found") print(text[start:end]) print() def audit_financial_tables(docs: list[dict]) -> None: separator("7. FINANCIAL TABLE QUALITY (>30% lines with $ or mostly numeric)") scored = [] dollar_or_numeric = re.compile(r"(\$|^\s*[\d,.\-()]+\s*$)") for doc in docs: lines = doc["text"].split("\n") non_empty = [l for l in lines if l.strip()] if not non_empty: continue matching = sum(1 for l in non_empty if dollar_or_numeric.search(l)) ratio = matching / len(non_empty) if ratio > 0.30: scored.append((ratio, doc)) scored.sort(key=lambda x: x[0], reverse=True) for i, (ratio, doc) in enumerate(scored[:5], 1): text = doc["text"] print(f"--- #{i} | accession={doc['accession']} | chars={doc['chars']} | numeric ratio={ratio:.1%} ---") # Find a dense numeric section lines = text.split("\n") # Find a window of 20 lines with the most dollar/numeric content best_start = 0 best_count = 0 window = 20 for j in range(len(lines) - window): count = sum(1 for l in lines[j : j + window] if dollar_or_numeric.search(l)) if count > best_count: best_count = count best_start = j print(f"DENSEST 20-LINE WINDOW (starting at line {best_start + 1}, {best_count}/{window} numeric):") for l in lines[best_start : best_start + window]: print(f" | {l[:120]}") print() def audit_endings(docs: list[dict]) -> None: separator("8. END-OF-DOCUMENT QUALITY (last 300 chars of 15 random docs)") sample = random.sample(docs, 15) for i, doc in enumerate(sample, 1): text = doc["text"] print(f"--- #{i} | accession={doc['accession']} | chars={doc['chars']} ---") print(text[-300:]) print() def main() -> None: print("Loading all documents from corpus...") docs = load_all_docs() print(f"Loaded {len(docs)} documents from {len(SHARDS)} shards.\n") audit_smallest(docs) audit_largest(docs) audit_mid_samples(docs) audit_xbrl_contamination(docs) audit_short_lines(docs) audit_transitions(docs) audit_financial_tables(docs) audit_endings(docs) separator("AUDIT COMPLETE") print(f"Total documents audited: {len(docs)}") if __name__ == "__main__": main()