SEC-cyBERT/scripts/adjudicate-gold-experiment.py

"""
Gold Set Adjudication — Experimental Harness
=============================================
Runs the adjudication pipeline with toggleable interventions, one variable
at a time, and produces comparable metrics for each configuration.
Experiments:
baseline — Current production adjudication (92 T5 cases)
exp1_gemini — Exclude Gemini from MR↔RMP axis when Gemini voted MR
exp2_board — Board-removal test overrides for BG↔RMP T5 cases
exp3_committee — Committee-level test overrides for BG↔MR T5 cases
exp4_idsi — ID↔SI volume-dominant tiebreaker
exp5_spec — Specificity hybrid (human unanimous → human, split → model)
combined — All validated interventions stacked
Usage:
uv run scripts/adjudicate-gold-experiment.py [experiment_name|all]
"""
import json
import sys
from collections import Counter, defaultdict
from dataclasses import dataclass, field
from pathlib import Path
ROOT = Path(__file__).resolve().parent.parent
# ── IMPORTS FROM PRODUCTION SCRIPT ──────────────────────────────────────
# These are the existing overrides from adjudicate-gold.py, kept identical
# so the baseline matches production exactly.
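# Keys in the override tables below are 8-character paragraphId prefixes,
# matched against pid[:8] inside run_experiment().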
SI_NO_OVERRIDES: dict[str, tuple[str, str]] = {
"026c8eca": ("None/Other", "Speculation: 'could potentially result in' -- no materiality assessment"),
"160fec46": ("None/Other", "Resource lament: 'do not have manpower' -- no materiality assessment"),
"1f29ea8c": ("None/Other", "Speculation: 'could have material adverse effect' boilerplate"),
"20c70335": ("None/Other", "Risk list: 'A breach could lead to...' -- enumeration, not assessment"),
"303685cf": ("None/Other", "Speculation: 'could materially adversely affect'"),
"7d021fcc": ("None/Other", "Speculation: 'could...have a material adverse effect'"),
"7ef53cab": ("None/Other", "Risk enumeration: 'could lead to... could disrupt... could steal...'"),
"a0d01951": ("None/Other", "Speculation: 'could adversely affect our business'"),
"aaa8974b": ("None/Other", "Speculation: 'could potentially have a material impact' -- Case 9 fix"),
"b058dca1": ("None/Other", "Speculation: 'could disrupt our operations'"),
"b1b216b6": ("None/Other", "Speculation: 'could materially adversely affect'"),
"dc8a2798": ("None/Other", "Speculation: 'If compromised, we could be subject to...'"),
"e4bd0e2f": ("None/Other", "Speculation: 'could have material adverse impact'"),
"f4656a7e": ("None/Other", "Threat enumeration under SI-sounding header -- no assessment"),
"2e8cbdbf": ("None/Other", "Cross-ref: 'We describe whether and how... under the headings [risk factors]'"),
"75de7441": ("None/Other", "Cross-ref: 'We describe whether and how... under the heading [risk factor]'"),
"78cad2a1": ("None/Other", "Cross-ref: 'In our Risk Factors, we describe whether and how...'"),
"3879887f": ("None/Other", "Brief incident mention + 'See Item 1A' cross-reference"),
"f026f2be": ("None/Other", "Risk factor heading/cross-reference -- not an assessment"),
"5df3a6c9": ("None/Other", "IT importance statement -- no assessment. H=1/3 SI"),
"d5dc17c2": ("None/Other", "Risk enumeration -- no assessment. H=1/3 SI"),
"c10f2a54": ("None/Other", "Early-stage/SPAC + weak negative assertion. SPAC rule dominates"),
"45961c99": ("None/Other", "Past disruption but no materiality language. Primarily speculation"),
"1673f332": ("None/Other", "SPAC with assessment at end -- SPAC rule dominates per Case 8"),
"f75ac78a": ("Risk Management Process", "Resource expenditure on cybersecurity -- RMP per person-removal test"),
"367108c2": ("Strategy Integration", "Negative assertion: 'not aware of having experienced any prior material data breaches'"),
"837e31d5": ("Strategy Integration", "Negative assertion: 'did not experience any cybersecurity incident during 2024'"),
}
T5_CODEBOOK_OVERRIDES: dict[str, tuple[str, str]] = {
"15e7cf99": ("Strategy Integration", "SI/ID tiebreaker: 'have not encountered any risks' -- materiality assessment, no specific incident described"),
"6dc6bb4a": ("Incident Disclosure", "SI/ID tiebreaker: 'ransomware attack in October 2021' -- describes specific incident with date"),
"c71739a9": ("Risk Management Process", "TP/RMP: Fund relies on CCO and adviser's risk management expertise -- third parties supporting internal process"),
}
# ── EXPERIMENT-SPECIFIC OVERRIDES ───────────────────────────────────────
# Exp 2/3: Board-removal + committee-level test overrides (with-board paragraphs)
# These 5 paragraphs mention "board" so the automated no-board test can't catch them.
# Each read manually; board-removal test applied to determine if board mention is
# incidental or substantive.
MANUAL_BOARD_OVERRIDES: dict[str, tuple[str, str]] = {
    # Board = 1/5 sentences + final notification clause. CISO/ISIRT/incident
    # response plan dominate the content. Board oversight is incidental attribution.
    "22da6695": ("Risk Management Process",
                 "Board-removal: 'Board is also responsible for approval' (1 sentence) + "
                 "'notifying the Board' (final clause). Remove → CISO + IS Program + incident "
                 "response plan. Process dominates."),
    # Titled 'Management's Role.' Compliance Committee = management-level (CIO,
    # executives). Board mentioned 2x as information destination only.
    "a2ff7e1e": ("Management Role",
                 "Committee-level: Compliance Committee is management-level (O'Reilly executives). "
                 "Board is incidental destination (2 clauses). Titled 'Management's Role.'"),
    # Very brief (3 sentences). Management oversees + board notification + 'Public
    # Offering' (registration statement). Board is incident notification only.
    "cb518f47": ("Management Role",
                 "Board-removal: remove notification sentence → 'management oversees cybersecurity.' "
                 "Board is incident notification destination only. Brief paragraph."),
}
# Exp 4: Codebook tiebreaker overrides (beyond existing T5_CODEBOOK_OVERRIDES)
# Each paragraph read in full and classified by codebook rules.
CODEBOOK_OVERRIDES: dict[str, tuple[str, str]] = {
    # ── ID↔SI: negative assertion = materiality assessment → SI ──────────
    "0ceeb618": ("Strategy Integration",
                 "ID/SI: Opens with negative assertion ('no material incidents'), Feb 2025 "
                 "incident is brief context + 'has not had material impact' conclusion. "
                 "Materiality assessment frame dominates → SI"),
    "cc82eb9f": ("Strategy Integration",
                 "ID/SI: June 2018 incident is example within broader negative materiality "
                 "assertion ('have not materially affected us'). Assessment frame dominates → SI"),
    # ── SPAC rule (Case 8): pre-revenue company → N/O ────────────────────
    "203ccd43": ("None/Other",
                 "SPAC: 'once the Company commences operations' — pre-revenue company. "
                 "Case 8: SPAC → N/O regardless of management role language"),
    # ── ID→RMP: post-incident improvements, no incident described ────────
    "f549fd64": ("Risk Management Process",
                 "ID/RMP: 'Following this cybersecurity event' — refers to incident without "
                 "describing it. 100% of content is hardening, training, MFA, EDR — pure RMP"),
}
@dataclass
class ExperimentConfig:
    name: str
    description: str
    exclude_gemini_mr_rmp: bool = False
    apply_board_removal: bool = False
    apply_committee_level: bool = False
    apply_idsi_tiebreaker: bool = False
    apply_specificity_hybrid: bool = False
    # Text-based: remove BG model votes when "board" absent from paragraph text
    apply_no_board_bg_removal: bool = False

@dataclass
class ExperimentResult:
    config: ExperimentConfig
    total: int = 0
    tier_counts: dict[str, int] = field(default_factory=dict)
    category_dist: dict[str, int] = field(default_factory=dict)
    human_maj_dist: dict[str, int] = field(default_factory=dict)
    flipped_from_human: int = 0
    source_accuracy: dict[str, float] = field(default_factory=dict)
    t5_by_axis: dict[str, int] = field(default_factory=dict)
    t5_weak_plurality: int = 0  # 4-5/9
    results: list[dict] = field(default_factory=list)
    spec_changes: int = 0

def load_jsonl(path: Path) -> list[dict]:
    with open(path) as f:
        return [json.loads(line) for line in f]

def majority_vote(votes: list[str]) -> str | None:
    if not votes:
        return None
    return Counter(votes).most_common(1)[0][0]

def get_confusion_axis(human_votes: dict, model_votes: dict) -> str:
"""Identify the confusion axis from vote distributions."""
all_cats = sorted(set(list(human_votes.keys()) + list(model_votes.keys())))
if len(all_cats) == 2:
return f"{all_cats[0]}{all_cats[1]}"
return "".join(all_cats)
def run_experiment(config: ExperimentConfig) -> ExperimentResult:
"""Run adjudication with a specific experimental configuration."""
# ── Load data ─────────────────────────────────────────────────────
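    # Each human-label record is expected to carry paragraphId, contentCategory,
    # specificityLevel, and annotatorName (the fields read below).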
    human_labels: dict[str, list[dict]] = defaultdict(list)
    for r in load_jsonl(ROOT / "data/gold/human-labels-raw.jsonl"):
        human_labels[r["paragraphId"]].append({
            "cat": r["contentCategory"],
            "spec": r["specificityLevel"],
            "annotator": r["annotatorName"],
        })
    confusion_pids = {r["paragraphId"] for r in load_jsonl(ROOT / "data/gold/holdout-rerun-v35.jsonl")}
    TOP6 = ["Opus", "GPT-5.4", "Gemini", "GLM-5", "Kimi", "MIMO"]
    def load_model_cats(files: dict[str, Path]) -> dict[str, dict[str, str]]:
        # Annotation files may nest the label under "label" or store it flat;
        # accept either shape for both content_category and specificity_level.
        result: dict[str, dict[str, str]] = {}
        for name, path in files.items():
            result[name] = {}
            if path.exists():
                for r in load_jsonl(path):
                    cat = r.get("label", {}).get("content_category") or r.get("content_category")
                    if cat:
                        result[name][r["paragraphId"]] = cat
            # Also load specificity for exp5
            result[f"{name}_spec"] = {}
            if path.exists():
                for r in load_jsonl(path):
                    spec = r.get("label", {}).get("specificity_level") or r.get("specificity_level")
                    if spec is not None:
                        result[f"{name}_spec"][r["paragraphId"]] = spec
        return result
    v30_cats = load_model_cats({
        "Opus": ROOT / "data/annotations/golden/opus.jsonl",
        "GPT-5.4": ROOT / "data/annotations/bench-holdout/gpt-5.4.jsonl",
        "Gemini": ROOT / "data/annotations/bench-holdout/gemini-3.1-pro-preview.jsonl",
        "GLM-5": ROOT / "data/annotations/bench-holdout/glm-5:exacto.jsonl",
        "Kimi": ROOT / "data/annotations/bench-holdout/kimi-k2.5.jsonl",
        "MIMO": ROOT / "data/annotations/bench-holdout/mimo-v2-pro:exacto.jsonl",
    })
    v35_cats = load_model_cats({
        "Opus": ROOT / "data/annotations/golden-v35/opus.jsonl",
        "GPT-5.4": ROOT / "data/annotations/bench-holdout-v35/gpt-5.4.jsonl",
        "Gemini": ROOT / "data/annotations/bench-holdout-v35/gemini-3.1-pro-preview.jsonl",
        "GLM-5": ROOT / "data/annotations/bench-holdout-v35/glm-5:exacto.jsonl",
        "Kimi": ROOT / "data/annotations/bench-holdout-v35/kimi-k2.5.jsonl",
        "MIMO": ROOT / "data/annotations/bench-holdout-v35/mimo-v2-pro:exacto.jsonl",
    })
    # Merge v3.0 + v3.5 (v3.5 for confusion PIDs)
    model_cats: dict[str, dict[str, str]] = {}
    model_specs: dict[str, dict[str, int]] = {}
    for m in TOP6:
        model_cats[m] = {}
        model_specs[m] = {}
        for pid in human_labels:
            if pid in confusion_pids and pid in v35_cats.get(m, {}):
                model_cats[m][pid] = v35_cats[m][pid]
            elif pid in v30_cats.get(m, {}):
                model_cats[m][pid] = v30_cats[m][pid]
            # Specificity (always v3.0 for full coverage)
            if pid in v30_cats.get(f"{m}_spec", {}):
                model_specs[m][pid] = v30_cats[f"{m}_spec"][pid]
    # ── Adjudicate ────────────────────────────────────────────────────
    result = ExperimentResult(config=config)
    tier_counts: Counter[str] = Counter()
    for pid in sorted(human_labels.keys()):
        h_cats = [l["cat"] for l in human_labels[pid]]
        h_specs = [l["spec"] for l in human_labels[pid]]
        h_cat_maj = majority_vote(h_cats)
        h_spec_maj = majority_vote(h_specs)
        h_spec_unanimous = len(set(h_specs)) == 1
        # Use full model panel for tier calculation (T1-T4 stability)
        active_models = list(TOP6)
        m_cats_list = [model_cats[m][pid] for m in active_models if pid in model_cats[m]]
        m_cat_maj = majority_vote(m_cats_list)
        m_cat_unanimous = len(set(m_cats_list)) == 1 and len(m_cats_list) == len(active_models)
        all_signals = h_cats + m_cats_list
        signal_counter = Counter(all_signals)
        total_signals = len(all_signals)
        top_signal, top_count = signal_counter.most_common(1)[0]
        short_pid = pid[:8]
        si_override = SI_NO_OVERRIDES.get(short_pid)
        gold_cat: str | None = None
        tier: str = ""
        reason: str = ""
        if si_override:
            gold_cat = si_override[0]
            tier = "T3-rule"
            reason = f"SI/NO override: {si_override[1]}"
        elif top_count >= 8 and total_signals >= 8:
            gold_cat = top_signal
            tier = "T1-super"
            reason = f"{top_count}/{total_signals} signals agree"
        elif h_cat_maj == m_cat_maj:
            gold_cat = h_cat_maj
            tier = "T2-cross"
            reason = "Human + model majority agree"
        elif m_cat_unanimous:
            gold_cat = m_cat_maj
            tier = "T4-model"
            h_count = Counter(h_cats).most_common(1)[0][1]
            reason = f"{len(m_cats_list)}/{len(m_cats_list)} models unanimous ({m_cat_maj}) vs human {h_count}/3 ({h_cat_maj})"
        else:
            # Check rule-based overrides
            t5_override = T5_CODEBOOK_OVERRIDES.get(short_pid)
            # Exp 2/3: Manual board-removal + committee-level test (with-board paragraphs)
            board_override = MANUAL_BOARD_OVERRIDES.get(short_pid) if (config.apply_board_removal or config.apply_committee_level) else None
            # Exp 4: Codebook tiebreaker overrides
            codebook_override = CODEBOOK_OVERRIDES.get(short_pid) if config.apply_idsi_tiebreaker else None
            if t5_override:
                gold_cat = t5_override[0]
                tier = "T3-rule"
                reason = f"T5 codebook override: {t5_override[1]}"
            elif board_override:
                gold_cat = board_override[0]
                tier = "T3-rule"
                reason = f"Board/committee test: {board_override[1]}"
            elif codebook_override:
                gold_cat = codebook_override[0]
                tier = "T3-rule"
                reason = f"Codebook tiebreaker: {codebook_override[1]}"
            else:
                t5_signals = list(all_signals)
                t5_total = total_signals
                suffix = ""
                # ── Exp 1: Gemini exclusion at T5 resolution only ─────
                if config.exclude_gemini_mr_rmp:
                    gemini_cat = model_cats.get("Gemini", {}).get(pid)
                    if gemini_cat == "Management Role":
                        other_m_cats = [model_cats[m][pid] for m in TOP6 if m != "Gemini" and pid in model_cats[m]]
                        other_m_maj = majority_vote(other_m_cats) if other_m_cats else None
                        if other_m_maj != "Management Role":
                            t5_signals = h_cats + other_m_cats
                            t5_total = len(t5_signals)
                            suffix += " [Gemini MR excluded]"
                # ── Exp 2b: No-board BG vote removal ─────────────────
                # If "board" (case-insensitive) doesn't appear in the paragraph
                # text, BG model votes are provably unsupported — the paragraph
                # can't be about board governance if it never mentions the board.
                # Remove those BG signals and recalculate plurality.
                if config.apply_no_board_bg_removal:
                    para_texts = load_paragraph_texts()
                    para_text = para_texts.get(pid, "")
                    if "board" not in para_text.lower():
                        bg_count = sum(1 for s in t5_signals if s == "Board Governance")
                        if bg_count > 0:
                            t5_signals = [s for s in t5_signals if s != "Board Governance"]
                            t5_total = len(t5_signals)
                            if t5_signals:
                                suffix += f" [BG removed: no 'board' in text, {bg_count} votes dropped]"
                if t5_signals:
                    t5_counter = Counter(t5_signals)
                    t5_top, t5_top_count = t5_counter.most_common(1)[0]
                else:
                    t5_top, t5_top_count = top_signal, top_count
                gold_cat = t5_top
                tier = "T5-plurality"
                reason = f"Mixed: human={h_cat_maj}, model={m_cat_maj}, plurality={t5_top} ({t5_top_count}/{t5_total}){suffix}"
        # ── Specificity ───────────────────────────────────────────────
        if config.apply_specificity_hybrid and not h_spec_unanimous:
            # Human split → use model majority
            m_specs = [model_specs[m][pid] for m in TOP6 if pid in model_specs[m]]
            if m_specs:
                gold_spec = majority_vote([str(s) for s in m_specs])
                gold_spec = int(gold_spec) if gold_spec else h_spec_maj
                if gold_spec != h_spec_maj:
                    result.spec_changes += 1
            else:
                gold_spec = h_spec_maj
        else:
            gold_spec = h_spec_maj
        tier_counts[tier] += 1
        row = {
            "paragraphId": pid,
            "gold_category": gold_cat,
            "gold_specificity": gold_spec,
            "tier": tier,
            "reason": reason,
            "human_majority": h_cat_maj,
            "model_majority": m_cat_maj,
            "human_votes": dict(Counter(h_cats)),
            "model_votes": dict(Counter(m_cats_list)),
        }
        result.results.append(row)
        if tier == "T5-plurality":
            axis = get_confusion_axis(dict(Counter(h_cats)), dict(Counter(m_cats_list)))
            result.t5_by_axis[axis] = result.t5_by_axis.get(axis, 0) + 1
            if top_count <= 5:
                result.t5_weak_plurality += 1
    result.total = len(result.results)
    result.tier_counts = dict(sorted(tier_counts.items()))
    result.flipped_from_human = sum(1 for r in result.results if r["gold_category"] != r["human_majority"])
    result.category_dist = dict(Counter(r["gold_category"] for r in result.results))
    result.human_maj_dist = dict(Counter(r["human_majority"] for r in result.results))
    # Source accuracy vs gold
    gold_by_pid = {r["paragraphId"]: r["gold_category"] for r in result.results}
    # Human annotators
    annotator_names = sorted(set(l["annotator"] for labels in human_labels.values() for l in labels))
    for ann in annotator_names:
        agree = total = 0
        for pid, labels in human_labels.items():
            for l in labels:
                if l["annotator"] == ann and pid in gold_by_pid:
                    total += 1
                    if l["cat"] == gold_by_pid[pid]:
                        agree += 1
        if total > 0:
            result.source_accuracy[f"H:{ann}"] = agree / total
    # Models (v3.0 on full 1200)
    for m in TOP6:
        agree = total = 0
        for pid in gold_by_pid:
            if pid in v30_cats.get(m, {}):
                total += 1
                if v30_cats[m][pid] == gold_by_pid[pid]:
                    agree += 1
        if total > 0:
            result.source_accuracy[f"M:{m}"] = agree / total
    return result

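# Illustrative programmatic use of run_experiment (the same call main() makes
# for each experiment):
#   baseline = run_experiment(EXPERIMENTS["baseline"])
#   print(baseline.tier_counts, baseline.flipped_from_human)
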
def print_result(r: ExperimentResult, baseline: ExperimentResult | None = None) -> None:
"""Print experiment results with optional delta from baseline."""
print(f"\n{'=' * 90}")
print(f"EXPERIMENT: {r.config.name}")
print(f" {r.config.description}")
print(f"{'=' * 90}")
print(f"\nTier distribution:")
for tier in ["T1-super", "T2-cross", "T3-rule", "T4-model", "T5-plurality"]:
count = r.tier_counts.get(tier, 0)
pct = count / r.total * 100
delta = ""
if baseline:
bc = baseline.tier_counts.get(tier, 0)
if count != bc:
delta = f"{count - bc:+d})"
print(f" {tier:<16} {count:>5} ({pct:.1f}%){delta}")
print(f"\nGold ≠ human majority: {r.flipped_from_human} ({r.flipped_from_human / r.total:.1%})")
if baseline and r.flipped_from_human != baseline.flipped_from_human:
print(f"{r.flipped_from_human - baseline.flipped_from_human:+d})")
    if r.t5_by_axis:
        t5_total = sum(r.t5_by_axis.values())
        print(f"\nT5 remaining ({t5_total} cases):")
        for axis, count in sorted(r.t5_by_axis.items(), key=lambda x: -x[1])[:10]:
            print(f" {axis:<60} {count:>3}")
        print(f" Weak plurality (4-5/9): {r.t5_weak_plurality}")
    print(f"\nCategory distribution (gold):")
    all_cats = sorted(set(list(r.category_dist.keys()) + list(r.human_maj_dist.keys())))
    print(f" {'Category':<25} {'Gold':>6} {'H-Maj':>6} {'Δ':>5}", end="")
    if baseline:
        print(f" {'Prev':>6} {'ΔExp':>5}", end="")
    print()
    for cat in all_cats:
        g = r.category_dist.get(cat, 0)
        h = r.human_maj_dist.get(cat, 0)
        line = f" {cat:<25} {g:>6} {h:>6} {g - h:>+5}"
        if baseline:
            bg = baseline.category_dist.get(cat, 0)
            line += f" {bg:>6} {g - bg:>+5}"
        print(line)
print(f"\nSource accuracy vs gold:")
# Sort by accuracy descending
for source, acc in sorted(r.source_accuracy.items(), key=lambda x: -x[1]):
delta = ""
if baseline and source in baseline.source_accuracy:
ba = baseline.source_accuracy[source]
diff = acc - ba
if abs(diff) >= 0.0005:
delta = f"{diff:+.1%})"
print(f" {source:<16} {acc:.1%}{delta}")
if r.config.apply_specificity_hybrid:
print(f"\nSpecificity: {r.spec_changes} labels changed from human majority to model majority")
def diff_results(a: ExperimentResult, b: ExperimentResult) -> list[dict]:
"""Find paragraphs where gold_category differs between two experiments."""
a_map = {r["paragraphId"]: r for r in a.results}
b_map = {r["paragraphId"]: r for r in b.results}
diffs = []
for pid in sorted(a_map.keys()):
if a_map[pid]["gold_category"] != b_map[pid]["gold_category"]:
diffs.append({
"paragraphId": pid,
"before": a_map[pid]["gold_category"],
"after": b_map[pid]["gold_category"],
"before_tier": a_map[pid]["tier"],
"after_tier": b_map[pid]["tier"],
"human_majority": a_map[pid]["human_majority"],
"reason_after": b_map[pid]["reason"],
})
return diffs
# ── PARAGRAPH TEXT LOADER (for text-based tests) ───────────────────────
_paragraph_texts: dict[str, str] | None = None
def load_paragraph_texts() -> dict[str, str]:
    global _paragraph_texts
    if _paragraph_texts is None:
        _paragraph_texts = {}
        for r in load_jsonl(ROOT / "data/gold/paragraphs-holdout.jsonl"):
            _paragraph_texts[r["id"]] = r["text"]
    return _paragraph_texts

EXPERIMENTS = {
"baseline": ExperimentConfig(
name="baseline",
description="Current production adjudication (no changes)",
),
"exp1_gemini": ExperimentConfig(
name="exp1_gemini",
description="Exclude Gemini from MR↔RMP axis when Gemini voted MR",
exclude_gemini_mr_rmp=True,
),
"exp2_board": ExperimentConfig(
name="exp2_board",
description="Board-removal test overrides for BG↔RMP T5 cases",
apply_board_removal=True,
),
"exp2b_noboard": ExperimentConfig(
name="exp2b_noboard",
description="Remove BG model votes when 'board' absent from paragraph text (automated, verifiable)",
apply_no_board_bg_removal=True,
),
"exp3_committee": ExperimentConfig(
name="exp3_committee",
description="Committee-level test overrides for BG↔MR T5 cases",
apply_committee_level=True,
),
"exp4_idsi": ExperimentConfig(
name="exp4_idsi",
description="ID↔SI volume-dominant tiebreaker",
apply_idsi_tiebreaker=True,
),
"exp5_spec": ExperimentConfig(
name="exp5_spec",
description="Specificity hybrid: human unanimous → human, split → model majority",
apply_specificity_hybrid=True,
),
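    # Note: apply_board_removal alone activates every entry in MANUAL_BOARD_OVERRIDES
    # (the lookup is shared with apply_committee_level), so the exp2 and exp3 overrides
    # are both in effect below; the exp1 Gemini exclusion is not enabled in this config.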
"combined": ExperimentConfig(
name="combined",
description="All validated interventions: no-board BG removal + manual board overrides + codebook tiebreakers + specificity hybrid",
apply_no_board_bg_removal=True,
apply_board_removal=True,
apply_idsi_tiebreaker=True,
apply_specificity_hybrid=True,
),
}
def main() -> None:
    experiments_to_run = sys.argv[1:] if len(sys.argv) > 1 else ["all"]
    if "all" in experiments_to_run:
        experiments_to_run = list(EXPERIMENTS.keys())
    # Always run baseline first
    if "baseline" not in experiments_to_run:
        experiments_to_run.insert(0, "baseline")
    results: dict[str, ExperimentResult] = {}
    baseline: ExperimentResult | None = None
    for exp_name in experiments_to_run:
        if exp_name not in EXPERIMENTS:
            print(f"Unknown experiment: {exp_name}")
            continue
        r = run_experiment(EXPERIMENTS[exp_name])
        results[exp_name] = r
        if exp_name == "baseline":
            baseline = r
            print_result(r)
        else:
            print_result(r, baseline)
            # Show specific label changes
            if baseline:
                diffs = diff_results(baseline, r)
                if diffs:
                    print(f"\n Label changes ({len(diffs)}):")
                    for d in diffs:
                        print(f" {d['paragraphId'][:8]}: {d['before']:<25} → {d['after']:<25} (H={d['human_majority']}) [{d['after_tier']}]")
    # ── Acceptance criteria check ─────────────────────────────────────
    if baseline and len(results) > 1:
        print(f"\n{'=' * 90}")
        print("ACCEPTANCE CRITERIA SUMMARY")
        print(f"{'=' * 90}")
        print(f"\nCriteria:")
        print(f" 1. T5 count decreases (fewer arbitrary resolutions)")
        print(f" 2. Source accuracy: no model/human drops >1% (intervention isn't distorting)")
        print(f" 3. Category distribution: no category shifts >±5% of its baseline count")
        print(f" 4. Changes are principled (each has documented codebook justification)")
        print()
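        # Criteria 1-3 are checked automatically below; criterion 4 is satisfied by the
        # documented justifications in the override tables at the top of this script.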
        for exp_name, r in results.items():
            if exp_name == "baseline":
                continue
            t5_base = baseline.tier_counts.get("T5-plurality", 0)
            t5_exp = r.tier_counts.get("T5-plurality", 0)
            t5_pass = t5_exp <= t5_base
            max_acc_drop = 0.0
            for source in baseline.source_accuracy:
                if source in r.source_accuracy:
                    drop = baseline.source_accuracy[source] - r.source_accuracy[source]
                    max_acc_drop = max(max_acc_drop, drop)
            acc_pass = max_acc_drop < 0.01
            max_cat_shift_pct = 0.0
            for cat in baseline.category_dist:
                base_n = baseline.category_dist.get(cat, 0)
                exp_n = r.category_dist.get(cat, 0)
                if base_n > 0:
                    shift = abs(exp_n - base_n) / base_n
                    max_cat_shift_pct = max(max_cat_shift_pct, shift)
            cat_pass = max_cat_shift_pct < 0.05
            status = "✓ PASS" if (t5_pass and acc_pass and cat_pass) else "✗ FAIL"
            print(f" {exp_name:<20} {status}")
            print(f" T5: {t5_base} → {t5_exp} ({t5_exp - t5_base:+d}) {'✓' if t5_pass else '✗'}")
            print(f" Max accuracy drop: {max_acc_drop:.2%} {'✓' if acc_pass else '✗'}")
            print(f" Max category shift: {max_cat_shift_pct:.1%} {'✓' if cat_pass else '✗'}")

if __name__ == "__main__":
    main()