SEC-cyBERT/scripts/examine-hard-cases.py
2026-04-03 14:43:53 -04:00

737 lines
30 KiB
Python

#!/usr/bin/env python3
"""Examine hardest disagreement cases in the SEC cybersecurity holdout dataset.
Identifies paragraphs where the 13 annotation sources split on the three main
confusion axes (MR<->RMP, BG<->MR, SI<->N/O), shows representative examples,
extracts linguistic patterns, and recommends codebook rulings.
Run: uv run --with numpy scripts/examine-hard-cases.py
"""
import json
import os
import re
import textwrap
from collections import Counter, defaultdict
from pathlib import Path
import numpy as np
# ── Constants ──────────────────────────────────────────────────────────────────
# Repository root: two directory levels above this script's resolved location.
ROOT = Path(__file__).resolve().parent.parent

# Full content-category name -> short code used throughout the report.
CAT_ABBREV = dict(
    [
        ("Board Governance", "BG"),
        ("Incident Disclosure", "ID"),
        ("Management Role", "MR"),
        ("None/Other", "N/O"),
        ("Risk Management Process", "RMP"),
        ("Strategy Integration", "SI"),
        ("Third-Party Risk", "TPR"),
    ]
)

# Reverse lookup: short code -> full category name.
ABBREV_CAT = {short: full for full, short in CAT_ABBREV.items()}

# The three confusion axes examined in depth: (category A, category B, label).
AXES = [
    (left, right, f"{left} <-> {right}")
    for left, right in (("MR", "RMP"), ("BG", "MR"), ("SI", "N/O"))
]

# Stage-1 provenance model id -> display name of that annotation source.
STAGE1_MODEL_SHORT = {
    "google/gemini-3.1-flash-lite-preview": "s1:gemini-flash",
    "x-ai/grok-4.1-fast": "s1:grok-fast",
    "xiaomi/mimo-v2-flash": "s1:mimo-flash",
}

# Bench-holdout annotation file name -> display name of that annotation source.
BENCH_MODEL_SHORT = {
    "gpt-5.4.jsonl": "bench:gpt5.4",
    "gemini-3.1-pro-preview.jsonl": "bench:gemini-pro",
    "glm-5:exacto.jsonl": "bench:glm5",
    "kimi-k2.5.jsonl": "bench:kimi",
    "mimo-v2-pro:exacto.jsonl": "bench:mimo-pro",
    "minimax-m2.7:exacto.jsonl": "bench:minimax",
}

# Bench files in load/display order (dicts preserve insertion order).
BENCH_FILES = list(BENCH_MODEL_SHORT)
# ── Load data ──────────────────────────────────────────────────────────────────
def load_jsonl(path: str | Path) -> list[dict]:
records = []
with open(path) as f:
for line in f:
line = line.strip()
if line:
records.append(json.loads(line))
return records
def abbrev(cat: str) -> str:
    """Return the short code for a category name, or the name itself if unmapped."""
    try:
        return CAT_ABBREV[cat]
    except KeyError:
        # Unknown labels pass through unchanged.
        return cat
def build_signal_matrix() -> tuple[dict[str, dict[str, str]], dict[str, dict[str, int]]]:
    """Build the per-paragraph label matrices for the gold paragraphs.

    The gold set is every ``paragraphId`` present in the human label file;
    records for any other paragraph are ignored.

    Returns:
        A pair ``(cat_matrix, spec_matrix)``. Both map
        ``paragraphId -> {source: value}``, where the value is the abbreviated
        content category in the first and the specificity level in the second.

    Raises:
        OSError: If an annotation file cannot be opened.
        json.JSONDecodeError: If a non-blank line is not valid JSON.
        KeyError: If a record lacks an expected field.
    """
    human_labels = load_jsonl(ROOT / "data/gold/human-labels-raw.jsonl")
    # Only paragraphs that have human labels count as gold.
    gold_pids: set[str] = {rec["paragraphId"] for rec in human_labels}

    cat_matrix: dict[str, dict[str, str]] = defaultdict(dict)
    spec_matrix: dict[str, dict[str, int]] = defaultdict(dict)

    # 1) Human annotators
    for rec in human_labels:
        pid = rec["paragraphId"]
        src = f"human:{rec['annotatorName']}"
        cat_matrix[pid][src] = abbrev(rec["contentCategory"])
        spec_matrix[pid][src] = rec["specificityLevel"]

    # 2) Stage 1 models (filter to gold PIDs). The file is streamed line by
    #    line instead of loaded whole, since non-gold records are discarded.
    stage1_path = ROOT / "data/annotations/stage1.patched.jsonl"
    with open(stage1_path, encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                # Blank lines are not JSON; skip them as load_jsonl does.
                continue
            rec = json.loads(line)
            pid = rec["paragraphId"]
            if pid not in gold_pids:
                continue
            model_id = rec["provenance"]["modelId"]
            # Unmapped model ids are kept verbatim as the source name.
            src = STAGE1_MODEL_SHORT.get(model_id, model_id)
            label = rec["label"]
            cat_matrix[pid][src] = abbrev(label["content_category"])
            spec_matrix[pid][src] = label["specificity_level"]

    # 3) Opus
    for rec in load_jsonl(ROOT / "data/annotations/golden/opus.jsonl"):
        pid = rec["paragraphId"]
        if pid in gold_pids:
            label = rec["label"]
            cat_matrix[pid]["opus"] = abbrev(label["content_category"])
            spec_matrix[pid]["opus"] = label["specificity_level"]

    # 4) Bench-holdout models
    for fn in BENCH_FILES:
        src = BENCH_MODEL_SHORT[fn]
        for rec in load_jsonl(ROOT / "data/annotations/bench-holdout" / fn):
            pid = rec["paragraphId"]
            if pid in gold_pids:
                label = rec["label"]
                cat_matrix[pid][src] = abbrev(label["content_category"])
                spec_matrix[pid][src] = label["specificity_level"]

    # Return plain dicts so later lookups of unknown pids do not create entries.
    return dict(cat_matrix), dict(spec_matrix)
def load_paragraphs(gold_pids: set[str]) -> dict[str, dict]:
    """Return ``{paragraph id: record}`` for holdout paragraphs whose id is in ``gold_pids``."""
    holdout_records = load_jsonl(ROOT / "data/gold/paragraphs-holdout.jsonl")
    # If an id appears more than once, the last record wins.
    return {
        record["id"]: record
        for record in holdout_records
        if record["id"] in gold_pids
    }
# ── Analysis helpers ───────────────────────────────────────────────────────────
def find_axis_paragraphs(
    cat_matrix: dict[str, dict[str, str]], a: str, b: str
) -> list[tuple[str, dict[str, str], int, int]]:
    """Select paragraphs whose labels are mainly contested between ``a`` and ``b``.

    A paragraph qualifies when at least one source chose ``a``, at least one
    chose ``b``, and the two together account for at least half of its labels.

    Returns:
        ``(pid, signals, count_a, count_b)`` tuples, hardest first: ordered by
        the size of the smaller camp, descending. Ties keep ``cat_matrix``
        iteration order.
    """
    contested = []
    for pid, signals in cat_matrix.items():
        tally = Counter(signals.values())
        votes_a = tally[a]
        votes_b = tally[b]
        if votes_a == 0 or votes_b == 0:
            # Both categories must be present for there to be a split.
            continue
        if 2 * (votes_a + votes_b) < len(signals):
            # The pair covers less than half the labels: not the primary axis.
            continue
        contested.append((pid, signals, votes_a, votes_b))

    # Stable descending sort on the minority size (closer to 50/50 = harder).
    contested.sort(key=lambda row: min(row[2], row[3]), reverse=True)
    return contested
def truncate_text(text: str, max_chars: int = 200) -> str:
    """Clip ``text`` to ``max_chars`` characters for display.

    Text that already fits is returned unchanged. Longer text is cut, stripped
    of trailing whitespace and suffixed with ``"..."``.
    """
    fits = len(text) <= max_chars
    return text if fits else f"{text[:max_chars].rstrip()}..."
def source_order() -> list[str]:
    """Return the display order of sources: humans, stage-1 models, opus, bench models."""
    ordered = ["human:" + name for name in ("Joey", "Anuj", "Aaryan", "Elisabeth", "Meghan", "Xander")]
    ordered += ["s1:gemini-flash", "s1:grok-fast", "s1:mimo-flash"]
    ordered.append("opus")
    # Bench sources follow the order of BENCH_FILES.
    ordered.extend(BENCH_MODEL_SHORT[fn] for fn in BENCH_FILES)
    return ordered
def format_signal_breakdown(
    signals: dict[str, str], axis_cats: tuple[str, str]
) -> str:
    """Format which sources chose which category on one axis.

    Args:
        signals: ``{source: category}`` for a single paragraph.
        axis_cats: The two categories ``(a, b)`` that define the axis.

    Returns:
        A multi-line string: one line per axis category listing its sources,
        plus an ``Other`` line (``source=category``) when any source chose a
        different category.
    """
    a, b = axis_cats
    canonical = source_order()
    known = set(canonical)
    # Sources outside the canonical list (e.g. an unmapped model id) are
    # appended in sorted order rather than dropped, so the listed counts
    # always add up to len(signals).
    extras = sorted(src for src in signals if src not in known)

    a_sources = []
    b_sources = []
    other_sources = []
    for src in [*canonical, *extras]:
        if src not in signals:
            continue
        cat = signals[src]
        if cat == a:
            a_sources.append(src)
        elif cat == b:
            b_sources.append(src)
        else:
            other_sources.append(f"{src}={cat}")

    parts = [
        f" {a} ({len(a_sources)}): {', '.join(a_sources)}",
        f" {b} ({len(b_sources)}): {', '.join(b_sources)}",
    ]
    if other_sources:
        parts.append(f" Other: {', '.join(other_sources)}")
    return "\n".join(parts)
# Domain-relevant keywords probed in each paragraph's lower-cased text.
_AXIS_KEYWORDS = (
    "board", "director", "committee", "audit", "oversee", "oversight",
    "ciso", "officer", "chief", "vp", "vice president", "manager",
    "manage", "manages", "managing", "management", "responsible",
    "program", "team", "department", "staff", "personnel",
    "report", "reports", "reporting", "brief", "briefing", "informed",
    "incident", "breach", "attack", "compromise", "unauthorized",
    "material", "immaterial", "not material", "no material",
    "strategy", "strategic", "integrate", "integration", "aligned",
    "risk", "assess", "assessment", "framework", "nist", "iso",
    "policy", "policies", "procedure", "procedures",
    "third party", "third-party", "vendor", "supplier", "service provider",
    "insurance", "cyber insurance",
    "training", "awareness", "employee",
    "monitor", "monitoring", "detect", "detection",
    "govern", "governance",
    "experience", "experienced", "background", "qualification", "expertise",
    "day-to-day", "daily", "operational",
    "enterprise", "enterprise-wide",
    "designate", "designated", "appoint", "appointed",
)

# Each keyword must START at a word boundary, so "nist" no longer fires on
# "administrator" and "iso" no longer fires on "supervisory". The right edge
# stays open so plurals and inflections ("risks", "oversees") still count.
_AXIS_KEYWORD_PATTERNS = tuple(
    (kw, re.compile(r"(?<!\w)" + re.escape(kw))) for kw in _AXIS_KEYWORDS
)


def extract_keyword_frequencies(
    paragraphs: dict[str, dict],
    axis_pids: list[str],
    cat_matrix: dict[str, dict[str, str]],
    cat_a: str,
    cat_b: str,
) -> tuple[Counter, Counter, Counter]:
    """Count keyword presence in paragraphs leaning toward ``cat_a`` vs ``cat_b``.

    A paragraph leans toward a category when strictly more sources chose it
    than the other one; ties lean neither way. Each keyword is counted at most
    once per paragraph.

    Args:
        paragraphs: ``{pid: record}``; each record's ``"text"`` is searched.
        axis_pids: Paragraph ids on the axis; each must be a key of ``cat_matrix``.
        cat_matrix: ``{pid: {source: category}}``.
        cat_a: First category of the axis.
        cat_b: Second category of the axis.

    Returns:
        ``(freq_a, freq_b, freq_all)``: keyword -> number of paragraphs
        containing it, for the ``cat_a``-leaning paragraphs, the
        ``cat_b``-leaning ones, and every paragraph in ``axis_pids``
        (ties included).
    """
    lean_a_pids = []
    lean_b_pids = []
    for pid in axis_pids:
        counts = Counter(cat_matrix[pid].values())
        if counts[cat_a] > counts[cat_b]:
            lean_a_pids.append(pid)
        elif counts[cat_b] > counts[cat_a]:
            lean_b_pids.append(pid)

    def count_keywords(pids: list[str]) -> Counter:
        kw_counts = Counter()
        for pid in pids:
            para = paragraphs.get(pid)
            if para is None:
                # No text loaded for this paragraph; nothing to search.
                continue
            text_lower = para["text"].lower()
            kw_counts.update(
                kw for kw, pattern in _AXIS_KEYWORD_PATTERNS if pattern.search(text_lower)
            )
        return kw_counts

    freq_a = count_keywords(lean_a_pids)
    freq_b = count_keywords(lean_b_pids)
    freq_all = count_keywords(axis_pids)
    return freq_a, freq_b, freq_all
def analyze_human_vs_genai_splits(
    axis_pids: list[str],
    cat_matrix: dict[str, dict[str, str]],
    cat_a: str,
    cat_b: str,
) -> tuple[list[str], list[str]]:
    """Find paragraphs where the human and GenAI camps lean opposite ways.

    Sources named ``human:*`` form the human camp; every other source is
    treated as GenAI. A camp leans toward a category when strictly more of its
    members chose it than the other axis category.

    Returns:
        ``(human_a_genai_b, human_b_genai_a)``: pids where humans lean
        ``cat_a`` while GenAI leans ``cat_b``, and the reverse.
    """
    humans_pick_a = []
    humans_pick_b = []
    for pid in axis_pids:
        human_votes = Counter()
        genai_votes = Counter()
        for source, label in cat_matrix[pid].items():
            camp = human_votes if source.startswith("human:") else genai_votes
            camp[label] += 1

        # Positive margin = camp favours cat_a, negative = favours cat_b.
        human_margin = human_votes[cat_a] - human_votes[cat_b]
        genai_margin = genai_votes[cat_a] - genai_votes[cat_b]
        if human_margin > 0 and genai_margin < 0:
            humans_pick_a.append(pid)
        elif human_margin < 0 and genai_margin > 0:
            humans_pick_b.append(pid)
    return humans_pick_a, humans_pick_b
# ── Main analysis ──────────────────────────────────────────────────────────────
# Recommended rulings, printed verbatim near the end of the report.
_CODEBOOK_RULINGS = """
Based on the analysis above, the following rulings would resolve the most cases:
RULING 1: MR vs RMP — "Named-role test"
──────────────────────────────────────────
If the paragraph's PRIMARY subject is a named individual, titled role (CISO, VP,
CTO, etc.), or a specific person's responsibilities/qualifications/experience,
classify as MR. If the paragraph's PRIMARY subject is a process, program, system,
or methodology (even if it mentions who runs it), classify as RMP.
Disambiguator: Ask "Is this paragraph ABOUT a person/role, or ABOUT a process?"
- "Our CISO oversees our cybersecurity program" → MR (about the CISO)
- "Our cybersecurity program includes monitoring, led by the CISO" → RMP (about the program)
RULING 2: BG vs MR — "Board-line test"
──────────────────────────────────────────
If the paragraph describes oversight, reporting, or governance AT or ABOVE the
board/committee level, classify as BG. If it describes responsibilities BELOW
the board level (C-suite officers reporting TO the board, management teams,
operational roles), classify as MR.
Disambiguator: "Does this paragraph describe what the board/committee DOES,
or what someone REPORTS TO the board?"
- "The Audit Committee oversees cybersecurity risk" → BG
- "The CISO reports quarterly to the Audit Committee" → BG (board's receiving mechanism)
- "The CISO manages a team of security analysts" → MR
Key edge case: When a paragraph describes BOTH board oversight AND management
roles, classify by the paragraph's PRIMARY focus. If roughly equal, prefer BG
when board action is the grammatical subject.
RULING 3: SI vs N/O — "Negative-incident test"
──────────────────────────────────────────
Negative incident statements ("we have not experienced any material cybersecurity
incidents") should be classified as N/O, NOT as SI. SI requires disclosure of an
ACTUAL incident that occurred. The mere mention of incidents in a negation context
does not constitute incident disclosure.
However: If the paragraph describes a SPECIFIC past incident (even if resolved or
deemed immaterial), classify as SI. The test is: "Did something actually happen?"
- "We have not experienced material incidents" → N/O
- "In 2023, we experienced a ransomware attack that..." → SI
- "We experienced incidents but none were material" → SI (something happened)
"""


def _pct(part: int, whole: int) -> float:
    """Return ``part / whole`` as a percentage, or 0.0 when ``whole`` is zero."""
    return part / whole * 100 if whole else 0.0


def _contains_any(text: str, keywords) -> bool:
    """Return True when any keyword occurs in ``text`` as a plain substring."""
    return any(kw in text for kw in keywords)


def main() -> None:
    """Run the hard-case analysis and print the full report to stdout.

    Sections, in order: source coverage, overall disagreement statistics,
    a per-axis deep dive for each entry of AXES (examples, human-vs-GenAI
    splits, keyword differentials), other frequent axes, summary estimates,
    specificity spread, recommended rulings, highest-entropy paragraphs, and
    per-source agreement with the human majority.
    """
    rule = "-" * 90  # horizontal separator for sub-section headings

    print("=" * 100)
    print("HARDEST CASES ANALYSIS: SEC CYBERSECURITY HOLDOUT DATASET")
    print("Examining disagreements across 13 annotation sources to inform codebook rulings")
    print("=" * 100)

    # Load data
    print("\nLoading data...")
    cat_matrix, spec_matrix = build_signal_matrix()
    gold_pids = set(cat_matrix)
    # Walk paragraphs in the matrix's insertion order, not set order, so
    # tie-breaking and example selection are identical from run to run.
    ordered_pids = list(cat_matrix)
    n_gold = len(ordered_pids)
    paragraphs = load_paragraphs(gold_pids)
    known_sources = source_order()
    genai_sources = [s for s in known_sources if not s.startswith("human:")]
    print(f" Loaded {n_gold} gold paragraphs with {len(known_sources)} potential sources each")

    axis_cache: dict[tuple[str, str], list] = {}

    def axis_rows(a: str, b: str) -> list:
        """Return find_axis_paragraphs(cat_matrix, a, b), computed once per pair."""
        key = (a, b)
        if key not in axis_cache:
            axis_cache[key] = find_axis_paragraphs(cat_matrix, a, b)
        return axis_cache[key]

    def lowered_text(pid: str) -> str:
        """Lower-cased paragraph text, or "" when the paragraph was not loaded."""
        return paragraphs.get(pid, {}).get("text", "").lower()

    # Verify source coverage
    source_coverage = Counter(src for pid in ordered_pids for src in cat_matrix[pid])
    print("\n Source coverage:")
    for src in known_sources:
        print(f" {src}: {source_coverage.get(src, 0)} paragraphs")
    # Surface sources missing from the canonical list (e.g. an unmapped model
    # id) instead of leaving them invisible in a section meant to verify coverage.
    for src in sorted(set(source_coverage) - set(known_sources)):
        print(f" {src}: {source_coverage[src]} paragraphs (not in canonical source list)")

    # ── Overall disagreement stats ─────────────────────────────────────────
    print("\n" + "=" * 100)
    print("OVERALL DISAGREEMENT STATISTICS")
    print("=" * 100)
    unanimous = 0
    near_unanimous = 0  # 1 dissenter
    split = 0
    for pid in ordered_pids:
        cats = list(cat_matrix[pid].values())
        top = Counter(cats).most_common(1)[0][1]
        n = len(cats)
        if top == n:
            unanimous += 1
        elif top >= n - 1:
            near_unanimous += 1
        else:
            split += 1
    print(f"\n Unanimous (all sources agree): {unanimous} ({_pct(unanimous, n_gold):.1f}%)")
    print(f" Near-unanimous (1 dissenter): {near_unanimous} ({_pct(near_unanimous, n_gold):.1f}%)")
    print(f" Split (2+ dissenters): {split} ({_pct(split, n_gold):.1f}%)")

    # Count all pairwise disagreement axes
    axis_counts = Counter()
    for pid in ordered_pids:
        # Sorted so equal-count axes are inserted, and later listed, in a stable order.
        unique = sorted(set(cat_matrix[pid].values()))
        if len(unique) >= 2:
            for c1 in unique:
                for c2 in unique:
                    if c1 < c2:
                        axis_counts[(c1, c2)] += 1
    print("\n All disagreement axes (paragraph has at least 1 source saying each):")
    for (c1, c2), ct in axis_counts.most_common(30):
        print(f" {c1} <-> {c2}: {ct} paragraphs")

    # ── Axis-specific analysis ─────────────────────────────────────────────
    for cat_a, cat_b, axis_name in AXES:
        print("\n" + "=" * 100)
        print(f"AXIS: {axis_name}")
        print("=" * 100)
        axis_pids_data = axis_rows(cat_a, cat_b)
        axis_pids = [row[0] for row in axis_pids_data]
        print(f"\n Paragraphs with primary {cat_a}/{cat_b} disagreement: {len(axis_pids)}")
        if not axis_pids:
            print(" No paragraphs found on this axis.")
            continue

        # ── Signal split statistics ────────────────────────────────────────
        # Count how the split goes (majority A vs majority B)
        majority_a = sum(1 for _, _, ca, cb in axis_pids_data if ca > cb)
        majority_b = sum(1 for _, _, ca, cb in axis_pids_data if cb > ca)
        tied = sum(1 for _, _, ca, cb in axis_pids_data if ca == cb)
        print(f" Majority {cat_a}: {majority_a} | Majority {cat_b}: {majority_b} | Tied: {tied}")

        # ── Human vs GenAI splits ──────────────────────────────────────────
        human_a_genai_b, human_b_genai_a = analyze_human_vs_genai_splits(
            axis_pids, cat_matrix, cat_a, cat_b
        )
        print("\n Human/GenAI disagreements:")
        print(f" Humans say {cat_a}, GenAI says {cat_b}: {len(human_a_genai_b)}")
        print(f" Humans say {cat_b}, GenAI says {cat_a}: {len(human_b_genai_a)}")

        # ── Representative examples ────────────────────────────────────────
        # Show hardest cases (most evenly split)
        n_examples = min(10, len(axis_pids_data))
        print(f"\n {rule}")
        print(f" TOP {n_examples} MOST CONTENTIOUS PARAGRAPHS")
        print(f" {rule}")
        for i, (pid, signals, ca, cb) in enumerate(axis_pids_data[:n_examples], start=1):
            para = paragraphs.get(pid, {})
            text = para.get("text", "[text not found]")
            company = para.get("companyName", "?")
            word_count = para.get("wordCount", "?")
            print(f"\n [{i}] PID: {pid[:12]}... Company: {company}")
            print(f" Words: {word_count} | Split: {ca} say {cat_a}, {cb} say {cat_b}, {len(signals)-ca-cb} say other")
            print(f" Text: {truncate_text(text, 250)}")
            print(format_signal_breakdown(signals, (cat_a, cat_b)))

        # ── Human-A / GenAI-B examples, then the reverse ───────────────────
        for human_cat, genai_cat, example_pids in (
            (cat_a, cat_b, human_a_genai_b),
            (cat_b, cat_a, human_b_genai_a),
        ):
            if not example_pids:
                continue
            print(f"\n {rule}")
            print(f" HUMANS SAY {human_cat}, GenAI SAYS {genai_cat} (up to 5 examples)")
            print(f" {rule}")
            for pid in example_pids[:5]:
                text = paragraphs.get(pid, {}).get("text", "[text not found]")
                print(f"\n PID: {pid[:12]}...")
                print(f" Text: {truncate_text(text, 250)}")
                print(format_signal_breakdown(cat_matrix[pid], (cat_a, cat_b)))

        # ── Keyword / linguistic patterns ──────────────────────────────────
        print(f"\n {rule}")
        print(" LINGUISTIC PATTERNS")
        print(f" {rule}")
        freq_a, freq_b, freq_all = extract_keyword_frequencies(
            paragraphs, axis_pids, cat_matrix, cat_a, cat_b
        )
        # "Leaning A" is the same comparison as "majority A" above, so the
        # counts are reused rather than recomputed from the matrix.
        lean_a_ct = majority_a
        lean_b_ct = majority_b
        print(f"\n Paragraphs leaning {cat_a}: {lean_a_ct} | leaning {cat_b}: {lean_b_ct}")

        # Show keywords sorted by differential
        diffs = []
        # Sorted so keywords with an equal differential print in a stable order.
        for kw in sorted(set(freq_a) | set(freq_b)):
            fa = freq_a.get(kw, 0)
            fb = freq_b.get(kw, 0)
            total = freq_all.get(kw, 0)
            if total < 3:
                continue
            # Normalize by group size
            rate_a = fa / max(lean_a_ct, 1)
            rate_b = fb / max(lean_b_ct, 1)
            diffs.append((kw, fa, fb, total, rate_a, rate_b, rate_a - rate_b))
        diffs.sort(key=lambda row: -abs(row[6]))
        print(f"\n Keywords by differential (rate in {cat_a}-leaning vs {cat_b}-leaning paragraphs):")
        print(f" {'Keyword':<22} {'In '+cat_a:>8} {'In '+cat_b:>8} {'Total':>8} {'Rate '+cat_a:>10} {'Rate '+cat_b:>10} {'Diff':>8}")
        print(f" {'-'*22} {'-'*8} {'-'*8} {'-'*8} {'-'*10} {'-'*10} {'-'*8}")
        for kw, fa, fb, total, ra, rb, diff in diffs[:25]:
            marker = f"<- {cat_a}" if diff > 0.05 else (f"<- {cat_b}" if diff < -0.05 else "")
            print(f" {kw:<22} {fa:>8} {fb:>8} {total:>8} {ra:>10.2%} {rb:>10.2%} {diff:>+8.2%} {marker}")

    # ── Other notable axes ─────────────────────────────────────────────────
    print("\n" + "=" * 100)
    print("OTHER NOTABLE DISAGREEMENT AXES (10+ paragraphs)")
    print("=" * 100)
    # Both orientations of every primary axis, derived from AXES so the two
    # cannot drift apart.
    primary_axis_set = {(a, b) for a, b, _ in AXES} | {(b, a) for a, b, _ in AXES}
    other_axes = [
        (c1, c2, ct)
        for (c1, c2), ct in axis_counts.most_common()
        if (c1, c2) not in primary_axis_set and ct >= 10
    ]
    if not other_axes:
        print("\n No other axes with 10+ paragraphs.")
    else:
        for cat_a, cat_b, count in other_axes:
            print(f"\n {rule}")
            print(f" {cat_a} <-> {cat_b}: {count} paragraphs")
            print(f" {rule}")
            # Show up to 5 examples
            for i, (pid, signals, ca, cb) in enumerate(axis_rows(cat_a, cat_b)[:5], start=1):
                text = paragraphs.get(pid, {}).get("text", "[text not found]")
                print(f"\n [{i}] {truncate_text(text, 200)}")
                print(f" Split: {ca}x {cat_a}, {cb}x {cat_b}")
                print(format_signal_breakdown(signals, (cat_a, cat_b)))

    # ── Summary statistics ─────────────────────────────────────────────────
    print("\n" + "=" * 100)
    print("SUMMARY STATISTICS")
    print("=" * 100)
    # Per-axis counts
    print("\n Paragraphs on each primary confusion axis:")
    for cat_a, cat_b, axis_name in AXES:
        print(f" {axis_name}: {len(axis_rows(cat_a, cat_b))} paragraphs")

    # How many could potentially be resolved by keyword rules?
    # NOTE(review): the checks below are plain substring tests, so short
    # keywords such as "iso", "nist" or "vp" can also match inside longer
    # words — treat the percentages as rough estimates.
    print("\n Keyword-resolvable estimate (paragraphs containing strong discriminator keywords):")
    mr_rmp_pids = [row[0] for row in axis_rows("MR", "RMP")]
    mr_keywords = {"ciso", "chief information security", "chief security", "vp", "vice president",
                   "officer", "director of", "head of", "reports to", "reporting to"}
    rmp_keywords = {"framework", "nist", "iso", "soc 2", "assessment", "penetration test",
                    "vulnerability scan", "audit", "tabletop"}
    resolvable_mr_rmp = 0
    for pid in mr_rmp_pids:
        text_lower = lowered_text(pid)
        # Resolvable when exactly one side's keywords appear.
        if _contains_any(text_lower, mr_keywords) != _contains_any(text_lower, rmp_keywords):
            resolvable_mr_rmp += 1
    print(f" MR <-> RMP: {resolvable_mr_rmp}/{len(mr_rmp_pids)} have clear keyword signal ({resolvable_mr_rmp/max(len(mr_rmp_pids),1)*100:.0f}%)")

    bg_mr_pids = [row[0] for row in axis_rows("BG", "MR")]
    bg_keywords = {"board", "director", "committee", "audit committee", "board of directors"}
    mr_only_keywords = {"ciso", "chief information security", "officer", "vp", "management",
                        "team", "department", "staff", "day-to-day", "operational"}
    resolvable_bg_mr = 0
    for pid in bg_mr_pids:
        text_lower = lowered_text(pid)
        # Resolvable when exactly one side's keywords appear.
        if _contains_any(text_lower, bg_keywords) != _contains_any(text_lower, mr_only_keywords):
            resolvable_bg_mr += 1
    print(f" BG <-> MR: {resolvable_bg_mr}/{len(bg_mr_pids)} have clear keyword signal ({resolvable_bg_mr/max(len(bg_mr_pids),1)*100:.0f}%)")

    si_no_pids = [row[0] for row in axis_rows("SI", "N/O")]
    # NOTE(review): CAT_ABBREV maps SI to "Strategy Integration" and ID to
    # "Incident Disclosure", yet these keywords (and Ruling 3 in the printed
    # text) treat SI as incident disclosure — confirm against the codebook.
    si_keywords = {"incident", "breach", "attack", "compromise", "unauthorized access",
                   "ransomware", "malware", "phishing", "data loss", "disruption"}
    no_keywords = {"no material", "not material", "have not experienced", "no known",
                   "not aware of any", "not been subject"}
    resolvable_si_no = 0
    for pid in si_no_pids:
        text_lower = lowered_text(pid)
        # A negation phrase alone counts; otherwise any incident keyword does.
        if _contains_any(text_lower, no_keywords) or _contains_any(text_lower, si_keywords):
            resolvable_si_no += 1
    print(f" SI <-> N/O: {resolvable_si_no}/{len(si_no_pids)} have clear keyword signal ({resolvable_si_no/max(len(si_no_pids),1)*100:.0f}%)")

    # ── Specificity disagreements on confused paragraphs ───────────────────
    print("\n" + "=" * 100)
    print("SPECIFICITY DISAGREEMENT ON CONFUSED PARAGRAPHS")
    print("=" * 100)
    for cat_a, cat_b, axis_name in AXES:
        axis_data = axis_rows(cat_a, cat_b)
        if not axis_data:
            continue
        spec_ranges = []
        for pid, _, _, _ in axis_data:
            specs = list(spec_matrix.get(pid, {}).values())
            if specs:
                spec_ranges.append(max(specs) - min(specs))
        if spec_ranges:
            avg_range = np.mean(spec_ranges)
            print(f"\n {axis_name}: avg specificity range = {avg_range:.2f} (0=agree, 3=max disagree)")
            range_dist = Counter(spec_ranges)
            for r in sorted(range_dist):
                print(f" Range {r}: {range_dist[r]} paragraphs")

    # ── Recommended codebook rulings ───────────────────────────────────────
    print("\n" + "=" * 100)
    print("RECOMMENDED CODEBOOK RULINGS")
    print("=" * 100)
    print(_CODEBOOK_RULINGS)

    # ── Deep dive: the very hardest cases ──────────────────────────────────
    print("=" * 100)
    print("DEEP DIVE: PARAGRAPHS WITH MAXIMUM ENTROPY (4+ DISTINCT CATEGORIES)")
    print("=" * 100)
    high_entropy = []
    for pid in ordered_pids:
        cats = list(cat_matrix[pid].values())
        n_unique = len(set(cats))
        if n_unique >= 4:
            high_entropy.append((pid, n_unique, Counter(cats)))
    high_entropy.sort(key=lambda row: -row[1])
    print(f"\n {len(high_entropy)} paragraphs with 4+ distinct category labels")
    for i, (pid, n_unique, counts) in enumerate(high_entropy[:10], start=1):
        text = paragraphs.get(pid, {}).get("text", "[text not found]")
        print(f"\n [{i}] PID: {pid[:12]}... ({n_unique} categories)")
        print(f" Text: {truncate_text(text, 250)}")
        print(f" Distribution: {dict(counts.most_common())}")
        # Show all sources
        for src in known_sources:
            if src in cat_matrix[pid]:
                cat = cat_matrix[pid][src]
                spec = spec_matrix.get(pid, {}).get(src, "?")
                print(f" {src:<25} {cat:<5} spec={spec}")

    # ── Per-source accuracy vs human majority ──────────────────────────────
    print("\n" + "=" * 100)
    print("GENAI SOURCE AGREEMENT WITH HUMAN MAJORITY (on axis-confused paragraphs only)")
    print("=" * 100)
    for cat_a, cat_b, axis_name in AXES:
        axis_data = axis_rows(cat_a, cat_b)
        if not axis_data:
            continue
        print(f"\n {axis_name} ({len(axis_data)} paragraphs):")
        source_agree = {s: 0 for s in genai_sources}
        source_total = {s: 0 for s in genai_sources}
        for pid, signals, _, _ in axis_data:
            # Human majority on this axis: only human votes for cat_a or cat_b count.
            human_cats = [
                cat for src, cat in signals.items()
                if src.startswith("human:") and cat in (cat_a, cat_b)
            ]
            if not human_cats:
                continue
            # On a tie, most_common returns the category encountered first.
            human_majority = Counter(human_cats).most_common(1)[0][0]
            for src in genai_sources:
                if src in signals:
                    source_total[src] += 1
                    if signals[src] == human_majority:
                        source_agree[src] += 1
        print(f" {'Source':<25} {'Agree':>8} {'Total':>8} {'Rate':>8}")
        print(f" {'-'*25} {'-'*8} {'-'*8} {'-'*8}")
        for src in genai_sources:
            total = source_total[src]
            agree = source_agree[src]
            rate = agree / max(total, 1)
            print(f" {src:<25} {agree:>8} {total:>8} {rate:>8.1%}")

    print("\n" + "=" * 100)
    print("END OF ANALYSIS")
    print("=" * 100)


if __name__ == "__main__":
    main()