#!/usr/bin/env python3
"""
Quantify how EFiling/XDX generator quality issues affect the annotated paragraph set.

READ-ONLY analysis — does not modify any files.
"""
|
|
|
|
import json
|
|
import re
|
|
import sys
|
|
from collections import Counter, defaultdict
|
|
from pathlib import Path
|
|
|
|
# Reuse detect_generator from the existing script
|
|
sys.path.insert(0, str(Path(__file__).parent))
|
|
from detect_generators import detect_generator
|
|
|
|
# Paths
|
|
HTML_DIR = Path("/home/joey/Documents/sec-cyBERT/data/raw/html")
|
|
PARAGRAPHS_PATH = Path("/home/joey/Documents/sec-cyBERT/data/paragraphs/paragraphs-clean.jsonl")
|
|
ANNOTATIONS_PATH = Path("/home/joey/Documents/sec-cyBERT/data/annotations/stage1.jsonl")
|
|
|
|
SEP = "=" * 100
|
|
|
|
|
|
def load_paragraphs():
|
|
"""Load paragraphs, return dict: id -> paragraph dict."""
|
|
paragraphs = {}
|
|
with open(PARAGRAPHS_PATH) as f:
|
|
for line in f:
|
|
p = json.loads(line)
|
|
paragraphs[p["id"]] = p
|
|
return paragraphs
|
|
|
|
|
|
def load_annotations():
|
|
"""Load annotations, return dict: paragraphId -> annotation dict."""
|
|
annotations = {}
|
|
with open(ANNOTATIONS_PATH) as f:
|
|
for line in f:
|
|
a = json.loads(line)
|
|
pid = a["paragraphId"]
|
|
# Keep the first annotation per paragraph (or overwrite — doesn't matter for counts)
|
|
annotations[pid] = a
|
|
return annotations
|
|
|
|
|
|
def detect_all_generators():
|
|
"""Detect generators for all HTML files. Return dict: accession -> generator."""
|
|
accession_to_gen = {}
|
|
files = sorted(HTML_DIR.glob("*.html"))
|
|
total = len(files)
|
|
for i, fp in enumerate(files):
|
|
accession = fp.stem
|
|
gen, _evidence = detect_generator(str(fp))
|
|
accession_to_gen[accession] = gen
|
|
if (i + 1) % 3000 == 0:
|
|
print(f" Scanned {i + 1}/{total} HTML files...", file=sys.stderr)
|
|
print(f" Scanned {total}/{total} HTML files.", file=sys.stderr)
|
|
return accession_to_gen
|
|
|
|
|
|
def starts_lowercase(text: str) -> bool:
|
|
"""True if text starts with a lowercase letter (orphan word candidate)."""
|
|
if not text:
|
|
return False
|
|
return text[0].islower()
|
|
|
|
|
|
def is_list_item(text: str) -> bool:
|
|
"""True if text looks like a list item (starts with bullet, dash, number+period, etc.)."""
|
|
stripped = text.strip()
|
|
if not stripped:
|
|
return False
|
|
# Common list patterns: "- ", "• ", "* ", "1. ", "a) ", "(a) ", "(i) "
|
|
if re.match(r'^[-•*▪◦]\s', stripped):
|
|
return True
|
|
if re.match(r'^\d+[.)]\s', stripped):
|
|
return True
|
|
if re.match(r'^\([a-z0-9ivx]+\)\s', stripped, re.I):
|
|
return True
|
|
if re.match(r'^[a-z][.)]\s', stripped):
|
|
return True
|
|
return False
|
|
|
|
|
|
def looks_like_inlined_header(text: str) -> bool:
|
|
"""
|
|
True if text starts with a section heading run into body text, e.g.:
|
|
"Risk Management and Strategy We recognize the importance..."
|
|
"Cybersecurity Governance Our Board of Directors oversees..."
|
|
|
|
Key distinction from normal sentences: the heading portion is a noun phrase
|
|
(not a full sentence subject like "Our Board" or "The Company"), and is
|
|
immediately followed by a new sentence that starts a different thought.
|
|
|
|
We look for known SEC cybersecurity section heading patterns followed by
|
|
body text starting with a capital letter (new sentence) with no punctuation
|
|
separating them (no period, colon, or newline — just a space).
|
|
"""
|
|
# Known heading patterns for SEC Item 1C disclosures
|
|
heading_patterns = [
|
|
r'(?:Cybersecurity\s+)?Risk\s+Management(?:\s+and\s+Strategy)?',
|
|
r'(?:Cybersecurity\s+)?Governance(?:\s+and\s+Risk\s+Management)?',
|
|
r'Cybersecurity\s+Governance',
|
|
r'Cybersecurity\s+Risk\s+Management\s+and\s+Strategy',
|
|
r'Board\s+Oversight(?:\s+of\s+(?:Risks?\s+from\s+)?Cybersecurity(?:\s+(?:Threats?|Risks?))?)?',
|
|
r'Management(?:\'s)?\s+Role\s+in\s+(?:Managing\s+)?Cybersecurity',
|
|
r'Governance\s+(?:Related\s+to|Oversight\s+of)\s+Cybersecurity(?:\s+Risks?)?',
|
|
r'Impact\s+of\s+Cybersecurity\s+(?:Risks?|Threats?)',
|
|
r'Cybersecurity\s+(?:Strategy|Overview|Program)',
|
|
r'(?:Management\s+and|Management|Governance)\s+(?:Strategy|Overview)',
|
|
r'Risk\s+Factors?',
|
|
r'Oversight\s+of\s+Cybersecurity\s+Risk\s+Management',
|
|
]
|
|
|
|
for pat in heading_patterns:
|
|
# Heading immediately followed by body text (capital letter starting new sentence)
|
|
m = re.match(rf'^({pat})\s+([A-Z])', text)
|
|
if m:
|
|
return True
|
|
# Also catch heading followed by lowercase (rarer but possible)
|
|
m = re.match(rf'^({pat})\s+([a-z])', text)
|
|
if m:
|
|
return True
|
|
|
|
return False
|
|
|
|
|
|
def main():
|
|
print("Loading data...")
|
|
paragraphs = load_paragraphs()
|
|
annotations = load_annotations()
|
|
print(f" Paragraphs: {len(paragraphs):,}")
|
|
print(f" Annotations: {len(annotations):,}")
|
|
|
|
# Unique annotated paragraph IDs
|
|
annotated_ids = set(annotations.keys()) & set(paragraphs.keys())
|
|
print(f" Annotated paragraphs with matching paragraph data: {len(annotated_ids):,}")
|
|
|
|
print("\nDetecting generators for all HTML files...")
|
|
accession_to_gen = detect_all_generators()
|
|
print(f" HTML files scanned: {len(accession_to_gen):,}")
|
|
|
|
# Map each paragraph to its generator
|
|
para_to_gen = {}
|
|
missing_accessions = set()
|
|
for pid, p in paragraphs.items():
|
|
acc = p["filing"]["accessionNumber"]
|
|
gen = accession_to_gen.get(acc)
|
|
if gen is None:
|
|
missing_accessions.add(acc)
|
|
gen = "NO_HTML_FILE"
|
|
para_to_gen[pid] = gen
|
|
|
|
if missing_accessions:
|
|
print(f"\n WARNING: {len(missing_accessions)} accession numbers in paragraphs have no HTML file")
|
|
|
|
# =====================================================================
|
|
# SECTION 1: Annotated paragraphs by generator
|
|
# =====================================================================
|
|
print(f"\n{SEP}")
|
|
print("SECTION 1: Annotated paragraphs by generator")
|
|
print(SEP)
|
|
|
|
ann_gen_counts = Counter()
|
|
for pid in annotated_ids:
|
|
ann_gen_counts[para_to_gen[pid]] += 1
|
|
|
|
total_ann = len(annotated_ids)
|
|
print(f"\n{'Generator':<50} {'Count':>7} {'%':>7}")
|
|
print("-" * 70)
|
|
for gen, count in ann_gen_counts.most_common():
|
|
pct = count / total_ann * 100
|
|
print(f"{gen:<50} {count:>7} {pct:>6.1f}%")
|
|
print("-" * 70)
|
|
print(f"{'TOTAL':<50} {total_ann:>7} {100.0:>6.1f}%")
|
|
|
|
# =====================================================================
|
|
# SECTION 2: Lowercase-start (orphan word) analysis for annotated set
|
|
# =====================================================================
|
|
print(f"\n{SEP}")
|
|
print("SECTION 2: Lowercase-start paragraphs in annotated set")
|
|
print(SEP)
|
|
|
|
# All annotated lowercase-start
|
|
ann_lc = {pid for pid in annotated_ids if starts_lowercase(paragraphs[pid]["text"])}
|
|
ann_lc_nonlist = {pid for pid in ann_lc if not is_list_item(paragraphs[pid]["text"])}
|
|
|
|
print(f"\nAnnotated paragraphs starting with lowercase: {len(ann_lc):,} / {total_ann:,} ({len(ann_lc)/total_ann*100:.2f}%)")
|
|
print(f" Of those, excluding list items: {len(ann_lc_nonlist):,} ({len(ann_lc_nonlist)/total_ann*100:.2f}%)")
|
|
|
|
# Breakdown by generator for lowercase-start non-list
|
|
lc_by_gen = Counter()
|
|
for pid in ann_lc_nonlist:
|
|
lc_by_gen[para_to_gen[pid]] += 1
|
|
|
|
print(f"\n{'Generator':<50} {'LC-start':>9} {'Total ann':>10} {'% of gen':>9}")
|
|
print("-" * 85)
|
|
for gen, _ in ann_gen_counts.most_common():
|
|
lc_count = lc_by_gen.get(gen, 0)
|
|
gen_total = ann_gen_counts[gen]
|
|
pct = lc_count / gen_total * 100 if gen_total else 0
|
|
if lc_count > 0:
|
|
print(f"{gen:<50} {lc_count:>9} {gen_total:>10} {pct:>8.1f}%")
|
|
|
|
# Specific callouts
|
|
efiling_gens = {"EFiling/EDGAR Agent", "EFiling XDX"}
|
|
efiling_ann = {pid for pid in annotated_ids if para_to_gen[pid] in efiling_gens}
|
|
efiling_lc = {pid for pid in ann_lc_nonlist if para_to_gen[pid] in efiling_gens}
|
|
|
|
compsci_ann = {pid for pid in annotated_ids if para_to_gen[pid] == "CompSci Transform"}
|
|
compsci_lc = {pid for pid in ann_lc_nonlist if para_to_gen[pid] == "CompSci Transform"}
|
|
|
|
print(f"\n--- Specific callouts ---")
|
|
print(f"EFiling/XDX annotated paragraphs starting lowercase (non-list): {len(efiling_lc):,} / {len(efiling_ann):,} ({len(efiling_lc)/len(efiling_ann)*100:.1f}% of EFiling/XDX)" if efiling_ann else "EFiling/XDX: 0 annotated paragraphs")
|
|
print(f"CompSci Transform annotated paragraphs starting lowercase (non-list): {len(compsci_lc):,} / {len(compsci_ann):,} ({len(compsci_lc)/len(compsci_ann)*100:.1f}% of CompSci)" if compsci_ann else "CompSci Transform: 0 annotated paragraphs")
|
|
print(f"\nTotal affected annotated paragraphs (LC non-list): {len(ann_lc_nonlist):,} / {total_ann:,} = {len(ann_lc_nonlist)/total_ann*100:.2f}%")
|
|
|
|
# =====================================================================
|
|
# SECTION 3: Orphan-word paragraphs detail
|
|
# =====================================================================
|
|
print(f"\n{SEP}")
|
|
print("SECTION 3: Orphan-word paragraph details (LC-start, non-list, annotated)")
|
|
print(SEP)
|
|
|
|
# Breakdown by generator
|
|
print(f"\nBreakdown by generator:")
|
|
print(f"{'Generator':<50} {'Count':>7} {'% of orphan':>12}")
|
|
print("-" * 75)
|
|
for gen, count in lc_by_gen.most_common():
|
|
pct = count / len(ann_lc_nonlist) * 100
|
|
print(f"{gen:<50} {count:>7} {pct:>11.1f}%")
|
|
|
|
# 10 example texts with labels
|
|
print(f"\n10 example orphan-word annotated paragraphs:")
|
|
print("-" * 100)
|
|
examples = sorted(ann_lc_nonlist)[:10]
|
|
for pid in examples:
|
|
text = paragraphs[pid]["text"][:150]
|
|
ann = annotations[pid]
|
|
label = ann.get("label", {})
|
|
cat = label.get("content_category", "?")
|
|
spec = label.get("specificity_level", "?")
|
|
gen = para_to_gen[pid]
|
|
print(f" [{gen}] cat={cat}, spec={spec}")
|
|
print(f" \"{text}...\"")
|
|
print()
|
|
|
|
# Category distribution in orphan-word paragraphs vs overall
|
|
print(f"\nCategory distribution: orphan-word vs overall annotated set")
|
|
print("-" * 80)
|
|
|
|
orphan_cats = Counter()
|
|
for pid in ann_lc_nonlist:
|
|
cat = annotations[pid].get("label", {}).get("content_category", "Unknown")
|
|
orphan_cats[cat] += 1
|
|
|
|
overall_cats = Counter()
|
|
for pid in annotated_ids:
|
|
cat = annotations[pid].get("label", {}).get("content_category", "Unknown")
|
|
overall_cats[cat] += 1
|
|
|
|
all_cats = sorted(set(orphan_cats.keys()) | set(overall_cats.keys()))
|
|
print(f"{'Category':<40} {'Orphan':>7} {'Orphan%':>8} {'Overall':>8} {'Overall%':>9} {'Over-rep':>9}")
|
|
print("-" * 85)
|
|
for cat in all_cats:
|
|
o_count = orphan_cats.get(cat, 0)
|
|
a_count = overall_cats.get(cat, 0)
|
|
o_pct = o_count / len(ann_lc_nonlist) * 100 if ann_lc_nonlist else 0
|
|
a_pct = a_count / total_ann * 100
|
|
ratio = (o_pct / a_pct) if a_pct > 0 else 0
|
|
flag = " <<<" if ratio > 1.5 else ""
|
|
print(f"{cat:<40} {o_count:>7} {o_pct:>7.1f}% {a_count:>8} {a_pct:>8.1f}% {ratio:>8.2f}x{flag}")
|
|
|
|
# =====================================================================
|
|
# SECTION 4: Inlined headers analysis
|
|
# =====================================================================
|
|
print(f"\n{SEP}")
|
|
print("SECTION 4: Inlined headers in annotated paragraphs")
|
|
print(SEP)
|
|
|
|
ann_inlined = set()
|
|
for pid in annotated_ids:
|
|
text = paragraphs[pid]["text"]
|
|
if looks_like_inlined_header(text):
|
|
ann_inlined.add(pid)
|
|
|
|
print(f"\nAnnotated paragraphs with inlined headers: {len(ann_inlined):,} / {total_ann:,} ({len(ann_inlined)/total_ann*100:.2f}%)")
|
|
|
|
inlined_by_gen = Counter()
|
|
for pid in ann_inlined:
|
|
inlined_by_gen[para_to_gen[pid]] += 1
|
|
|
|
print(f"\n{'Generator':<50} {'Inlined':>8} {'Total ann':>10} {'% of gen':>9}")
|
|
print("-" * 85)
|
|
for gen, _ in ann_gen_counts.most_common():
|
|
ih_count = inlined_by_gen.get(gen, 0)
|
|
gen_total = ann_gen_counts[gen]
|
|
pct = ih_count / gen_total * 100 if gen_total else 0
|
|
if ih_count > 0:
|
|
print(f"{gen:<50} {ih_count:>8} {gen_total:>10} {pct:>8.1f}%")
|
|
|
|
# Show some examples
|
|
print(f"\n10 example inlined-header paragraphs:")
|
|
print("-" * 100)
|
|
examples_ih = sorted(ann_inlined)[:10]
|
|
for pid in examples_ih:
|
|
text = paragraphs[pid]["text"][:150]
|
|
gen = para_to_gen[pid]
|
|
cat = annotations[pid].get("label", {}).get("content_category", "?")
|
|
print(f" [{gen}] cat={cat}")
|
|
print(f" \"{text}...\"")
|
|
print()
|
|
|
|
# =====================================================================
|
|
# SECTION 5: Combined impact summary
|
|
# =====================================================================
|
|
print(f"\n{SEP}")
|
|
print("SECTION 5: Combined impact summary")
|
|
print(SEP)
|
|
|
|
affected = ann_lc_nonlist | ann_inlined
|
|
print(f"\nOrphan-word (LC non-list): {len(ann_lc_nonlist):>6} ({len(ann_lc_nonlist)/total_ann*100:.2f}%)")
|
|
print(f"Inlined headers: {len(ann_inlined):>6} ({len(ann_inlined)/total_ann*100:.2f}%)")
|
|
print(f"Either issue (union): {len(affected):>6} ({len(affected)/total_ann*100:.2f}%)")
|
|
print(f"Total annotated set: {total_ann:>6}")
|
|
|
|
# EFiling/XDX specifically
|
|
efiling_affected = {pid for pid in affected if para_to_gen[pid] in efiling_gens}
|
|
print(f"\nEFiling/XDX affected (either issue): {len(efiling_affected):,} / {len(efiling_ann):,}")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|