SEC-cyBERT/scripts/analyze_generator_quality.py
2026-03-29 20:33:39 -04:00

335 lines
13 KiB
Python

#!/usr/bin/env python3
"""
Quantify how EFiling/XDX generator quality issues affect the annotated paragraph set.
READ-ONLY analysis — does not modify any files.
"""
import json
import re
import sys
from collections import Counter, defaultdict
from pathlib import Path
# Reuse detect_generator from the existing script
sys.path.insert(0, str(Path(__file__).parent))
from detect_generators import detect_generator
# Paths
# NOTE(review): absolute, machine-specific input paths — assumes this exact
# checkout location; confirm before running on another machine.
HTML_DIR = Path("/home/joey/Documents/sec-cyBERT/data/raw/html")  # raw filing HTML; filename stem is the accession number
PARAGRAPHS_PATH = Path("/home/joey/Documents/sec-cyBERT/data/paragraphs/paragraphs-clean.jsonl")  # one paragraph JSON object per line
ANNOTATIONS_PATH = Path("/home/joey/Documents/sec-cyBERT/data/annotations/stage1.jsonl")  # one annotation JSON object per line
SEP = "=" * 100  # horizontal rule printed between report sections
def load_paragraphs(path=None):
    """Load paragraphs from a JSONL file.

    Args:
        path: JSONL file to read; defaults to PARAGRAPHS_PATH.

    Returns:
        dict mapping paragraph id -> paragraph dict.
    """
    if path is None:
        path = PARAGRAPHS_PATH
    paragraphs = {}
    # Read JSONL explicitly as UTF-8 (don't rely on the platform default
    # encoding) and tolerate stray blank lines.
    with open(path, encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            p = json.loads(line)
            paragraphs[p["id"]] = p
    return paragraphs
def load_annotations(path=None):
    """Load annotations from a JSONL file.

    Args:
        path: JSONL file to read; defaults to ANNOTATIONS_PATH.

    Returns:
        dict mapping paragraphId -> annotation dict.  If a paragraphId
        appears more than once, the LAST annotation wins (the original
        comment claimed "first", but dict assignment overwrites — which
        one is kept doesn't matter for the counts this script reports).
    """
    if path is None:
        path = ANNOTATIONS_PATH
    annotations = {}
    # Read JSONL explicitly as UTF-8 and tolerate stray blank lines.
    with open(path, encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            a = json.loads(line)
            annotations[a["paragraphId"]] = a
    return annotations
def detect_all_generators():
    """Detect generators for all HTML files. Return dict: accession -> generator."""
    html_files = sorted(HTML_DIR.glob("*.html"))
    total = len(html_files)
    accession_to_gen = {}
    for count, html_path in enumerate(html_files, start=1):
        # The filename stem is the filing's accession number.
        generator, _evidence = detect_generator(str(html_path))
        accession_to_gen[html_path.stem] = generator
        # Progress heartbeat on stderr every 3000 files.
        if count % 3000 == 0:
            print(f" Scanned {count}/{total} HTML files...", file=sys.stderr)
    print(f" Scanned {total}/{total} HTML files.", file=sys.stderr)
    return accession_to_gen
def starts_lowercase(text: str) -> bool:
    """True if text starts with a lowercase letter (orphan word candidate)."""
    # Empty strings are not candidates; otherwise test the first character.
    return bool(text) and text[0].islower()
def is_list_item(text: str) -> bool:
    """True if text looks like a list item (starts with bullet, dash, number+period, etc.)."""
    candidate = text.strip()
    if not candidate:
        return False
    # Bullets ("- ", "• ", "* "), numbered ("1. ", "1) "),
    # and lowercase-lettered ("a. ", "a) ") markers.
    plain_markers = (
        r'^[-•*▪◦]\s',
        r'^\d+[.)]\s',
        r'^[a-z][.)]\s',
    )
    if any(re.match(pat, candidate) for pat in plain_markers):
        return True
    # Parenthesized markers — "(a) ", "(3) ", "(iv) " — case-insensitive.
    return re.match(r'^\([a-z0-9ivx]+\)\s', candidate, re.I) is not None
def looks_like_inlined_header(text: str) -> bool:
    """
    True if text starts with a section heading run into body text, e.g.:
        "Risk Management and Strategy We recognize the importance..."
        "Cybersecurity Governance Our Board of Directors oversees..."

    The heading portion is one of the known SEC Item 1C section-title
    phrases below, joined to the following body text by bare whitespace —
    no period, colon, or other punctuation separating the heading from the
    next sentence.
    """
    # Known heading patterns for SEC Item 1C disclosures
    heading_patterns = (
        r'(?:Cybersecurity\s+)?Risk\s+Management(?:\s+and\s+Strategy)?',
        r'(?:Cybersecurity\s+)?Governance(?:\s+and\s+Risk\s+Management)?',
        r'Cybersecurity\s+Governance',
        r'Cybersecurity\s+Risk\s+Management\s+and\s+Strategy',
        r'Board\s+Oversight(?:\s+of\s+(?:Risks?\s+from\s+)?Cybersecurity(?:\s+(?:Threats?|Risks?))?)?',
        r'Management(?:\'s)?\s+Role\s+in\s+(?:Managing\s+)?Cybersecurity',
        r'Governance\s+(?:Related\s+to|Oversight\s+of)\s+Cybersecurity(?:\s+Risks?)?',
        r'Impact\s+of\s+Cybersecurity\s+(?:Risks?|Threats?)',
        r'Cybersecurity\s+(?:Strategy|Overview|Program)',
        r'(?:Management\s+and|Management|Governance)\s+(?:Strategy|Overview)',
        r'Risk\s+Factors?',
        r'Oversight\s+of\s+Cybersecurity\s+Risk\s+Management',
    )
    # Heading immediately followed by a letter starting the body text
    # (usually uppercase for a new sentence, occasionally lowercase).
    return any(
        re.match(rf'^(?:{pat})\s+[A-Za-z]', text) is not None
        for pat in heading_patterns
    )
def main():
    """Run the read-only generator-quality report over the annotated paragraph set.

    Prints five sections to stdout:
      1. annotated paragraphs broken down by HTML generator,
      2. lowercase-start ("orphan word") paragraphs,
      3. orphan-word details (generator breakdown, examples, category skew),
      4. inlined-header paragraphs,
      5. combined impact summary (union of both issues).
    Progress/warning messages from generator detection go to stderr.
    """
    print("Loading data...")
    paragraphs = load_paragraphs()
    annotations = load_annotations()
    print(f" Paragraphs: {len(paragraphs):,}")
    print(f" Annotations: {len(annotations):,}")
    # Unique annotated paragraph IDs (only those with matching paragraph data)
    annotated_ids = set(annotations.keys()) & set(paragraphs.keys())
    print(f" Annotated paragraphs with matching paragraph data: {len(annotated_ids):,}")
    print("\nDetecting generators for all HTML files...")
    accession_to_gen = detect_all_generators()
    print(f" HTML files scanned: {len(accession_to_gen):,}")
    # Map each paragraph to its generator via the filing's accession number
    para_to_gen = {}
    missing_accessions = set()
    for pid, p in paragraphs.items():
        acc = p["filing"]["accessionNumber"]
        gen = accession_to_gen.get(acc)
        if gen is None:
            # Paragraph's filing has no corresponding HTML file on disk;
            # bucket it under a sentinel generator name.
            missing_accessions.add(acc)
            gen = "NO_HTML_FILE"
        para_to_gen[pid] = gen
    if missing_accessions:
        print(f"\n WARNING: {len(missing_accessions)} accession numbers in paragraphs have no HTML file")
    # =====================================================================
    # SECTION 1: Annotated paragraphs by generator
    # =====================================================================
    print(f"\n{SEP}")
    print("SECTION 1: Annotated paragraphs by generator")
    print(SEP)
    ann_gen_counts = Counter()
    for pid in annotated_ids:
        ann_gen_counts[para_to_gen[pid]] += 1
    total_ann = len(annotated_ids)
    print(f"\n{'Generator':<50} {'Count':>7} {'%':>7}")
    print("-" * 70)
    for gen, count in ann_gen_counts.most_common():
        pct = count / total_ann * 100
        print(f"{gen:<50} {count:>7} {pct:>6.1f}%")
    print("-" * 70)
    print(f"{'TOTAL':<50} {total_ann:>7} {100.0:>6.1f}%")
    # =====================================================================
    # SECTION 2: Lowercase-start (orphan word) analysis for annotated set
    # =====================================================================
    print(f"\n{SEP}")
    print("SECTION 2: Lowercase-start paragraphs in annotated set")
    print(SEP)
    # All annotated lowercase-start...
    ann_lc = {pid for pid in annotated_ids if starts_lowercase(paragraphs[pid]["text"])}
    # ...minus genuine list items, which legitimately start lowercase
    ann_lc_nonlist = {pid for pid in ann_lc if not is_list_item(paragraphs[pid]["text"])}
    print(f"\nAnnotated paragraphs starting with lowercase: {len(ann_lc):,} / {total_ann:,} ({len(ann_lc)/total_ann*100:.2f}%)")
    print(f" Of those, excluding list items: {len(ann_lc_nonlist):,} ({len(ann_lc_nonlist)/total_ann*100:.2f}%)")
    # Breakdown by generator for lowercase-start non-list
    lc_by_gen = Counter()
    for pid in ann_lc_nonlist:
        lc_by_gen[para_to_gen[pid]] += 1
    print(f"\n{'Generator':<50} {'LC-start':>9} {'Total ann':>10} {'% of gen':>9}")
    print("-" * 85)
    for gen, _ in ann_gen_counts.most_common():
        lc_count = lc_by_gen.get(gen, 0)
        gen_total = ann_gen_counts[gen]
        pct = lc_count / gen_total * 100 if gen_total else 0
        if lc_count > 0:
            print(f"{gen:<50} {lc_count:>9} {gen_total:>10} {pct:>8.1f}%")
    # Specific callouts for the two generator families of interest
    efiling_gens = {"EFiling/EDGAR Agent", "EFiling XDX"}
    efiling_ann = {pid for pid in annotated_ids if para_to_gen[pid] in efiling_gens}
    efiling_lc = {pid for pid in ann_lc_nonlist if para_to_gen[pid] in efiling_gens}
    compsci_ann = {pid for pid in annotated_ids if para_to_gen[pid] == "CompSci Transform"}
    compsci_lc = {pid for pid in ann_lc_nonlist if para_to_gen[pid] == "CompSci Transform"}
    print(f"\n--- Specific callouts ---")
    print(f"EFiling/XDX annotated paragraphs starting lowercase (non-list): {len(efiling_lc):,} / {len(efiling_ann):,} ({len(efiling_lc)/len(efiling_ann)*100:.1f}% of EFiling/XDX)" if efiling_ann else "EFiling/XDX: 0 annotated paragraphs")
    print(f"CompSci Transform annotated paragraphs starting lowercase (non-list): {len(compsci_lc):,} / {len(compsci_ann):,} ({len(compsci_lc)/len(compsci_ann)*100:.1f}% of CompSci)" if compsci_ann else "CompSci Transform: 0 annotated paragraphs")
    print(f"\nTotal affected annotated paragraphs (LC non-list): {len(ann_lc_nonlist):,} / {total_ann:,} = {len(ann_lc_nonlist)/total_ann*100:.2f}%")
    # =====================================================================
    # SECTION 3: Orphan-word paragraphs detail
    # =====================================================================
    print(f"\n{SEP}")
    print("SECTION 3: Orphan-word paragraph details (LC-start, non-list, annotated)")
    print(SEP)
    # Breakdown by generator
    print(f"\nBreakdown by generator:")
    print(f"{'Generator':<50} {'Count':>7} {'% of orphan':>12}")
    print("-" * 75)
    for gen, count in lc_by_gen.most_common():
        pct = count / len(ann_lc_nonlist) * 100
        print(f"{gen:<50} {count:>7} {pct:>11.1f}%")
    # 10 example texts with labels (first 10 by sorted paragraph id)
    print(f"\n10 example orphan-word annotated paragraphs:")
    print("-" * 100)
    examples = sorted(ann_lc_nonlist)[:10]
    for pid in examples:
        text = paragraphs[pid]["text"][:150]
        ann = annotations[pid]
        label = ann.get("label", {})
        cat = label.get("content_category", "?")
        spec = label.get("specificity_level", "?")
        gen = para_to_gen[pid]
        print(f" [{gen}] cat={cat}, spec={spec}")
        print(f" \"{text}...\"")
        print()
    # Category distribution in orphan-word paragraphs vs overall
    print(f"\nCategory distribution: orphan-word vs overall annotated set")
    print("-" * 80)
    orphan_cats = Counter()
    for pid in ann_lc_nonlist:
        cat = annotations[pid].get("label", {}).get("content_category", "Unknown")
        orphan_cats[cat] += 1
    overall_cats = Counter()
    for pid in annotated_ids:
        cat = annotations[pid].get("label", {}).get("content_category", "Unknown")
        overall_cats[cat] += 1
    all_cats = sorted(set(orphan_cats.keys()) | set(overall_cats.keys()))
    print(f"{'Category':<40} {'Orphan':>7} {'Orphan%':>8} {'Overall':>8} {'Overall%':>9} {'Over-rep':>9}")
    print("-" * 85)
    for cat in all_cats:
        o_count = orphan_cats.get(cat, 0)
        a_count = overall_cats.get(cat, 0)
        o_pct = o_count / len(ann_lc_nonlist) * 100 if ann_lc_nonlist else 0
        a_pct = a_count / total_ann * 100
        # Over-representation ratio: >1 means the category is more common
        # among orphan-word paragraphs than in the annotated set overall;
        # >1.5x is flagged with "<<<".
        ratio = (o_pct / a_pct) if a_pct > 0 else 0
        flag = " <<<" if ratio > 1.5 else ""
        print(f"{cat:<40} {o_count:>7} {o_pct:>7.1f}% {a_count:>8} {a_pct:>8.1f}% {ratio:>8.2f}x{flag}")
    # =====================================================================
    # SECTION 4: Inlined headers analysis
    # =====================================================================
    print(f"\n{SEP}")
    print("SECTION 4: Inlined headers in annotated paragraphs")
    print(SEP)
    ann_inlined = set()
    for pid in annotated_ids:
        text = paragraphs[pid]["text"]
        if looks_like_inlined_header(text):
            ann_inlined.add(pid)
    print(f"\nAnnotated paragraphs with inlined headers: {len(ann_inlined):,} / {total_ann:,} ({len(ann_inlined)/total_ann*100:.2f}%)")
    inlined_by_gen = Counter()
    for pid in ann_inlined:
        inlined_by_gen[para_to_gen[pid]] += 1
    print(f"\n{'Generator':<50} {'Inlined':>8} {'Total ann':>10} {'% of gen':>9}")
    print("-" * 85)
    for gen, _ in ann_gen_counts.most_common():
        ih_count = inlined_by_gen.get(gen, 0)
        gen_total = ann_gen_counts[gen]
        pct = ih_count / gen_total * 100 if gen_total else 0
        if ih_count > 0:
            print(f"{gen:<50} {ih_count:>8} {gen_total:>10} {pct:>8.1f}%")
    # Show some examples (first 10 by sorted paragraph id)
    print(f"\n10 example inlined-header paragraphs:")
    print("-" * 100)
    examples_ih = sorted(ann_inlined)[:10]
    for pid in examples_ih:
        text = paragraphs[pid]["text"][:150]
        gen = para_to_gen[pid]
        cat = annotations[pid].get("label", {}).get("content_category", "?")
        print(f" [{gen}] cat={cat}")
        print(f" \"{text}...\"")
        print()
    # =====================================================================
    # SECTION 5: Combined impact summary
    # =====================================================================
    print(f"\n{SEP}")
    print("SECTION 5: Combined impact summary")
    print(SEP)
    # Union of the two defect classes; a paragraph can exhibit both.
    affected = ann_lc_nonlist | ann_inlined
    print(f"\nOrphan-word (LC non-list): {len(ann_lc_nonlist):>6} ({len(ann_lc_nonlist)/total_ann*100:.2f}%)")
    print(f"Inlined headers: {len(ann_inlined):>6} ({len(ann_inlined)/total_ann*100:.2f}%)")
    print(f"Either issue (union): {len(affected):>6} ({len(affected)/total_ann*100:.2f}%)")
    print(f"Total annotated set: {total_ann:>6}")
    # EFiling/XDX specifically
    efiling_affected = {pid for pid in affected if para_to_gen[pid] in efiling_gens}
    print(f"\nEFiling/XDX affected (either issue): {len(efiling_affected):,} / {len(efiling_ann):,}")
# Run the report only when executed as a script (not on import).
if __name__ == "__main__":
    main()