#!/usr/bin/env python3
"""Audit sec-cyBERT paragraph corpus for text quality issues.

Loads the cleaned paragraph corpus (JSONL) and runs seven heuristic
checks -- inlined headers, sentence-boundary violations, garbled text,
HTML/markup artifacts, truncated paragraphs, cross-filing duplicate
text, and a ground-truth spot-check against the raw source HTML --
printing counts plus random samples for each check to stdout.
"""
import json
import os  # NOTE(review): unused in the visible code; kept for compatibility
import random
import re
from collections import Counter, defaultdict
from pathlib import Path

DATA_FILE = Path("data/paragraphs/paragraphs-clean.jsonl")
HTML_DIR = Path("data/raw/html")


def show(text, limit=200):
    """Truncate *text* to *limit* characters for display, appending '...'."""
    if len(text) <= limit:
        return text
    return text[:limit] + "..."


def header(title):
    """Print a banner separating audit sections."""
    print("\n" + "=" * 80)
    print(f" {title}")
    print("=" * 80 + "\n")


def normalize_html_to_plain(html_text):
    """Convert raw HTML to normalized plain text for comparison.

    Strips tags, decodes common HTML entities (named, decimal, and hex
    forms), and collapses all whitespace runs to single spaces.
    """
    # NOTE(review): the entity patterns below were corrupted in the checked-in
    # copy (the regex literals themselves had been entity-decoded); they were
    # reconstructed as named|decimal|hex alternations -- confirm against VCS.
    plain = re.sub(r"<[^>]+>", " ", html_text)
    # Decode common HTML entities
    plain = re.sub(r"&nbsp;?", " ", plain)
    plain = re.sub(r"&amp;", "&", plain)
    plain = re.sub(r"&lt;", "<", plain)
    plain = re.sub(r"&gt;", ">", plain)
    plain = re.sub(r"&rsquo;|&#8217;|&#x2019;", "\u2019", plain)
    plain = re.sub(r"&lsquo;|&#8216;|&#x2018;", "\u2018", plain)
    plain = re.sub(r"&rdquo;|&#8221;|&#x201D;", "\u201D", plain)
    plain = re.sub(r"&ldquo;|&#8220;|&#x201C;", "\u201C", plain)
    plain = re.sub(r"&mdash;|&#8212;", "\u2014", plain)
    plain = re.sub(r"&ndash;|&#8211;", "\u2013", plain)
    # Remaining numeric character references, decimal then hex
    plain = re.sub(r"&#(\d+);", lambda m: chr(int(m.group(1))), plain)
    plain = re.sub(r"&#x([0-9a-fA-F]+);", lambda m: chr(int(m.group(1), 16)), plain)
    # Any other named entity becomes a space
    plain = re.sub(r"&\w+;", " ", plain)
    plain = re.sub(r"\s+", " ", plain)
    return plain


def load_paragraphs():
    """Load all paragraph records from the JSONL corpus file."""
    print("Loading paragraphs...")
    paragraphs = []
    with open(DATA_FILE) as f:
        for line in f:
            paragraphs.append(json.loads(line))
    print(f"Loaded {len(paragraphs):,} paragraphs.\n")
    return paragraphs


def check_inlined_headers(paragraphs):
    """CHECK 1: detect a heading and body merged into one paragraph.

    A heading is a short (2-10 word) Title-Case or ALL-CAPS phrase at the
    start, immediately followed (no colon/period separator) by a sentence
    starting with a common sentence-opener like We/Our/The/As/In/This/A/
    An/Each/Management/For/Since/During.

    Returns the list of (kind, paragraph, heading) tuples found.
    """
    header("CHECK 1: Inlined Headers")
    # NOTE(review): the original pattern and classification logic were
    # truncated in the checked-in copy; reconstructed from the surrounding
    # comments and the variables used below -- confirm against VCS.
    pat_merged_header = re.compile(
        r"^([A-Z][A-Za-z\s,&/\-\']+?)"
        r"\s+(?=(?:We|Our|The|As|In|This|A|An|Each|Management|For|Since|During)\s)"
    )
    examples = []
    for p in paragraphs:
        m = pat_merged_header.match(p["text"])
        if not m:
            continue
        heading_candidate = m.group(1).strip()
        words = heading_candidate.split()
        if not 2 <= len(words) <= 10:
            continue
        is_allcaps = heading_candidate.isupper()
        is_title = (
            not is_allcaps
            and len(words) <= 5
            and all(w[0].isupper() for w in words if w[0].isalpha())
        )
        if is_title or is_allcaps:
            kind = "ALLCAPS" if is_allcaps else "TITLECASE"
            examples.append((kind, p, heading_candidate))

    print(f"Found {len(examples):,} paragraphs with potential inlined headers.")
    print(f"  - ALLCAPS pattern: {sum(1 for t, _, _ in examples if t == 'ALLCAPS'):,}")
    print(f"  - TITLECASE pattern: {sum(1 for t, _, _ in examples if t == 'TITLECASE'):,}")
    print()
    # Show 20 examples, mix of both types
    random.seed(42)
    sample = random.sample(examples, min(20, len(examples)))
    for i, (kind, p, hdr) in enumerate(sample, 1):
        print(f"  [{i}] ({kind}) Header: \"{hdr}\" [{p['filing']['companyName'][:30]}]")
        print(f"      {show(p['text'])}")
        print()
    return examples


def check_sentence_boundaries(paragraphs):
    """CHECK 2: find paragraphs where sentence boundaries were lost.

    Returns the list of (paragraph, issues) pairs, where issues is a list
    of (kind, context) tuples.
    """
    header("CHECK 2: Sentence Boundary Violations")
    # word.Next -- period followed immediately by uppercase letter
    pat_dotcap = re.compile(r"[a-z]\.([A-Z][a-z])")
    # word,Next -- comma followed immediately by uppercase letter
    pat_commacap = re.compile(r"[a-z],([A-Z][a-z])")
    # Two words jammed: lowercase then uppercase with no space/punct.
    # NOTE(review): compiled in the original but never applied; kept for parity.
    pat_jammed = re.compile(r"[a-z]{2}[A-Z][a-z]{2}")
    # Common false positives for dot-cap: abbreviations, names
    false_pos_dot = re.compile(
        r"(?:Mr|Mrs|Ms|Dr|Jr|Sr|Inc|Corp|Ltd|Co|No|vs|St|Dept|Gen|Gov|Sec|Vol|Rev|etc|U\.S|U\.K)\."
    )
    examples = []
    for p in paragraphs:
        text = p["text"]
        issues = []
        for m in pat_dotcap.finditer(text):
            context = text[max(0, m.start() - 10):m.end() + 10]
            # skip if it's a known abbreviation just before the period
            if not false_pos_dot.search(text[max(0, m.start() - 5):m.end()]):
                issues.append(("dot-cap", context))
        for m in pat_commacap.finditer(text):
            context = text[max(0, m.start() - 10):m.end() + 10]
            issues.append(("comma-cap", context))
        if issues:
            examples.append((p, issues))

    print(f"Found {len(examples):,} paragraphs with sentence boundary violations.")
    print()
    random.seed(43)
    sample = random.sample(examples, min(20, len(examples)))
    for i, (p, issues) in enumerate(sample, 1):
        print(f"  [{i}] [{p['filing']['companyName'][:30]}]")
        for kind, ctx in issues[:3]:
            print(f"      ({kind}) ...{ctx}...")
        print(f"      Full start: {show(p['text'], 150)}")
        print()
    return examples


def check_garbled(paragraphs):
    """CHECK 3: flag garbled / nonsensical text.

    Returns the list of (reason, paragraph) pairs found.
    """
    header("CHECK 3: Garbled / Nonsensical Text")
    # Spaced-out characters: single chars separated by spaces
    pat_spaced = re.compile(r"(?:\b[a-zA-Z]\s){4,}")
    examples = []
    for p in paragraphs:
        text = p["text"]
        reason = None
        # Check spaced-out characters
        if pat_spaced.search(text):
            reason = "spaced-chars"
        # Check long non-ASCII runs (>15% of characters)
        non_ascii = sum(1 for c in text if ord(c) > 127)
        if non_ascii > len(text) * 0.15 and len(text) > 20:
            reason = f"non-ASCII ({non_ascii}/{len(text)} chars)"
        # Check mostly numbers/symbols (<40% alphabetic)
        alpha = sum(1 for c in text if c.isalpha())
        if len(text) > 20 and alpha < len(text) * 0.4:
            reason = f"low-alpha ({alpha}/{len(text)} = {alpha/len(text):.0%})"
        if reason:
            examples.append((reason, p))

    print(f"Found {len(examples):,} potentially garbled paragraphs.")
    reason_counts = Counter(r.split("(")[0].strip() for r, _ in examples)
    for r, c in reason_counts.most_common():
        print(f"  - {r}: {c}")
    print()
    random.seed(44)
    sample = random.sample(examples, min(10, len(examples)))
    for i, (reason, p) in enumerate(sample, 1):
        print(f"  [{i}] ({reason}) [{p['filing']['companyName'][:30]}] wc={p['wordCount']}")
        print(f"      {show(p['text'], 250)}")
        print()
    return examples


def check_html_artifacts(paragraphs):
    """CHECK 4: find leftover HTML / entity / XBRL / CSS markup.

    Returns the list of (reasons, paragraph) pairs found.
    """
    header("CHECK 4: HTML / Markup Artifacts")
    pat_html_tag = re.compile(r"<[a-zA-Z/][^>]*>")
    pat_html_entity = re.compile(r"&(?:amp|lt|gt|nbsp|quot|#\d+|#x[0-9a-fA-F]+);")
    pat_xbrl = re.compile(r"\b(?:ix|us-gaap|dei|xbrli):")
    pat_css = re.compile(
        r"(?:font-family|font-size|color:|margin:|padding:|text-align|line-height)",
        re.IGNORECASE,
    )
    examples = []
    for p in paragraphs:
        text = p["text"]
        reasons = []
        if pat_html_tag.search(text):
            reasons.append("html-tag")
        if pat_html_entity.search(text):
            reasons.append("html-entity")
        if pat_xbrl.search(text):
            reasons.append("xbrl")
        if pat_css.search(text):
            reasons.append("css")
        if reasons:
            examples.append((reasons, p))

    print(f"Found {len(examples):,} paragraphs with HTML/markup artifacts.")
    reason_counts = Counter()
    for reasons, _ in examples:
        for r in reasons:
            reason_counts[r] += 1
    for r, c in reason_counts.most_common():
        print(f"  - {r}: {c}")
    print()
    random.seed(45)
    sample = random.sample(examples, min(10, len(examples)))
    for i, (reasons, p) in enumerate(sample, 1):
        print(f"  [{i}] ({', '.join(reasons)}) [{p['filing']['companyName'][:30]}]")
        print(f"      {show(p['text'], 250)}")
        print()
    return examples


def check_truncated(paragraphs):
    """CHECK 5: paragraphs with no terminal punctuation (likely cut off).

    Returns the list of flagged paragraphs.
    """
    header("CHECK 5: Truncated Paragraphs")
    # Common abbreviations that end sentences without terminal punct being an issue
    abbrevs = {
        "inc", "corp", "ltd", "co", "mr", "mrs", "ms", "dr", "jr", "sr",
        "etc", "al", "eg", "ie", "vs", "no", "approx", "dept", "gov",
    }
    truncated = []
    for p in paragraphs:
        text = p["text"].rstrip()
        if not text:
            continue
        # Check if ends with terminal punctuation (incl. curly close quotes;
        # NOTE(review): the quote characters were mangled in the checked-in
        # copy -- reconstructed as U+201D / U+2019, confirm against VCS)
        if text[-1] in ".!?:;)\"'\u201d\u2019":
            continue
        # Check if it's a very short text (likely a heading)
        if p["wordCount"] <= 5:
            continue
        # Check if last word is a common abbreviation
        last_word = text.split()[-1].lower().rstrip(".,;:!?")
        if last_word in abbrevs:
            continue
        truncated.append(p)

    print(f"Found {len(truncated):,} potentially truncated paragraphs (no terminal punctuation, >5 words).")
    print()
    random.seed(46)
    sample = random.sample(truncated, min(10, len(truncated)))
    for i, p in enumerate(sample, 1):
        text = p["text"]
        print(f"  [{i}] [{p['filing']['companyName'][:30]}] wc={p['wordCount']}")
        # Show the END of the text, where the truncation would be
        if len(text) > 200:
            print(f"      ...{text[-200:]}")
        else:
            print(f"      {text}")
        print()
    return truncated


def check_duplicates(paragraphs):
    """CHECK 6: identical text (by textHash) appearing in multiple filings.

    Returns (cross_filing_dupes, total_dupe_paragraphs) where the first is
    a dict mapping textHash -> list of paragraph records.
    """
    header("CHECK 6: Cross-Filing Duplicate Text")
    # Group by textHash
    hash_to_paras = defaultdict(list)
    for p in paragraphs:
        hash_to_paras[p["textHash"]].append(p)

    # Find hashes that appear in multiple different filings
    cross_filing_dupes = {}
    for h, ps in hash_to_paras.items():
        accessions = set(p["filing"]["accessionNumber"] for p in ps)
        if len(accessions) > 1:
            cross_filing_dupes[h] = ps

    total_dupe_paragraphs = sum(len(ps) for ps in cross_filing_dupes.values())
    print(f"Unique textHashes appearing in multiple filings: {len(cross_filing_dupes):,}")
    print(f"Total paragraphs involved: {total_dupe_paragraphs:,}")
    print()

    # Sort by number of distinct filings (most duplicated first)
    sorted_dupes = sorted(
        cross_filing_dupes.items(),
        key=lambda x: len(set(p["filing"]["accessionNumber"] for p in x[1])),
        reverse=True,
    )
    print("Top 15 most duplicated paragraphs:")
    for i, (h, ps) in enumerate(sorted_dupes[:15], 1):
        accessions = set(p["filing"]["accessionNumber"] for p in ps)
        companies = set(p["filing"]["companyName"] for p in ps)
        print(f"\n  [{i}] Hash={h}, in {len(accessions)} filings, {len(companies)} companies")
        print(f"      Companies: {', '.join(list(companies)[:5])}{'...' if len(companies) > 5 else ''}")
        print(f"      Text: {show(ps[0]['text'], 200)}")

    # Same-company cross-year dupes vs different-company dupes
    same_company_dupes = 0
    diff_company_dupes = 0
    for h, ps in cross_filing_dupes.items():
        companies = set(p["filing"]["companyName"] for p in ps)
        if len(companies) == 1:
            same_company_dupes += 1
        else:
            diff_company_dupes += 1
    print(f"\n\nBreakdown:")
    print(f"  Same company, different filings (likely year-over-year boilerplate): {same_company_dupes:,}")
    print(f"  Different companies (likely industry boilerplate or extraction error): {diff_company_dupes:,}")
    return cross_filing_dupes, total_dupe_paragraphs


def spot_check(paragraphs):
    """CHECK 7: verify 10 random paragraphs against the raw source HTML."""
    header("CHECK 7: Ground Truth Spot-Check (10 random paragraphs vs. source HTML)")
    random.seed(99)
    spot_check_sample = random.sample(paragraphs, 10)
    match_count = 0
    partial_count = 0
    not_found_count = 0
    for i, p in enumerate(spot_check_sample, 1):
        acc = p["filing"]["accessionNumber"]
        html_path = HTML_DIR / f"{acc}.html"
        print(f"  [{i}] {p['filing']['companyName'][:40]} | {acc}")
        print(f"      Paragraph index: {p['paragraphIndex']}, word count: {p['wordCount']}")
        corpus_text = p["text"]
        corpus_norm = re.sub(r"\s+", " ", corpus_text).strip()
        if not html_path.exists():
            print(f"      *** HTML file not found: {html_path}")
            print(f"      Corpus text: {show(corpus_text, 150)}")
            not_found_count += 1
            print()
            continue
        with open(html_path, "r", errors="replace") as f:
            html_content = f.read()
        plain_html = normalize_html_to_plain(html_content)
        # Check if the entire corpus text appears verbatim in the HTML plain text
        if corpus_norm in plain_html:
            print(f"      VERBATIM MATCH: Corpus text found exactly in HTML source.")
            match_count += 1
        else:
            # Try to find a distinctive substring to locate the paragraph.
            # Use multiple 40-char probes from different positions.
            found = False
            for start_frac in [0.3, 0.5, 0.1, 0.7]:
                start_pos = int(len(corpus_norm) * start_frac)
                probe = corpus_norm[start_pos:start_pos + 40]
                if not probe:
                    continue
                idx = plain_html.find(probe)
                if idx >= 0:
                    found = True
                    # Show surrounding context from HTML
                    ctx_start = max(0, idx - 80)
                    ctx_end = min(len(plain_html), idx + len(corpus_norm) + 80)
                    html_ctx = plain_html[ctx_start:ctx_end].strip()
                    print(f"      PARTIAL MATCH: Text found in HTML but paragraph boundaries differ.")
                    print(f"      Corpus first 120: {corpus_norm[:120]}")
                    print(f"      HTML context 120: {html_ctx[:120]}")
                    partial_count += 1
                    break
            if not found:
                print(f"      NOT FOUND in HTML plain text!")
                print(f"      Corpus text: {show(corpus_text, 150)}")
                not_found_count += 1
        print()
    print(f"Spot-check results: {match_count} verbatim, {partial_count} partial, {not_found_count} not found")


def main():
    """Run all seven corpus-quality checks and print a summary."""
    paragraphs = load_paragraphs()
    inlined = check_inlined_headers(paragraphs)
    boundary = check_sentence_boundaries(paragraphs)
    garbled = check_garbled(paragraphs)
    html_artifacts = check_html_artifacts(paragraphs)
    truncated = check_truncated(paragraphs)
    cross_filing_dupes, total_dupe_paragraphs = check_duplicates(paragraphs)
    spot_check(paragraphs)

    header("SUMMARY")
    print(f"Total paragraphs: {len(paragraphs):,}")
    print(f"  1. Inlined headers: {len(inlined):,}")
    print(f"  2. Sentence boundary violations: {len(boundary):,}")
    print(f"  3. Garbled / nonsensical text: {len(garbled):,}")
    print(f"  4. HTML / markup artifacts: {len(html_artifacts):,}")
    print(f"  5. Truncated paragraphs: {len(truncated):,}")
    print(f"  6. Cross-filing duplicates: {len(cross_filing_dupes):,} unique texts in {total_dupe_paragraphs:,} paragraphs")
    print()


if __name__ == "__main__":
    main()