#!/usr/bin/env python3
"""Novel data quality audit for paragraphs-clean.jsonl.

READ-ONLY: prints findings to stdout, does not modify any files.

Each finding is reported with a concern level (HIGH / MEDIUM / LOW), a
count + percentage, and up to five example paragraphs.
"""
import json
import re
import sys
from collections import Counter, defaultdict
from pathlib import Path

# Input corpus: one JSON object per line, each with at least
# "text", "wordCount", "id", and "filing" {companyName, accessionNumber}.
# (Schema inferred from usage below — verify against the producer.)
DATA_PATH = Path(__file__).resolve().parent.parent / "data" / "paragraphs" / "paragraphs-clean.jsonl"

# ── Cybersecurity domain keywords (broad) ──────────────────────────────
# Matched as lowercase SUBSTRINGS (see has_cyber_relevance), so short
# entries like "soc" or "1c" will also hit inside longer words.
CYBER_KEYWORDS = {
    "cyber", "cybersecurity", "security", "breach", "incident", "threat",
    "vulnerability", "malware", "ransomware", "phishing", "firewall",
    "encryption", "intrusion", "unauthorized", "attack", "hacker",
    "data protection", "information security", "network security",
    "access control", "authentication", "risk management", "ciso",
    "chief information security", "chief information officer",
    "information technology", "it systems", "data privacy", "privacy",
    "personally identifiable", "pii", "soc", "nist", "iso 27001",
    "penetration test", "disaster recovery", "business continuity",
    "third party", "vendor", "supply chain", "cloud", "endpoint",
    "monitoring", "detection", "response", "remediation", "patch",
    "compliance", "regulatory", "safeguard", "protect", "secure",
    "confidential", "integrity", "availability", "resilience",
    "governance", "oversight", "board of directors", "audit committee",
    "risk factor", "material", "disclosure", "1c", "item 1c",
}

# ── Non-cyber legal boilerplate patterns ───────────────────────────────
BOILERPLATE_PATTERNS = [
    re.compile(r"forward[- ]looking\s+statements?", re.I),
    re.compile(r"safe\s+harbor", re.I),
    re.compile(r"private\s+securities\s+litigation\s+reform\s+act", re.I),
    re.compile(r"cautionary\s+statement", re.I),
    re.compile(r"except\s+as\s+required\s+by\s+law.*no\s+obligation\s+to\s+update", re.I),
    re.compile(r"this\s+(annual\s+)?report\s+(on\s+form\s+10-k\s+)?contains?\s+forward", re.I),
]

# ── SEC item cross-reference pattern ───────────────────────────────────
# Captures the item number only (e.g. "7A" from "Item 7A").
SEC_ITEM_RE = re.compile(r"\bItem\s+(\d+[A-Z]?)\b", re.I)

# ── Dollar amount pattern ──────────────────────────────────────────────
DOLLAR_RE = re.compile(r"\$[\d,]+(?:\.\d+)?\s*(?:thousand|million|billion|trillion)?", re.I)

# ── Date patterns (unusual formats) ────────────────────────────────────
DATE_PATTERNS = [
    # MM/DD/YYYY or MM-DD-YYYY
    re.compile(r"\b\d{1,2}[/-]\d{1,2}[/-]\d{2,4}\b"),
    # Month DD, YYYY
    re.compile(r"\b(?:January|February|March|April|May|June|July|August|September|October|November|December)\s+\d{1,2},?\s+\d{4}\b", re.I),
    # DD Month YYYY
    re.compile(r"\b\d{1,2}\s+(?:January|February|March|April|May|June|July|August|September|October|November|December)\s+\d{4}\b", re.I),
    # YYYY-MM-DD (ISO)
    re.compile(r"\b\d{4}-\d{2}-\d{2}\b"),
]

# ── Bullet point characters ────────────────────────────────────────────
BULLET_RE = re.compile(r"[\u2022\u2023\u25E6\u2043\u2219\u25AA\u25AB\u25CF\u25CB\u25A0\u25A1]")


# ── Helpers ────────────────────────────────────────────────────────────
def truncate(text: str, max_len: int = 200) -> str:
    """Return *text* unchanged if it fits, else the first max_len chars + "..."."""
    if len(text) <= max_len:
        return text
    return text[:max_len] + "..."


def print_section(title: str) -> None:
    """Print a banner separating the major report sections."""
    print(f"\n{'=' * 80}")
    print(f" {title}")
    print(f"{'=' * 80}")


def print_finding(name: str, concern: str, count: int, total: int, examples: list[dict]) -> None:
    """Print one finding: header, count/percentage, and up to 5 examples.

    *examples* are paragraph dicts; only "text" and filing.companyName are read.
    """
    pct = count / total * 100 if total else 0  # guard against empty corpus
    print(f"\n--- {name} [{concern} CONCERN] ---")
    print(f" Count: {count:,} / {total:,} ({pct:.2f}%)")
    for i, ex in enumerate(examples[:5]):
        filing = ex.get("filing", {})
        company = filing.get("companyName", "?")
        print(f" Example {i+1} [{company}]:")
        print(f" {truncate(ex['text'], 300)}")
    if count > 5:
        print(f" ... and {count - 5:,} more")


def has_cyber_relevance(text_lower: str) -> bool:
    """True if any cybersecurity keyword occurs as a substring of *text_lower*.

    Caller is expected to pass already-lowercased text.
    """
    return any(kw in text_lower for kw in CYBER_KEYWORDS)


# ── Load data ──────────────────────────────────────────────────────────
def load_data() -> list[dict]:
    """Read the JSONL corpus at DATA_PATH; one parsed dict per line."""
    # FIX: explicit UTF-8 so results don't depend on the platform's locale
    # encoding (the audit itself looks for non-ASCII / mojibake, so a stable
    # decode matters).
    with open(DATA_PATH, encoding="utf-8") as f:
        return [json.loads(line) for line in f]


def main() -> None:
    """Run every audit pass over the corpus and print a summary report."""
    print("Loading data...")
    paragraphs = load_data()
    total = len(paragraphs)
    print(f"Loaded {total:,} paragraphs.\n")

    # Pre-compute lowercase texts (used by the keyword-relevance pass below)
    texts_lower = [p["text"].lower() for p in paragraphs]

    # ════════════════════════════════════════════════════════════════════
    print_section("1. CHARACTER-LEVEL ANOMALIES")
    # ════════════════════════════════════════════════════════════════════

    # 1a. High uppercase ratio (>30%) — likely headings / shouting / tables
    high_upper = []
    for p in paragraphs:
        t = p["text"]
        alpha = sum(1 for c in t if c.isalpha())
        if alpha < 10:  # too short for the ratio to mean anything
            continue
        upper = sum(1 for c in t if c.isupper())
        ratio = upper / alpha
        if ratio > 0.30:
            high_upper.append({**p, "_ratio": ratio})
    high_upper.sort(key=lambda x: x["_ratio"], reverse=True)
    print_finding("High uppercase ratio (>30% of alpha chars)", "MEDIUM",
                  len(high_upper), total, high_upper)

    # 1b. Unusual punctuation density (list-like or table-like fragments)
    high_punct = []
    for p in paragraphs:
        t = p["text"]
        if len(t) < 30:
            continue
        semis = t.count(";")
        colons = t.count(":")
        dashes = t.count("—") + t.count("–") + t.count("-")
        punct_count = semis + colons + dashes
        density = punct_count / len(t)
        if density > 0.05:
            high_punct.append({**p, "_density": density, "_semis": semis,
                               "_colons": colons, "_dashes": dashes})
    high_punct.sort(key=lambda x: x["_density"], reverse=True)
    print_finding("High punctuation density (semicolons/colons/dashes >5% of chars)",
                  "LOW", len(high_punct), total, high_punct)

    # 1c. Non-ASCII characters — collected per paragraph and as a global frequency table
    non_ascii_paras = []
    non_ascii_chars_all = Counter()
    for p in paragraphs:
        t = p["text"]
        non_ascii = [(c, hex(ord(c)), ord(c)) for c in t if ord(c) > 127]
        if non_ascii:
            chars_found = {(c, h) for c, h, _ in non_ascii}
            for c, h, _ in non_ascii:
                non_ascii_chars_all[f"{c} ({h})"] += 1
            non_ascii_paras.append({**p, "_chars": chars_found})
    print_finding("Paragraphs with non-ASCII characters", "MEDIUM",
                  len(non_ascii_paras), total, non_ascii_paras)
    if non_ascii_chars_all:
        print("\n Non-ASCII character frequency:")
        for char_repr, cnt in non_ascii_chars_all.most_common(20):
            print(f" {char_repr}: {cnt:,} occurrences")

    # 1d. Unusual whitespace (multiple consecutive spaces, tabs)
    # FIX: pattern must be two-or-more spaces; a one-or-more pattern would
    # flag every paragraph containing any space at all.
    multi_space_re = re.compile(r"  +")
    tab_re = re.compile(r"\t")
    whitespace_issues = []
    for p in paragraphs:
        t = p["text"]
        multi = len(multi_space_re.findall(t))
        tabs = len(tab_re.findall(t))
        if multi > 0 or tabs > 0:
            whitespace_issues.append({**p, "_multi_spaces": multi, "_tabs": tabs})
    print_finding("Unusual whitespace (multiple spaces or tabs)", "MEDIUM",
                  len(whitespace_issues), total, whitespace_issues)

    # ════════════════════════════════════════════════════════════════════
    print_section("2. CONTENT ANOMALIES")
    # ════════════════════════════════════════════════════════════════════

    # 2a. Dollar amounts
    dollar_paras = []
    for p in paragraphs:
        matches = DOLLAR_RE.findall(p["text"])
        if matches:
            dollar_paras.append({**p, "_amounts": matches})
    print_finding("Paragraphs with dollar amounts", "MEDIUM",
                  len(dollar_paras), total, dollar_paras)
    if dollar_paras:
        # Show distribution of dollar amounts
        all_amounts = []
        for dp in dollar_paras:
            all_amounts.extend(dp["_amounts"])
        print(f"\n Total dollar amount mentions: {len(all_amounts):,}")
        amount_counter = Counter(all_amounts)
        print(" Most common amounts:")
        for amt, cnt in amount_counter.most_common(10):
            print(f" {amt}: {cnt:,}")

    # 2b. Dates in text
    date_paras = []
    for p in paragraphs:
        t = p["text"]
        found_dates = []
        for pat in DATE_PATTERNS:
            found_dates.extend(pat.findall(t))
        if found_dates:
            date_paras.append({**p, "_dates": found_dates})
    print_finding("Paragraphs containing dates", "LOW", len(date_paras), total, date_paras)
    if date_paras:
        all_dates = []
        for dp in date_paras:
            all_dates.extend(dp["_dates"])
        print(f"\n Total date mentions: {len(all_dates):,}")

    # 2c. Cross-references to other SEC items
    cross_ref_paras = []
    for p in paragraphs:
        matches = SEC_ITEM_RE.findall(p["text"])
        # Filter out Item 1C (that's expected — this corpus is Item 1C text)
        other_items = [m for m in matches if m.upper() != "1C"]
        if other_items:
            cross_ref_paras.append({**p, "_items": other_items})
    # Count which items are referenced
    item_counts = Counter()
    for crp in cross_ref_paras:
        for item in crp["_items"]:
            item_counts[f"Item {item}"] += 1
    print_finding("Cross-references to non-1C SEC items", "HIGH",
                  len(cross_ref_paras), total, cross_ref_paras)
    if item_counts:
        print("\n Referenced items:")
        for item, cnt in item_counts.most_common():
            print(f" {item}: {cnt:,}")

    # 2d. Non-cyber legal boilerplate
    boilerplate_paras = []
    for p in paragraphs:
        t = p["text"]
        matched = []
        for pat in BOILERPLATE_PATTERNS:
            if pat.search(t):
                matched.append(pat.pattern[:60])
        if matched:
            boilerplate_paras.append({**p, "_patterns": matched})
    print_finding("Non-cybersecurity legal boilerplate", "HIGH",
                  len(boilerplate_paras), total, boilerplate_paras)

    # ════════════════════════════════════════════════════════════════════
    print_section("3. STRUCTURAL ANOMALIES")
    # ════════════════════════════════════════════════════════════════════

    # 3a. Bullet points mid-text (unicode bullets, or "- " / "* " at line start)
    bullet_paras = []
    for p in paragraphs:
        t = p["text"]
        if BULLET_RE.search(t):
            bullet_paras.append(p)
        elif re.search(r"(?:^|\n)\s*[-*]\s+\w", t):
            bullet_paras.append(p)
    print_finding("Paragraphs with bullet points mid-text", "MEDIUM",
                  len(bullet_paras), total, bullet_paras)

    # 3b. Embedded newlines
    newline_paras = []
    for p in paragraphs:
        t = p["text"]
        nl_count = t.count("\n")
        if nl_count > 0:
            newline_paras.append({**p, "_newlines": nl_count})
    newline_paras.sort(key=lambda x: x["_newlines"], reverse=True)
    print_finding("Paragraphs with embedded newlines", "MEDIUM",
                  len(newline_paras), total, newline_paras)

    # 3c. Mid-paragraph headings (ALL CAPS phrase of 3+ words followed by different content)
    mid_heading_re = re.compile(r"(?<=\. )([A-Z][A-Z\s]{10,}[A-Z])(?=\.?\s+[A-Z][a-z])")
    mid_heading_paras = []
    for p in paragraphs:
        t = p["text"]
        matches = mid_heading_re.findall(t)
        if matches:
            mid_heading_paras.append({**p, "_headings": matches})
    print_finding("Mid-paragraph headings (ALL CAPS phrase mid-sentence)", "MEDIUM",
                  len(mid_heading_paras), total, mid_heading_paras)

    # ════════════════════════════════════════════════════════════════════
    print_section("4. OUTLIER DETECTION")
    # ════════════════════════════════════════════════════════════════════

    # 4a. Extremely high word count (>400)
    long_paras = [p for p in paragraphs if p["wordCount"] > 400]
    long_paras.sort(key=lambda x: x["wordCount"], reverse=True)
    print_finding("Extremely long paragraphs (>400 words)", "HIGH",
                  len(long_paras), total, long_paras)
    if long_paras:
        wc_values = [p["wordCount"] for p in long_paras]
        print(f"\n Word count range: {min(wc_values)} - {max(wc_values)}")
        print(f" Mean: {sum(wc_values)/len(wc_values):.0f}")

    # 4b. Low information density
    # Common English stopwords
    STOPWORDS = {
        "the", "a", "an", "and", "or", "but", "in", "on", "at", "to", "for",
        "of", "with", "by", "from", "is", "are", "was", "were", "be", "been",
        "being", "have", "has", "had", "do", "does", "did", "will", "would",
        "could", "should", "may", "might", "shall", "can", "that", "which",
        "who", "whom", "this", "these", "those", "it", "its", "we", "our",
        "us", "they", "their", "them", "he", "she", "his", "her", "as", "if",
        "not", "no", "nor", "so", "than", "too", "very", "such", "also",
        "each", "any", "all", "both", "other", "some", "into", "through",
        "during", "before", "after", "about", "between", "under", "over",
        "above", "up", "down", "out", "off", "then", "once",
    }
    low_info_paras = []
    for p in paragraphs:
        words = re.findall(r"[a-z]+", p["text"].lower())
        if len(words) < 20:
            continue
        stop_ratio = sum(1 for w in words if w in STOPWORDS) / len(words)
        if stop_ratio > 0.65:
            low_info_paras.append({**p, "_stop_ratio": stop_ratio})
    low_info_paras.sort(key=lambda x: x["_stop_ratio"], reverse=True)
    print_finding("Low information density (>65% stopwords)", "LOW",
                  len(low_info_paras), total, low_info_paras)

    # 4c. Exact substring matches across filings
    print("\n--- Exact substring matches across filings [HIGH CONCERN] ---")
    print(" (Checking paragraphs that appear as substrings of others in different filings...)")

    # Group by accession number for efficiency
    # NOTE(review): by_accession is built but never read below — either dead
    # code or used by a pass lost from this file; confirm before deleting.
    by_accession = defaultdict(list)
    for p in paragraphs:
        acc = p["filing"]["accessionNumber"]
        by_accession[acc].append(p)

    # For efficiency, only check paragraphs 50-200 chars (likely fragments/duplicates)
    candidates = [(p["text"], p["filing"]["accessionNumber"], p["filing"]["companyName"], p["id"])
                  for p in paragraphs if 50 <= len(p["text"]) <= 200]
    longer_texts = [(p["text"], p["filing"]["accessionNumber"], p["filing"]["companyName"])
                    for p in paragraphs if len(p["text"]) > 200]
    substring_matches = []
    seen = set()  # candidate ids already matched, for dedup
    # Only check a sample for performance (O(candidates × longer_texts) scan)
    check_limit = min(len(candidates), 3000)
    for i in range(check_limit):
        cand_text, cand_acc, cand_co, cand_id = candidates[i]
        for long_text, long_acc, long_co in longer_texts[:5000]:
            if cand_acc == long_acc:
                continue  # same filing, skip
            if cand_text in long_text and cand_id not in seen:
                seen.add(cand_id)
                substring_matches.append({
                    "text": cand_text,
                    "filing": {"companyName": cand_co, "accessionNumber": cand_acc},
                    "_found_in": long_co,
                })
                break
    print(f" Count (sampled {check_limit:,} short paras against "
          f"{min(len(longer_texts), 5000):,} long paras): {len(substring_matches):,}")
    for i, ex in enumerate(substring_matches[:5]):
        print(f" Example {i+1} [{ex['filing']['companyName']}] (also in {ex['_found_in']}):")
        print(f" {truncate(ex['text'], 300)}")
    if len(substring_matches) > 5:
        print(f" ... and {len(substring_matches) - 5:,} more")

    # ════════════════════════════════════════════════════════════════════
    print_section("5. SEMANTIC COHERENCE")
    # ════════════════════════════════════════════════════════════════════

    # ------------------------------------------------------------------
    # NOTE(review): the original source file was corrupted in this region —
    # everything between the start of specific_company_re's pattern and the
    # tail of the repeated-sentence check was lost. The passes below (5a-5g)
    # are RECONSTRUCTED from the variable names referenced by the SUMMARY
    # section and the surviving code fragments. Thresholds and regexes here
    # are best-effort; verify against the original before trusting them.
    # ------------------------------------------------------------------

    # 5a. Company name mismatch — look for SPECIFIC named companies in text
    # that differ from the filing company. Filter out generic refs like "the Company".
    # Pattern: proper noun(s) + legal suffix at end, NOT preceded by "the "
    specific_company_re = re.compile(
        r"(?<!the\s)\b([A-Z][A-Za-z&.\-]*(?:\s+[A-Z][A-Za-z&.\-]*){0,4})"
        r",?\s+(?:Inc|Incorporated|Corp|Corporation|LLC|Ltd|Limited|Co)\.?\b"
    )
    company_name_mismatches = []
    for p in paragraphs:
        filing_co = p["filing"].get("companyName", "").lower()
        names = specific_company_re.findall(p["text"])
        # keep only names that are not part of the filing company's own name
        foreign = [n for n in names if n.lower() not in filing_co]
        if foreign:
            company_name_mismatches.append({**p, "_names": foreign})
    print_finding("Company name mismatches (other companies named in text)", "HIGH",
                  len(company_name_mismatches), total, company_name_mismatches)

    # 5b. No cybersecurity keywords at all (uses precomputed texts_lower)
    no_cyber = [p for p, tl in zip(paragraphs, texts_lower) if not has_cyber_relevance(tl)]
    print_finding("No cybersecurity keywords", "HIGH", len(no_cyber), total, no_cyber)

    # 5c. Table/numeric data: unusually digit-heavy text
    table_paras = []
    for p in paragraphs:
        t = p["text"]
        if len(t) < 30:
            continue
        digit_ratio = sum(1 for c in t if c.isdigit()) / len(t)
        if digit_ratio > 0.15:
            table_paras.append({**p, "_digit_ratio": digit_ratio})
    table_paras.sort(key=lambda x: x["_digit_ratio"], reverse=True)
    print_finding("Table/numeric data (>15% digits)", "HIGH",
                  len(table_paras), total, table_paras)

    # 5d. Encoding artifacts: replacement chars, classic UTF-8 mojibake, HTML entities
    artifact_re = re.compile(r"\ufffd|â€.|Ã.|&#\d+;|&[a-z]{2,6};")
    encoding_paras = []
    for p in paragraphs:
        found = artifact_re.findall(p["text"])
        if found:
            encoding_paras.append({**p, "_artifacts": found})
    print_finding("Encoding artifacts", "HIGH", len(encoding_paras), total, encoding_paras)

    # 5e. URLs in text
    url_re = re.compile(r"https?://\S+|www\.\S+", re.I)
    url_paras = []
    for p in paragraphs:
        urls = url_re.findall(p["text"])
        if urls:
            url_paras.append({**p, "_urls": urls})
    print_finding("URLs in text", "MEDIUM", len(url_paras), total, url_paras)

    # 5f. Footnote references like "(1)" or "[2]"
    footnote_re = re.compile(r"\(\d{1,2}\)|\[\d{1,2}\]")
    footnote_paras = []
    for p in paragraphs:
        notes = footnote_re.findall(p["text"])
        if notes:
            footnote_paras.append({**p, "_footnotes": notes})
    print_finding("Footnote references", "LOW", len(footnote_paras), total, footnote_paras)

    # 5g. Repeated sentences within a paragraph (tail of this pass survived
    # the corruption: the dupes dict and the print_finding call are original)
    sentence_split_re = re.compile(r"(?<=[.!?])\s+")
    repeated_sent_paras = []
    for p in paragraphs:
        sents = [s.strip() for s in sentence_split_re.split(p["text"])]
        sent_counter = Counter(s for s in sents if len(s) > 20)
        dupes = {s: c for s, c in sent_counter.items() if c > 1}
        if dupes:
            repeated_sent_paras.append({**p, "_dupes": dupes})
    print_finding("Paragraphs with repeated sentences", "HIGH",
                  len(repeated_sent_paras), total, repeated_sent_paras)

    # ════════════════════════════════════════════════════════════════════
    print_section("SUMMARY")
    # ════════════════════════════════════════════════════════════════════
    print(f"\n Total paragraphs analyzed: {total:,}")
    print(f"\n HIGH concern findings:")
    print(f" - Cross-references to non-1C items: {len(cross_ref_paras):,}")
    print(f" - Non-cyber legal boilerplate: {len(boilerplate_paras):,}")
    print(f" - Extremely long paragraphs (>400 words): {len(long_paras):,}")
    print(f" - Company name mismatches: {len(company_name_mismatches):,}")
    print(f" - No cybersecurity keywords: {len(no_cyber):,}")
    print(f" - Table/numeric data: {len(table_paras):,}")
    print(f" - Encoding artifacts: {len(encoding_paras):,}")
    print(f" - Repeated sentences: {len(repeated_sent_paras):,}")
    print(f" - Exact substring matches (sampled): {len(substring_matches):,}")
    print(f"\n MEDIUM concern findings:")
    print(f" - High uppercase ratio: {len(high_upper):,}")
    print(f" - Non-ASCII characters: {len(non_ascii_paras):,}")
    print(f" - Unusual whitespace: {len(whitespace_issues):,}")
    print(f" - Dollar amounts: {len(dollar_paras):,}")
    print(f" - Bullet points mid-text: {len(bullet_paras):,}")
    print(f" - Embedded newlines: {len(newline_paras):,}")
    print(f" - Mid-paragraph headings: {len(mid_heading_paras):,}")
    print(f" - URLs in text: {len(url_paras):,}")
    print(f"\n LOW concern findings:")
    print(f" - High punctuation density: {len(high_punct):,}")
    print(f" - Date mentions: {len(date_paras):,}")
    print(f" - Low information density: {len(low_info_paras):,}")
    print(f" - Footnote references: {len(footnote_paras):,}")


if __name__ == "__main__":
    main()