# SEC-cyBERT/python/audit_corpus.py
# Snapshot: 2026-03-29 20:33:39 -04:00 — 249 lines, 9.1 KiB, Python
"""
Quality audit of the SEC-cyBERT DAPT training corpus.
Reads sharded JSONL files and performs qualitative checks on document content.
READ-ONLY — does not modify any files.
"""
import json
import os
import random
import re
import sys
from pathlib import Path
# Corpus root: <repo>/data/dapt-corpus, resolved relative to this script's location.
CORPUS_DIR = Path(__file__).resolve().parent.parent / "data" / "dapt-corpus"
# All shard files, sorted so every run processes them in a deterministic order.
SHARDS = sorted(CORPUS_DIR.glob("shard-*.jsonl"))
# Fixed seed: the random samples printed by the audits are reproducible run-to-run.
random.seed(42)
def load_all_docs() -> list[dict]:
"""Load all documents from all shards."""
docs = []
for shard in SHARDS:
with open(shard) as f:
for line in f:
line = line.strip()
if line:
docs.append(json.loads(line))
return docs
def separator(title: str) -> None:
    """Print an 80-column banner framing *title*, padded by blank lines."""
    bar = "=" * 80
    print(f"\n{bar}")
    print(f" {title}")
    print(f"{bar}\n")
def audit_smallest(docs: list[dict]) -> None:
    """Print the 20 smallest documents (by char count), each in full.

    Documents longer than 2000 chars are truncated with a marker; tiny
    documents are the likeliest to be extraction junk, so show everything.
    """
    separator("1. SMALLEST 20 DOCUMENTS (by chars)")
    by_size = sorted(docs, key=lambda d: d["chars"])
    for rank, doc in enumerate(by_size[:20], start=1):
        body = doc["text"]
        print(f"--- #{rank} | accession={doc['accession']} | chars={doc['chars']} | words={doc['words']} ---")
        if len(body) > 2000:
            body = body[:2000] + "\n... [TRUNCATED]"
        print(body)
        print()
def audit_largest(docs: list[dict]) -> None:
    """Print head and tail excerpts (500 chars each) of the 5 largest docs."""
    separator("2. LARGEST 5 DOCUMENTS (first/last 500 chars)")
    biggest = sorted(docs, key=lambda d: d["chars"], reverse=True)[:5]
    for rank, doc in enumerate(biggest, start=1):
        body = doc["text"]
        print(f"--- #{rank} | accession={doc['accession']} | chars={doc['chars']} | words={doc['words']} ---")
        print("FIRST 500 CHARS:")
        print(body[:500])
        print("\n... [GAP] ...\n")
        print("LAST 500 CHARS:")
        print(body[-500:])
        print()
def audit_mid_samples(docs: list[dict]) -> None:
    """Print a ~500-char excerpt centered on the midpoint of 10 random docs."""
    separator("3. RANDOM MID-DOCUMENT SAMPLES (10 docs, 500 chars from 50% point)")
    for rank, doc in enumerate(random.sample(docs, 10), start=1):
        body = doc["text"]
        center = len(body) // 2
        lo = max(0, center - 250)
        hi = min(len(body), center + 250)
        print(f"--- #{rank} | accession={doc['accession']} | chars={doc['chars']} ---")
        print(body[lo:hi])
        print()
def audit_xbrl_contamination(docs: list[dict]) -> None:
    """Show up to 10 documents whose first 200 chars look like raw XBRL/metadata.

    A strict pattern (CIK-like numbers, XBRL namespaces, bare ISO dates) is
    tried first; if nothing matches, a broader pattern is used. For each hit
    the start of the text is printed along with the estimated offset where
    the real filing content begins.
    """
    separator("4. XBRL-CONTAMINATED STARTS (first 200 chars with XBRL patterns)")
    xbrl_pattern = re.compile(
        r"(0000\d{6}|xbrli:|fasb\.org|us-gaap:|dei:|srt:|^\d{4}-\d{2}-\d{2}\s*$)",
        re.MULTILINE,
    )
    found = []
    for doc in docs:
        if xbrl_pattern.search(doc["text"][:200]):
            found.append(doc)
            if len(found) >= 10:
                break
    if not found:
        print("No XBRL-contaminated documents found in initial scan.")
        print("Trying broader pattern...")
        # Try a broader search
        broad_pattern = re.compile(r"(xmlns|xbrl|0001\d{6})", re.IGNORECASE)
        for doc in docs:
            if broad_pattern.search(doc["text"][:200]):
                found.append(doc)
                if len(found) >= 10:
                    break
    for i, doc in enumerate(found[:10], 1):
        text = doc["text"]
        print(f"--- #{i} | accession={doc['accession']} | chars={doc['chars']} ---")
        print("FIRST 500 CHARS:")
        print(text[:500])
        # Find where XBRL junk ends and real text begins, using cover-page
        # phrases as transition markers.
        for marker in ["UNITED STATES", "FORM 10-K", "FORM 10-k", "ANNUAL REPORT"]:
            idx = text.find(marker)
            # BUG FIX: idx == 0 is a valid find() result; the original
            # `idx > 0` silently skipped markers at the very start.
            if 0 <= idx < 5000:
                print(f"\n >> Transition to real text at char {idx} (marker: '{marker}')")
                break
        print()
def audit_short_lines(docs: list[dict]) -> None:
    """Rank documents by their ratio of very short (<10 char) non-empty lines.

    A high ratio is a fragmentation smell (shredded tables, column residue).
    Prints the top 10 offenders, each with up to 20 short lines shown in
    one line of surrounding context.
    """
    separator("5. DOCS WITH MOST SHORT LINES (<10 chars, excluding empty)")
    scored = []
    for doc in docs:
        lines = doc["text"].split("\n")
        non_empty = [l for l in lines if l.strip()]
        if not non_empty:
            continue  # all-blank document: nothing to score
        n_short = sum(1 for l in non_empty if 0 < len(l.strip()) < 10)
        # NOTE: the original also stored the short-line list here, but it
        # was never read downstream — dropped as dead data.
        scored.append((n_short / len(non_empty), n_short, len(non_empty), doc))
    scored.sort(key=lambda x: x[0], reverse=True)
    for i, (ratio, n_short, n_total, doc) in enumerate(scored[:10], 1):
        print(
            f"--- #{i} | accession={doc['accession']} | ratio={ratio:.2%} "
            f"| {n_short}/{n_total} short lines ---"
        )
        # Show up to 20 short lines with surrounding context
        lines = doc["text"].split("\n")
        shown = 0
        for j, line in enumerate(lines):
            stripped = line.strip()
            if 0 < len(stripped) < 10:
                # One line of context on each side of the short line.
                ctx_start = max(0, j - 1)
                ctx_end = min(len(lines), j + 2)
                for k in range(ctx_start, ctx_end):
                    prefix = ">>>" if k == j else " "
                    print(f" {prefix} L{k+1}: {lines[k][:100]}")
                print()
                shown += 1
                if shown >= 20:
                    break  # cap reached; original kept scanning pointlessly
        print()
def audit_transitions(docs: list[dict]) -> None:
    """Show the hand-off zone between the SEC cover page and company content."""
    separator("6. TRANSITION ZONES (SEC cover page -> company content)")
    # Restrict to docs whose first 2000 chars carry the SEC header.
    candidates = [d for d in docs if "SECURITIES AND EXCHANGE COMMISSION" in d["text"][:2000]]
    picked = random.sample(candidates, min(5, len(candidates)))
    for i, doc in enumerate(picked, 1):
        text = doc["text"]
        header_at = text.find("SECURITIES AND EXCHANGE COMMISSION")
        if header_at < 0:
            continue  # defensive; candidates were prefiltered above
        # Earliest company-content marker appearing after the cover header.
        markers = ["Item 1", "ITEM 1", "PART I", "TABLE OF CONTENTS", "Table of Contents"]
        transition_at = -1
        for marker in markers:
            hit = text.find(marker, header_at + 100)
            if hit > 0 and (transition_at < 0 or hit < transition_at):
                transition_at = hit
        if transition_at > 0:
            lo = max(0, transition_at - 250)
            hi = min(len(text), transition_at + 250)
            print(f"--- #{i} | accession={doc['accession']} ---")
            print(f"Cover page at char {header_at}, transition at char {transition_at}")
            print(f"SHOWING chars {lo}-{hi}:")
            print(text[lo:hi])
        else:
            # No marker found: just show the region around the SEC header.
            lo = max(0, header_at - 50)
            hi = min(len(text), header_at + 450)
            print(f"--- #{i} | accession={doc['accession']} ---")
            print(f"Cover page at char {header_at}, no clear transition marker found")
            print(text[lo:hi])
        print()
def audit_financial_tables(docs: list[dict]) -> None:
    """Surface documents dominated by financial-table lines.

    A line "matches" if it contains a dollar sign or is purely numeric
    punctuation. Docs where >30% of non-empty lines match are ranked, and
    each of the top 5 has its densest 20-line window printed for review.
    """
    separator("7. FINANCIAL TABLE QUALITY (>30% lines with $ or mostly numeric)")
    scored = []
    dollar_or_numeric = re.compile(r"(\$|^\s*[\d,.\-()]+\s*$)")
    for doc in docs:
        lines = doc["text"].split("\n")
        non_empty = [l for l in lines if l.strip()]
        if not non_empty:
            continue
        matching = sum(1 for l in non_empty if dollar_or_numeric.search(l))
        ratio = matching / len(non_empty)
        if ratio > 0.30:
            scored.append((ratio, doc))
    scored.sort(key=lambda x: x[0], reverse=True)
    for i, (ratio, doc) in enumerate(scored[:5], 1):
        text = doc["text"]
        print(f"--- #{i} | accession={doc['accession']} | chars={doc['chars']} | numeric ratio={ratio:.1%} ---")
        # Find the 20-line window with the most dollar/numeric content.
        lines = text.split("\n")
        best_start = 0
        best_count = 0
        window = 20
        # BUG FIX: the original iterated range(len(lines) - window), which
        # never considered the final full window; `+ 1` includes it.
        # max(0, ...) keeps short docs (fewer than 20 lines) safe.
        for j in range(max(0, len(lines) - window + 1)):
            count = sum(1 for l in lines[j : j + window] if dollar_or_numeric.search(l))
            if count > best_count:
                best_count = count
                best_start = j
        print(f"DENSEST 20-LINE WINDOW (starting at line {best_start + 1}, {best_count}/{window} numeric):")
        for l in lines[best_start : best_start + window]:
            print(f" | {l[:120]}")
        print()
def audit_endings(docs: list[dict]) -> None:
    """Print the final 300 characters of 15 randomly sampled documents."""
    separator("8. END-OF-DOCUMENT QUALITY (last 300 chars of 15 random docs)")
    for rank, doc in enumerate(random.sample(docs, 15), start=1):
        print(f"--- #{rank} | accession={doc['accession']} | chars={doc['chars']} ---")
        print(doc["text"][-300:])
        print()
def main() -> None:
    """Load the corpus once and run every audit section in order."""
    print("Loading all documents from corpus...")
    docs = load_all_docs()
    print(f"Loaded {len(docs)} documents from {len(SHARDS)} shards.\n")
    # Fixed order matters: sections are numbered 1-8 in their banners.
    audits = (
        audit_smallest,
        audit_largest,
        audit_mid_samples,
        audit_xbrl_contamination,
        audit_short_lines,
        audit_transitions,
        audit_financial_tables,
        audit_endings,
    )
    for audit in audits:
        audit(docs)
    separator("AUDIT COMPLETE")
    print(f"Total documents audited: {len(docs)}")


if __name__ == "__main__":
    main()