#!/usr/bin/env python3
|
|
"""Examine hardest disagreement cases in the SEC cybersecurity holdout dataset.
|
|
|
|
Identifies paragraphs where the 13 annotation sources split on the three main
|
|
confusion axes (MR<->RMP, BG<->MR, SI<->N/O), shows representative examples,
|
|
extracts linguistic patterns, and recommends codebook rulings.
|
|
|
|
Run: uv run --with numpy scripts/examine-hard-cases.py
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
import re
|
|
import textwrap
|
|
from collections import Counter, defaultdict
|
|
from pathlib import Path
|
|
|
|
import numpy as np
|
|
|
|
# ── Constants ──────────────────────────────────────────────────────────────────

# Repository root: this script lives in scripts/, so go up two levels.
ROOT = Path(__file__).resolve().parent.parent

# Full content-category name -> short abbreviation used throughout the report.
CAT_ABBREV = {
    "Board Governance": "BG",
    "Incident Disclosure": "ID",
    "Management Role": "MR",
    "None/Other": "N/O",
    "Risk Management Process": "RMP",
    "Strategy Integration": "SI",
    "Third-Party Risk": "TPR",
}
# Reverse lookup: abbreviation -> full category name.
ABBREV_CAT = {v: k for k, v in CAT_ABBREV.items()}

# The three primary confusion axes examined in depth: (cat_a, cat_b, display label).
AXES = [
    ("MR", "RMP", "MR <-> RMP"),
    ("BG", "MR", "BG <-> MR"),
    ("SI", "N/O", "SI <-> N/O"),
]

# Bench-holdout annotation files, one JSONL per benchmark model.
BENCH_FILES = [
    "gpt-5.4.jsonl",
    "gemini-3.1-pro-preview.jsonl",
    "glm-5:exacto.jsonl",
    "kimi-k2.5.jsonl",
    "mimo-v2-pro:exacto.jsonl",
    "minimax-m2.7:exacto.jsonl",
]

# Stage-1 model IDs (as recorded in provenance) -> short display names.
STAGE1_MODEL_SHORT = {
    "google/gemini-3.1-flash-lite-preview": "s1:gemini-flash",
    "x-ai/grok-4.1-fast": "s1:grok-fast",
    "xiaomi/mimo-v2-flash": "s1:mimo-flash",
}

# Bench-holdout filenames -> short display names.
BENCH_MODEL_SHORT = {
    "gpt-5.4.jsonl": "bench:gpt5.4",
    "gemini-3.1-pro-preview.jsonl": "bench:gemini-pro",
    "glm-5:exacto.jsonl": "bench:glm5",
    "kimi-k2.5.jsonl": "bench:kimi",
    "mimo-v2-pro:exacto.jsonl": "bench:mimo-pro",
    "minimax-m2.7:exacto.jsonl": "bench:minimax",
}
|
|
|
|
# ── Load data ──────────────────────────────────────────────────────────────────
|
|
|
|
|
|
def load_jsonl(path: str | Path) -> list[dict]:
|
|
records = []
|
|
with open(path) as f:
|
|
for line in f:
|
|
line = line.strip()
|
|
if line:
|
|
records.append(json.loads(line))
|
|
return records
|
|
|
|
|
|
def abbrev(cat: str) -> str:
    """Shorten a full category name; unknown names are returned unchanged."""
    try:
        return CAT_ABBREV[cat]
    except KeyError:
        return cat
|
|
|
|
|
|
def build_signal_matrix() -> tuple[dict[str, dict[str, str]], dict[str, dict[str, int]]]:
    """Build paragraphId -> {source: category_abbrev} and {source: specificity}.

    Combines all annotation sources (human annotators, stage-1 models, opus,
    bench-holdout models), keeping only paragraphs present in the
    human-labeled gold set.

    Returns:
        ``(cat_matrix, spec_matrix)`` where ``cat_matrix[pid][source]`` is the
        category abbreviation and ``spec_matrix[pid][source]`` is the integer
        specificity level reported by that source.
    """
    # Only for the 1200 gold PIDs (paragraphs that have human labels)
    human_labels = load_jsonl(ROOT / "data/gold/human-labels-raw.jsonl")
    gold_pids: set[str] = {rec["paragraphId"] for rec in human_labels}

    cat_matrix: dict[str, dict[str, str]] = defaultdict(dict)
    spec_matrix: dict[str, dict[str, int]] = defaultdict(dict)

    # 1) Human annotators (3 per paragraph)
    for rec in human_labels:
        pid = rec["paragraphId"]
        src = f"human:{rec['annotatorName']}"
        cat_matrix[pid][src] = abbrev(rec["contentCategory"])
        spec_matrix[pid][src] = rec["specificityLevel"]

    # 2) Stage 1 models (filter to gold PIDs).  Uses load_jsonl for
    # consistency with every other section; the previous inline parse called
    # json.loads on raw lines and would crash on blank lines.
    for rec in load_jsonl(ROOT / "data/annotations/stage1.patched.jsonl"):
        pid = rec["paragraphId"]
        if pid not in gold_pids:
            continue
        model_id = rec["provenance"]["modelId"]
        src = STAGE1_MODEL_SHORT.get(model_id, model_id)
        cat_matrix[pid][src] = abbrev(rec["label"]["content_category"])
        spec_matrix[pid][src] = rec["label"]["specificity_level"]

    # 3) Opus
    for rec in load_jsonl(ROOT / "data/annotations/golden/opus.jsonl"):
        pid = rec["paragraphId"]
        if pid in gold_pids:
            cat_matrix[pid]["opus"] = abbrev(rec["label"]["content_category"])
            spec_matrix[pid]["opus"] = rec["label"]["specificity_level"]

    # 4) Bench-holdout models
    for fn in BENCH_FILES:
        src = BENCH_MODEL_SHORT[fn]
        for rec in load_jsonl(ROOT / "data/annotations/bench-holdout" / fn):
            pid = rec["paragraphId"]
            if pid in gold_pids:
                cat_matrix[pid][src] = abbrev(rec["label"]["content_category"])
                spec_matrix[pid][src] = rec["label"]["specificity_level"]

    return dict(cat_matrix), dict(spec_matrix)
|
|
|
|
|
|
def load_paragraphs(gold_pids: set[str]) -> dict[str, dict]:
    """Return {paragraphId: record} for holdout paragraphs in the gold set."""
    records = load_jsonl(ROOT / "data/gold/paragraphs-holdout.jsonl")
    return {rec["id"]: rec for rec in records if rec["id"] in gold_pids}
|
|
|
|
|
|
# ── Analysis helpers ───────────────────────────────────────────────────────────
|
|
|
|
|
|
def find_axis_paragraphs(
    cat_matrix: dict[str, dict[str, str]], a: str, b: str
) -> list[tuple[str, dict[str, str], int, int]]:
    """Find paragraphs whose primary disagreement is between categories a and b.

    A paragraph qualifies when both categories received at least one vote and
    together they account for at least half of all votes on the paragraph.

    Returns:
        List of (pid, signals, count_a, count_b), hardest (most even) splits
        first.
    """
    hits: list[tuple[str, dict[str, str], int, int]] = []
    for pid, signals in cat_matrix.items():
        votes = Counter(signals.values())
        n_a, n_b = votes[a], votes[b]
        # Integer form of: n_a >= 1 and n_b >= 1 and n_a + n_b >= len * 0.5
        if n_a and n_b and 2 * (n_a + n_b) >= len(signals):
            hits.append((pid, signals, n_a, n_b))
    # Evenness of the split (the minority side's size) decides the ordering.
    return sorted(hits, key=lambda item: min(item[2], item[3]), reverse=True)
|
|
|
|
|
|
def truncate_text(text: str, max_chars: int = 200) -> str:
    """Clip *text* to at most *max_chars* characters, appending "..." if cut."""
    if len(text) > max_chars:
        return text[:max_chars].rstrip() + "..."
    return text
|
|
|
|
|
|
def source_order() -> list[str]:
    """Canonical display order: humans, stage-1 models, opus, bench models."""
    order = [
        f"human:{name}"
        for name in ("Joey", "Anuj", "Aaryan", "Elisabeth", "Meghan", "Xander")
    ]
    order += ["s1:gemini-flash", "s1:grok-fast", "s1:mimo-flash"]
    order.append("opus")
    order += [BENCH_MODEL_SHORT[fn] for fn in BENCH_FILES]
    return order
|
|
|
|
|
|
def format_signal_breakdown(
    signals: dict[str, str], axis_cats: tuple[str, str]
) -> str:
    """Render which sources voted for each side of a two-category axis."""
    a, b = axis_cats
    picked_a: list[str] = []
    picked_b: list[str] = []
    rest: list[str] = []
    # Walk sources in canonical display order; skip ones with no vote here.
    for src in source_order():
        cat = signals.get(src)
        if cat is None:
            continue
        if cat == a:
            picked_a.append(src)
        elif cat == b:
            picked_b.append(src)
        else:
            rest.append(f"{src}={cat}")

    lines = [
        f"      {a} ({len(picked_a)}): {', '.join(picked_a)}",
        f"      {b} ({len(picked_b)}): {', '.join(picked_b)}",
    ]
    if rest:
        lines.append(f"      Other: {', '.join(rest)}")
    return "\n".join(lines)
|
|
|
|
|
|
def extract_keyword_frequencies(
|
|
paragraphs: dict[str, dict],
|
|
axis_pids: list[str],
|
|
cat_matrix: dict[str, dict[str, str]],
|
|
cat_a: str,
|
|
cat_b: str,
|
|
) -> tuple[Counter, Counter, Counter]:
|
|
"""Extract keyword frequencies for paragraphs leaning toward cat_a vs cat_b."""
|
|
# Keywords to look for (domain-relevant)
|
|
all_keywords = [
|
|
"board", "director", "committee", "audit", "oversee", "oversight",
|
|
"ciso", "officer", "chief", "vp", "vice president", "manager",
|
|
"manage", "manages", "managing", "management", "responsible",
|
|
"program", "team", "department", "staff", "personnel",
|
|
"report", "reports", "reporting", "brief", "briefing", "informed",
|
|
"incident", "breach", "attack", "compromise", "unauthorized",
|
|
"material", "immaterial", "not material", "no material",
|
|
"strategy", "strategic", "integrate", "integration", "aligned",
|
|
"risk", "assess", "assessment", "framework", "nist", "iso",
|
|
"policy", "policies", "procedure", "procedures",
|
|
"third party", "third-party", "vendor", "supplier", "service provider",
|
|
"insurance", "cyber insurance",
|
|
"training", "awareness", "employee",
|
|
"monitor", "monitoring", "detect", "detection",
|
|
"govern", "governance",
|
|
"experience", "experienced", "background", "qualification", "expertise",
|
|
"day-to-day", "daily", "operational",
|
|
"enterprise", "enterprise-wide",
|
|
"designate", "designated", "appoint", "appointed",
|
|
]
|
|
|
|
lean_a_pids = []
|
|
lean_b_pids = []
|
|
for pid in axis_pids:
|
|
signals = cat_matrix[pid]
|
|
counts = Counter(signals.values())
|
|
if counts.get(cat_a, 0) > counts.get(cat_b, 0):
|
|
lean_a_pids.append(pid)
|
|
elif counts.get(cat_b, 0) > counts.get(cat_a, 0):
|
|
lean_b_pids.append(pid)
|
|
|
|
def count_keywords(pids: list[str]) -> Counter:
|
|
kw_counts = Counter()
|
|
for pid in pids:
|
|
if pid not in paragraphs:
|
|
continue
|
|
text_lower = paragraphs[pid]["text"].lower()
|
|
for kw in all_keywords:
|
|
if kw in text_lower:
|
|
kw_counts[kw] += 1
|
|
return kw_counts
|
|
|
|
freq_a = count_keywords(lean_a_pids)
|
|
freq_b = count_keywords(lean_b_pids)
|
|
freq_all = count_keywords(axis_pids)
|
|
|
|
return freq_a, freq_b, freq_all
|
|
|
|
|
|
def analyze_human_vs_genai_splits(
    axis_pids: list[str],
    cat_matrix: dict[str, dict[str, str]],
    cat_a: str,
    cat_b: str,
) -> tuple[list[str], list[str]]:
    """Find cases where humans lean one way but GenAI leans the other.

    A source is "human" when its name starts with ``"human:"``; every other
    source (stage-1, opus, bench) counts as GenAI.  (The old version also
    declared a ``genai_prefixes`` list, but it was dead code — the else
    branch already catches all non-human sources.)

    Returns:
        ``(human_a_genai_b, human_b_genai_a)``: pids where the human majority
        is cat_a but the GenAI majority is cat_b, and vice versa.  A tie on
        either side disqualifies the paragraph from both lists.
    """
    human_a_genai_b = []  # humans say A, GenAI says B
    human_b_genai_a = []  # humans say B, GenAI says A

    for pid in axis_pids:
        human_cats = []
        genai_cats = []
        for src, cat in cat_matrix[pid].items():
            if src.startswith("human:"):
                human_cats.append(cat)
            else:
                genai_cats.append(cat)

        human_a = human_cats.count(cat_a)
        human_b = human_cats.count(cat_b)
        genai_a = genai_cats.count(cat_a)
        genai_b = genai_cats.count(cat_b)

        if human_a > human_b and genai_b > genai_a:
            human_a_genai_b.append(pid)
        elif human_b > human_a and genai_a > genai_b:
            human_b_genai_a.append(pid)

    return human_a_genai_b, human_b_genai_a
|
|
|
|
|
|
# ── Main analysis ──────────────────────────────────────────────────────────────
|
|
|
|
|
|
def main():
    """Run the full hard-case report: load all annotation signals, then print
    disagreement statistics, per-axis deep dives, keyword patterns, summary
    estimates, and recommended codebook rulings to stdout."""
    print("=" * 100)
    print("HARDEST CASES ANALYSIS: SEC CYBERSECURITY HOLDOUT DATASET")
    print("Examining disagreements across 13 annotation sources to inform codebook rulings")
    print("=" * 100)

    # Load data
    print("\nLoading data...")
    cat_matrix, spec_matrix = build_signal_matrix()
    gold_pids = set(cat_matrix.keys())
    paragraphs = load_paragraphs(gold_pids)
    print(f"  Loaded {len(gold_pids)} gold paragraphs with {len(source_order())} potential sources each")

    # Verify source coverage
    source_coverage = Counter()
    for pid in gold_pids:
        for src in cat_matrix[pid]:
            source_coverage[src] += 1
    print("\n  Source coverage:")
    for src in source_order():
        print(f"    {src}: {source_coverage.get(src, 0)} paragraphs")

    # ── Overall disagreement stats ─────────────────────────────────────────

    print("\n" + "=" * 100)
    print("OVERALL DISAGREEMENT STATISTICS")
    print("=" * 100)

    # Bucket each paragraph by how concentrated its votes are.
    unanimous = 0
    near_unanimous = 0  # 1 dissenter
    split = 0
    for pid in gold_pids:
        cats = list(cat_matrix[pid].values())
        counts = Counter(cats)
        top = counts.most_common(1)[0][1]
        n = len(cats)
        if top == n:
            unanimous += 1
        elif top >= n - 1:
            near_unanimous += 1
        else:
            split += 1

    print(f"\n  Unanimous (all sources agree): {unanimous} ({unanimous/len(gold_pids)*100:.1f}%)")
    print(f"  Near-unanimous (1 dissenter): {near_unanimous} ({near_unanimous/len(gold_pids)*100:.1f}%)")
    print(f"  Split (2+ dissenters): {split} ({split/len(gold_pids)*100:.1f}%)")

    # Count all pairwise disagreement axes
    axis_counts = Counter()
    for pid in gold_pids:
        cats = list(cat_matrix[pid].values())
        unique = set(cats)
        if len(unique) >= 2:
            # c1 < c2 keeps each unordered pair counted exactly once.
            for c1 in unique:
                for c2 in unique:
                    if c1 < c2:
                        axis_counts[(c1, c2)] += 1

    print("\n  All disagreement axes (paragraph has at least 1 source saying each):")
    for (c1, c2), ct in axis_counts.most_common(30):
        print(f"    {c1} <-> {c2}: {ct} paragraphs")

    # ── Axis-specific analysis ─────────────────────────────────────────────

    # NOTE(review): all_axis_results is populated below but never read again
    # in this function — confirm whether it was meant to be returned/saved.
    all_axis_results = {}

    for cat_a, cat_b, axis_name in AXES:
        print("\n" + "=" * 100)
        print(f"AXIS: {axis_name}")
        print("=" * 100)

        axis_pids_data = find_axis_paragraphs(cat_matrix, cat_a, cat_b)
        axis_pids = [x[0] for x in axis_pids_data]
        all_axis_results[axis_name] = axis_pids

        print(f"\n  Paragraphs with primary {cat_a}/{cat_b} disagreement: {len(axis_pids)}")

        if not axis_pids:
            print("    No paragraphs found on this axis.")
            continue

        # ── Signal split statistics ────────────────────────────────────────

        # Count how the split goes (majority A vs majority B)
        majority_a = sum(1 for _, _, ca, cb in axis_pids_data if ca > cb)
        majority_b = sum(1 for _, _, ca, cb in axis_pids_data if cb > ca)
        tied = sum(1 for _, _, ca, cb in axis_pids_data if ca == cb)
        print(f"  Majority {cat_a}: {majority_a} | Majority {cat_b}: {majority_b} | Tied: {tied}")

        # ── Human vs GenAI splits ──────────────────────────────────────────

        human_a_genai_b, human_b_genai_a = analyze_human_vs_genai_splits(
            axis_pids, cat_matrix, cat_a, cat_b
        )
        print(f"\n  Human/GenAI disagreements:")
        print(f"    Humans say {cat_a}, GenAI says {cat_b}: {len(human_a_genai_b)}")
        print(f"    Humans say {cat_b}, GenAI says {cat_a}: {len(human_b_genai_a)}")

        # ── Representative examples ────────────────────────────────────────

        # Show hardest cases (most evenly split)
        n_examples = min(10, len(axis_pids_data))
        print(f"\n  {'─' * 90}")
        print(f"  TOP {n_examples} MOST CONTENTIOUS PARAGRAPHS")
        print(f"  {'─' * 90}")

        for i, (pid, signals, ca, cb) in enumerate(axis_pids_data[:n_examples]):
            para = paragraphs.get(pid, {})
            text = para.get("text", "[text not found]")
            company = para.get("companyName", "?")
            word_count = para.get("wordCount", "?")

            print(f"\n  [{i+1}] PID: {pid[:12]}... Company: {company}")
            print(f"      Words: {word_count} | Split: {ca} say {cat_a}, {cb} say {cat_b}, {len(signals)-ca-cb} say other")
            print(f"      Text: {truncate_text(text, 250)}")
            print(format_signal_breakdown(signals, (cat_a, cat_b)))

        # ── Human-A / GenAI-B examples ─────────────────────────────────────

        if human_a_genai_b:
            print(f"\n  {'─' * 90}")
            print(f"  HUMANS SAY {cat_a}, GenAI SAYS {cat_b} (up to 5 examples)")
            print(f"  {'─' * 90}")
            for pid in human_a_genai_b[:5]:
                para = paragraphs.get(pid, {})
                text = para.get("text", "[text not found]")
                print(f"\n  PID: {pid[:12]}...")
                print(f"      Text: {truncate_text(text, 250)}")
                print(format_signal_breakdown(cat_matrix[pid], (cat_a, cat_b)))

        if human_b_genai_a:
            print(f"\n  {'─' * 90}")
            print(f"  HUMANS SAY {cat_b}, GenAI SAYS {cat_a} (up to 5 examples)")
            print(f"  {'─' * 90}")
            for pid in human_b_genai_a[:5]:
                para = paragraphs.get(pid, {})
                text = para.get("text", "[text not found]")
                print(f"\n  PID: {pid[:12]}...")
                print(f"      Text: {truncate_text(text, 250)}")
                print(format_signal_breakdown(cat_matrix[pid], (cat_a, cat_b)))

        # ── Keyword / linguistic patterns ──────────────────────────────────

        print(f"\n  {'─' * 90}")
        print(f"  LINGUISTIC PATTERNS")
        print(f"  {'─' * 90}")

        freq_a, freq_b, freq_all = extract_keyword_frequencies(
            paragraphs, axis_pids, cat_matrix, cat_a, cat_b
        )

        # Compute over-representation: keywords more common when majority says A vs B
        lean_a_ct = sum(
            1 for pid in axis_pids
            if Counter(cat_matrix[pid].values()).get(cat_a, 0) > Counter(cat_matrix[pid].values()).get(cat_b, 0)
        )
        lean_b_ct = sum(
            1 for pid in axis_pids
            if Counter(cat_matrix[pid].values()).get(cat_b, 0) > Counter(cat_matrix[pid].values()).get(cat_a, 0)
        )

        print(f"\n  Paragraphs leaning {cat_a}: {lean_a_ct} | leaning {cat_b}: {lean_b_ct}")

        # Show keywords sorted by differential
        all_kws = set(freq_a.keys()) | set(freq_b.keys())
        diffs = []
        for kw in all_kws:
            fa = freq_a.get(kw, 0)
            fb = freq_b.get(kw, 0)
            total = freq_all.get(kw, 0)
            # Too rare to be informative.
            if total < 3:
                continue
            # Normalize by group size
            rate_a = fa / max(lean_a_ct, 1)
            rate_b = fb / max(lean_b_ct, 1)
            diff = rate_a - rate_b
            diffs.append((kw, fa, fb, total, rate_a, rate_b, diff))

        diffs.sort(key=lambda x: -abs(x[6]))

        print(f"\n  Keywords by differential (rate in {cat_a}-leaning vs {cat_b}-leaning paragraphs):")
        print(f"  {'Keyword':<22} {'In '+cat_a:>8} {'In '+cat_b:>8} {'Total':>8} {'Rate '+cat_a:>10} {'Rate '+cat_b:>10} {'Diff':>8}")
        print(f"  {'─'*22} {'─'*8} {'─'*8} {'─'*8} {'─'*10} {'─'*10} {'─'*8}")
        for kw, fa, fb, total, ra, rb, diff in diffs[:25]:
            marker = f"<- {cat_a}" if diff > 0.05 else (f"<- {cat_b}" if diff < -0.05 else "")
            print(f"  {kw:<22} {fa:>8} {fb:>8} {total:>8} {ra:>10.2%} {rb:>10.2%} {diff:>+8.2%} {marker}")

    # ── Other notable axes ─────────────────────────────────────────────────

    print("\n" + "=" * 100)
    print("OTHER NOTABLE DISAGREEMENT AXES (10+ paragraphs)")
    print("=" * 100)

    # Both orderings listed so lookup works regardless of tuple order.
    primary_axis_set = {("BG", "MR"), ("MR", "BG"), ("MR", "RMP"), ("RMP", "MR"), ("N/O", "SI"), ("SI", "N/O")}

    other_axes = []
    for (c1, c2), ct in axis_counts.most_common():
        if (c1, c2) not in primary_axis_set and ct >= 10:
            other_axes.append((c1, c2, ct))

    if not other_axes:
        print("\n  No other axes with 10+ paragraphs.")
    else:
        for cat_a, cat_b, count in other_axes:
            print(f"\n  {'─' * 90}")
            print(f"  {cat_a} <-> {cat_b}: {count} paragraphs")
            print(f"  {'─' * 90}")

            axis_pids_data = find_axis_paragraphs(cat_matrix, cat_a, cat_b)
            # Show up to 5 examples
            for i, (pid, signals, ca, cb) in enumerate(axis_pids_data[:5]):
                para = paragraphs.get(pid, {})
                text = para.get("text", "[text not found]")
                print(f"\n  [{i+1}] {truncate_text(text, 200)}")
                print(f"      Split: {ca}x {cat_a}, {cb}x {cat_b}")
                print(format_signal_breakdown(signals, (cat_a, cat_b)))

    # ── Summary statistics ─────────────────────────────────────────────────

    print("\n" + "=" * 100)
    print("SUMMARY STATISTICS")
    print("=" * 100)

    # Per-axis counts
    print("\n  Paragraphs on each primary confusion axis:")
    for cat_a, cat_b, axis_name in AXES:
        axis_data = find_axis_paragraphs(cat_matrix, cat_a, cat_b)
        print(f"    {axis_name}: {len(axis_data)} paragraphs")

    # How many could potentially be resolved by keyword rules?
    print("\n  Keyword-resolvable estimate (paragraphs containing strong discriminator keywords):")

    mr_rmp_data = find_axis_paragraphs(cat_matrix, "MR", "RMP")
    mr_rmp_pids = [x[0] for x in mr_rmp_data]
    resolvable_mr_rmp = 0
    mr_keywords = {"ciso", "chief information security", "chief security", "vp", "vice president",
                   "officer", "director of", "head of", "reports to", "reporting to"}
    rmp_keywords = {"framework", "nist", "iso", "soc 2", "assessment", "penetration test",
                    "vulnerability scan", "audit", "tabletop"}
    for pid in mr_rmp_pids:
        text_lower = paragraphs.get(pid, {}).get("text", "").lower()
        has_mr = any(kw in text_lower for kw in mr_keywords)
        has_rmp = any(kw in text_lower for kw in rmp_keywords)
        if has_mr != has_rmp:  # One side but not the other
            resolvable_mr_rmp += 1
    print(f"    MR <-> RMP: {resolvable_mr_rmp}/{len(mr_rmp_pids)} have clear keyword signal ({resolvable_mr_rmp/max(len(mr_rmp_pids),1)*100:.0f}%)")

    bg_mr_data = find_axis_paragraphs(cat_matrix, "BG", "MR")
    bg_mr_pids = [x[0] for x in bg_mr_data]
    resolvable_bg_mr = 0
    bg_keywords = {"board", "director", "committee", "audit committee", "board of directors"}
    mr_only_keywords = {"ciso", "chief information security", "officer", "vp", "management",
                        "team", "department", "staff", "day-to-day", "operational"}
    for pid in bg_mr_pids:
        text_lower = paragraphs.get(pid, {}).get("text", "").lower()
        has_bg = any(kw in text_lower for kw in bg_keywords)
        has_mr_only = any(kw in text_lower for kw in mr_only_keywords)
        # Resolvable only when exactly one side's keywords appear.
        if has_bg and not has_mr_only:
            resolvable_bg_mr += 1
        elif has_mr_only and not has_bg:
            resolvable_bg_mr += 1
    print(f"    BG <-> MR: {resolvable_bg_mr}/{len(bg_mr_pids)} have clear keyword signal ({resolvable_bg_mr/max(len(bg_mr_pids),1)*100:.0f}%)")

    si_no_data = find_axis_paragraphs(cat_matrix, "SI", "N/O")
    si_no_pids = [x[0] for x in si_no_data]
    resolvable_si_no = 0
    si_keywords = {"incident", "breach", "attack", "compromise", "unauthorized access",
                   "ransomware", "malware", "phishing", "data loss", "disruption"}
    no_keywords = {"no material", "not material", "have not experienced", "no known",
                   "not aware of any", "not been subject"}
    for pid in si_no_pids:
        text_lower = paragraphs.get(pid, {}).get("text", "").lower()
        has_si = any(kw in text_lower for kw in si_keywords)
        has_no = any(kw in text_lower for kw in no_keywords)
        # Negation keywords dominate; otherwise incident keywords alone count.
        if has_no:
            resolvable_si_no += 1
        elif has_si and not has_no:
            resolvable_si_no += 1
    print(f"    SI <-> N/O: {resolvable_si_no}/{len(si_no_pids)} have clear keyword signal ({resolvable_si_no/max(len(si_no_pids),1)*100:.0f}%)")

    # ── Specificity disagreements on confused paragraphs ───────────────────

    print("\n" + "=" * 100)
    print("SPECIFICITY DISAGREEMENT ON CONFUSED PARAGRAPHS")
    print("=" * 100)

    for cat_a, cat_b, axis_name in AXES:
        axis_data = find_axis_paragraphs(cat_matrix, cat_a, cat_b)
        if not axis_data:
            continue
        spec_ranges = []
        for pid, signals, _, _ in axis_data:
            specs = list(spec_matrix.get(pid, {}).values())
            if specs:
                # Spread between the highest and lowest specificity votes.
                spec_ranges.append(max(specs) - min(specs))
        if spec_ranges:
            avg_range = np.mean(spec_ranges)
            print(f"\n  {axis_name}: avg specificity range = {avg_range:.2f} (0=agree, 3=max disagree)")
            range_dist = Counter(spec_ranges)
            for r in sorted(range_dist.keys()):
                print(f"    Range {r}: {range_dist[r]} paragraphs")

    # ── Recommended codebook rulings ───────────────────────────────────────

    print("\n" + "=" * 100)
    print("RECOMMENDED CODEBOOK RULINGS")
    print("=" * 100)

    # NOTE(review): Ruling 3's text treats "SI" as incident disclosure, but
    # CAT_ABBREV maps SI to "Strategy Integration" (ID is Incident
    # Disclosure) — confirm the intended category before publishing.
    print("""
Based on the analysis above, the following rulings would resolve the most cases:

RULING 1: MR vs RMP — "Named-role test"
──────────────────────────────────────────
If the paragraph's PRIMARY subject is a named individual, titled role (CISO, VP,
CTO, etc.), or a specific person's responsibilities/qualifications/experience,
classify as MR. If the paragraph's PRIMARY subject is a process, program, system,
or methodology (even if it mentions who runs it), classify as RMP.

Disambiguator: Ask "Is this paragraph ABOUT a person/role, or ABOUT a process?"
- "Our CISO oversees our cybersecurity program" → MR (about the CISO)
- "Our cybersecurity program includes monitoring, led by the CISO" → RMP (about the program)

RULING 2: BG vs MR — "Board-line test"
──────────────────────────────────────────
If the paragraph describes oversight, reporting, or governance AT or ABOVE the
board/committee level, classify as BG. If it describes responsibilities BELOW
the board level (C-suite officers reporting TO the board, management teams,
operational roles), classify as MR.

Disambiguator: "Does this paragraph describe what the board/committee DOES,
or what someone REPORTS TO the board?"
- "The Audit Committee oversees cybersecurity risk" → BG
- "The CISO reports quarterly to the Audit Committee" → BG (board's receiving mechanism)
- "The CISO manages a team of security analysts" → MR

Key edge case: When a paragraph describes BOTH board oversight AND management
roles, classify by the paragraph's PRIMARY focus. If roughly equal, prefer BG
when board action is the grammatical subject.

RULING 3: SI vs N/O — "Negative-incident test"
──────────────────────────────────────────
Negative incident statements ("we have not experienced any material cybersecurity
incidents") should be classified as N/O, NOT as SI. SI requires disclosure of an
ACTUAL incident that occurred. The mere mention of incidents in a negation context
does not constitute incident disclosure.

However: If the paragraph describes a SPECIFIC past incident (even if resolved or
deemed immaterial), classify as SI. The test is: "Did something actually happen?"
- "We have not experienced material incidents" → N/O
- "In 2023, we experienced a ransomware attack that..." → SI
- "We experienced incidents but none were material" → SI (something happened)
""")

    # ── Deep dive: the very hardest cases ──────────────────────────────────

    print("=" * 100)
    print("DEEP DIVE: PARAGRAPHS WITH MAXIMUM ENTROPY (4+ DISTINCT CATEGORIES)")
    print("=" * 100)

    high_entropy = []
    for pid in gold_pids:
        cats = list(cat_matrix[pid].values())
        n_unique = len(set(cats))
        if n_unique >= 4:
            high_entropy.append((pid, n_unique, Counter(cats)))

    high_entropy.sort(key=lambda x: -x[1])
    print(f"\n  {len(high_entropy)} paragraphs with 4+ distinct category labels")

    for i, (pid, n_unique, counts) in enumerate(high_entropy[:10]):
        para = paragraphs.get(pid, {})
        text = para.get("text", "[text not found]")
        print(f"\n  [{i+1}] PID: {pid[:12]}... ({n_unique} categories)")
        print(f"      Text: {truncate_text(text, 250)}")
        print(f"      Distribution: {dict(counts.most_common())}")
        # Show all sources
        for src in source_order():
            if src in cat_matrix[pid]:
                cat = cat_matrix[pid][src]
                spec = spec_matrix.get(pid, {}).get(src, "?")
                print(f"        {src:<25} {cat:<5} spec={spec}")

    # ── Per-source accuracy vs human majority ──────────────────────────────

    print("\n" + "=" * 100)
    print("GENAI SOURCE AGREEMENT WITH HUMAN MAJORITY (on axis-confused paragraphs only)")
    print("=" * 100)

    for cat_a, cat_b, axis_name in AXES:
        axis_data = find_axis_paragraphs(cat_matrix, cat_a, cat_b)
        if not axis_data:
            continue

        print(f"\n  {axis_name} ({len(axis_data)} paragraphs):")

        # For each paragraph, determine human majority
        genai_sources = [s for s in source_order() if not s.startswith("human:")]
        source_agree = {s: 0 for s in genai_sources}
        source_total = {s: 0 for s in genai_sources}

        for pid, signals, _, _ in axis_data:
            # Human majority on this axis
            human_cats = [
                signals[s] for s in signals
                if s.startswith("human:") and signals[s] in (cat_a, cat_b)
            ]
            if not human_cats:
                continue
            human_majority = Counter(human_cats).most_common(1)[0][0]

            for src in genai_sources:
                if src in signals:
                    source_total[src] += 1
                    if signals[src] == human_majority:
                        source_agree[src] += 1

        print(f"    {'Source':<25} {'Agree':>8} {'Total':>8} {'Rate':>8}")
        print(f"    {'─'*25} {'─'*8} {'─'*8} {'─'*8}")
        for src in genai_sources:
            total = source_total[src]
            agree = source_agree[src]
            rate = agree / max(total, 1)
            print(f"    {src:<25} {agree:>8} {total:>8} {rate:>8.1%}")

    print("\n" + "=" * 100)
    print("END OF ANALYSIS")
    print("=" * 100)
|
|
|
|
|
|
if __name__ == "__main__":
    # Entry point when run as a script (see module docstring for the uv command).
    main()
|