""" Gold Set Adjudication — Experimental Harness ============================================= Runs the adjudication pipeline with toggleable interventions, one variable at a time, and produces comparable metrics for each configuration. Experiments: baseline — Current production adjudication (92 T5 cases) exp1_gemini — Exclude Gemini from MR↔RMP axis when Gemini voted MR exp2_board — Board-removal test overrides for BG↔RMP T5 cases exp3_committee — Committee-level test overrides for BG↔MR T5 cases exp4_idsi — ID↔SI volume-dominant tiebreaker exp5_spec — Specificity hybrid (human unanimous → human, split → model) combined — All validated interventions stacked Usage: uv run scripts/adjudicate-gold-experiment.py [experiment_name|all] """ import json import sys from collections import Counter, defaultdict from dataclasses import dataclass, field from pathlib import Path ROOT = Path(__file__).resolve().parent.parent # ── IMPORTS FROM PRODUCTION SCRIPT ────────────────────────────────────── # These are the existing overrides from adjudicate-gold.py, kept identical # so the baseline matches production exactly. SI_NO_OVERRIDES: dict[str, tuple[str, str]] = { "026c8eca": ("None/Other", "Speculation: 'could potentially result in' -- no materiality assessment"), "160fec46": ("None/Other", "Resource lament: 'do not have manpower' -- no materiality assessment"), "1f29ea8c": ("None/Other", "Speculation: 'could have material adverse effect' boilerplate"), "20c70335": ("None/Other", "Risk list: 'A breach could lead to...' -- enumeration, not assessment"), "303685cf": ("None/Other", "Speculation: 'could materially adversely affect'"), "7d021fcc": ("None/Other", "Speculation: 'could...have a material adverse effect'"), "7ef53cab": ("None/Other", "Risk enumeration: 'could lead to... could disrupt... could steal...'"), "a0d01951": ("None/Other", "Speculation: 'could adversely affect our business'"), "aaa8974b": ("None/Other", "Speculation: 'could potentially have a material impact' -- Case 9 fix"), "b058dca1": ("None/Other", "Speculation: 'could disrupt our operations'"), "b1b216b6": ("None/Other", "Speculation: 'could materially adversely affect'"), "dc8a2798": ("None/Other", "Speculation: 'If compromised, we could be subject to...'"), "e4bd0e2f": ("None/Other", "Speculation: 'could have material adverse impact'"), "f4656a7e": ("None/Other", "Threat enumeration under SI-sounding header -- no assessment"), "2e8cbdbf": ("None/Other", "Cross-ref: 'We describe whether and how... under the headings [risk factors]'"), "75de7441": ("None/Other", "Cross-ref: 'We describe whether and how... under the heading [risk factor]'"), "78cad2a1": ("None/Other", "Cross-ref: 'In our Risk Factors, we describe whether and how...'"), "3879887f": ("None/Other", "Brief incident mention + 'See Item 1A' cross-reference"), "f026f2be": ("None/Other", "Risk factor heading/cross-reference -- not an assessment"), "5df3a6c9": ("None/Other", "IT importance statement -- no assessment. H=1/3 SI"), "d5dc17c2": ("None/Other", "Risk enumeration -- no assessment. H=1/3 SI"), "c10f2a54": ("None/Other", "Early-stage/SPAC + weak negative assertion. SPAC rule dominates"), "45961c99": ("None/Other", "Past disruption but no materiality language. Primarily speculation"), "1673f332": ("None/Other", "SPAC with assessment at end -- SPAC rule dominates per Case 8"), "f75ac78a": ("Risk Management Process", "Resource expenditure on cybersecurity -- RMP per person-removal test"), "367108c2": ("Strategy Integration", "Negative assertion: 'not aware of having experienced any prior material data breaches'"), "837e31d5": ("Strategy Integration", "Negative assertion: 'did not experience any cybersecurity incident during 2024'"), } T5_CODEBOOK_OVERRIDES: dict[str, tuple[str, str]] = { "15e7cf99": ("Strategy Integration", "SI/ID tiebreaker: 'have not encountered any risks' -- materiality assessment, no specific incident described"), "6dc6bb4a": ("Incident Disclosure", "SI/ID tiebreaker: 'ransomware attack in October 2021' -- describes specific incident with date"), "c71739a9": ("Risk Management Process", "TP/RMP: Fund relies on CCO and adviser's risk management expertise -- third parties supporting internal process"), } # ── EXPERIMENT-SPECIFIC OVERRIDES ─────────────────────────────────────── # Exp 2/3: Board-removal + committee-level test overrides (with-board paragraphs) # These 5 paragraphs mention "board" so the automated no-board test can't catch them. # Each read manually; board-removal test applied to determine if board mention is # incidental or substantive. MANUAL_BOARD_OVERRIDES: dict[str, tuple[str, str]] = { # Board = 1/5 sentences + final notification clause. CISO/ISIRT/incident # response plan dominate the content. Board oversight is incidental attribution. "22da6695": ("Risk Management Process", "Board-removal: 'Board is also responsible for approval' (1 sentence) + " "'notifying the Board' (final clause). Remove → CISO + IS Program + incident " "response plan. Process dominates."), # Titled 'Management's Role.' Compliance Committee = management-level (CIO, # executives). Board mentioned 2x as information destination only. "a2ff7e1e": ("Management Role", "Committee-level: Compliance Committee is management-level (O'Reilly executives). " "Board is incidental destination (2 clauses). Titled 'Management's Role.'"), # Very brief (3 sentences). Management oversees + board notification + 'Public # Offering' (registration statement). Board is incident notification only. "cb518f47": ("Management Role", "Board-removal: remove notification sentence → 'management oversees cybersecurity.' " "Board is incident notification destination only. Brief paragraph."), } # Exp 4: Codebook tiebreaker overrides (beyond existing T5_CODEBOOK_OVERRIDES) # Each paragraph read in full and classified by codebook rules. CODEBOOK_OVERRIDES: dict[str, tuple[str, str]] = { # ── ID↔SI: negative assertion = materiality assessment → SI ────────── "0ceeb618": ("Strategy Integration", "ID/SI: Opens with negative assertion ('no material incidents'), Feb 2025 " "incident is brief context + 'has not had material impact' conclusion. " "Materiality assessment frame dominates → SI"), "cc82eb9f": ("Strategy Integration", "ID/SI: June 2018 incident is example within broader negative materiality " "assertion ('have not materially affected us'). Assessment frame dominates → SI"), # ── SPAC rule (Case 8): pre-revenue company → N/O ──────────────────── "203ccd43": ("None/Other", "SPAC: 'once the Company commences operations' — pre-revenue company. " "Case 8: SPAC → N/O regardless of management role language"), # ── ID→RMP: post-incident improvements, no incident described ──────── "f549fd64": ("Risk Management Process", "ID/RMP: 'Following this cybersecurity event' — refers to incident without " "describing it. 100% of content is hardening, training, MFA, EDR — pure RMP"), } @dataclass class ExperimentConfig: name: str description: str exclude_gemini_mr_rmp: bool = False apply_board_removal: bool = False apply_committee_level: bool = False apply_idsi_tiebreaker: bool = False apply_specificity_hybrid: bool = False # Text-based: remove BG model votes when "board" absent from paragraph text apply_no_board_bg_removal: bool = False @dataclass class ExperimentResult: config: ExperimentConfig total: int = 0 tier_counts: dict[str, int] = field(default_factory=dict) category_dist: dict[str, int] = field(default_factory=dict) human_maj_dist: dict[str, int] = field(default_factory=dict) flipped_from_human: int = 0 source_accuracy: dict[str, float] = field(default_factory=dict) t5_by_axis: dict[str, int] = field(default_factory=dict) t5_weak_plurality: int = 0 # 4-5/9 results: list[dict] = field(default_factory=list) spec_changes: int = 0 def load_jsonl(path: Path) -> list[dict]: with open(path) as f: return [json.loads(line) for line in f] def majority_vote(votes: list[str]) -> str | None: if not votes: return None return Counter(votes).most_common(1)[0][0] def get_confusion_axis(human_votes: dict, model_votes: dict) -> str: """Identify the confusion axis from vote distributions.""" all_cats = sorted(set(list(human_votes.keys()) + list(model_votes.keys()))) if len(all_cats) == 2: return f"{all_cats[0]}↔{all_cats[1]}" return "↔".join(all_cats) def run_experiment(config: ExperimentConfig) -> ExperimentResult: """Run adjudication with a specific experimental configuration.""" # ── Load data ───────────────────────────────────────────────────── human_labels: dict[str, list[dict]] = defaultdict(list) for r in load_jsonl(ROOT / "data/gold/human-labels-raw.jsonl"): human_labels[r["paragraphId"]].append({ "cat": r["contentCategory"], "spec": r["specificityLevel"], "annotator": r["annotatorName"], }) confusion_pids = {r["paragraphId"] for r in load_jsonl(ROOT / "data/gold/holdout-rerun-v35.jsonl")} TOP6 = ["Opus", "GPT-5.4", "Gemini", "GLM-5", "Kimi", "MIMO"] def load_model_cats(files: dict[str, Path]) -> dict[str, dict[str, str]]: result: dict[str, dict[str, str]] = {} for name, path in files.items(): result[name] = {} if path.exists(): for r in load_jsonl(path): cat = r.get("label", {}).get("content_category") or r.get("content_category") if cat: result[name][r["paragraphId"]] = cat # Also load specificity for exp5 result[f"{name}_spec"] = {} if path.exists(): for r in load_jsonl(path): spec = r.get("label", {}).get("specificity_level") or r.get("specificity_level") if spec is not None: result[f"{name}_spec"][r["paragraphId"]] = spec return result v30_cats = load_model_cats({ "Opus": ROOT / "data/annotations/golden/opus.jsonl", "GPT-5.4": ROOT / "data/annotations/bench-holdout/gpt-5.4.jsonl", "Gemini": ROOT / "data/annotations/bench-holdout/gemini-3.1-pro-preview.jsonl", "GLM-5": ROOT / "data/annotations/bench-holdout/glm-5:exacto.jsonl", "Kimi": ROOT / "data/annotations/bench-holdout/kimi-k2.5.jsonl", "MIMO": ROOT / "data/annotations/bench-holdout/mimo-v2-pro:exacto.jsonl", }) v35_cats = load_model_cats({ "Opus": ROOT / "data/annotations/golden-v35/opus.jsonl", "GPT-5.4": ROOT / "data/annotations/bench-holdout-v35/gpt-5.4.jsonl", "Gemini": ROOT / "data/annotations/bench-holdout-v35/gemini-3.1-pro-preview.jsonl", "GLM-5": ROOT / "data/annotations/bench-holdout-v35/glm-5:exacto.jsonl", "Kimi": ROOT / "data/annotations/bench-holdout-v35/kimi-k2.5.jsonl", "MIMO": ROOT / "data/annotations/bench-holdout-v35/mimo-v2-pro:exacto.jsonl", }) # Merge v3.0 + v3.5 (v3.5 for confusion PIDs) model_cats: dict[str, dict[str, str]] = {} model_specs: dict[str, dict[str, int]] = {} for m in TOP6: model_cats[m] = {} model_specs[m] = {} for pid in human_labels: if pid in confusion_pids and pid in v35_cats.get(m, {}): model_cats[m][pid] = v35_cats[m][pid] elif pid in v30_cats.get(m, {}): model_cats[m][pid] = v30_cats[m][pid] # Specificity (always v3.0 for full coverage) if pid in v30_cats.get(f"{m}_spec", {}): model_specs[m][pid] = v30_cats[f"{m}_spec"][pid] # ── Adjudicate ──────────────────────────────────────────────────── result = ExperimentResult(config=config) tier_counts: Counter[str] = Counter() for pid in sorted(human_labels.keys()): h_cats = [l["cat"] for l in human_labels[pid]] h_specs = [l["spec"] for l in human_labels[pid]] h_cat_maj = majority_vote(h_cats) h_spec_maj = majority_vote(h_specs) h_spec_unanimous = len(set(h_specs)) == 1 # Use full model panel for tier calculation (T1-T4 stability) active_models = list(TOP6) m_cats_list = [model_cats[m][pid] for m in active_models if pid in model_cats[m]] m_cat_maj = majority_vote(m_cats_list) m_cat_unanimous = len(set(m_cats_list)) == 1 and len(m_cats_list) == len(active_models) all_signals = h_cats + m_cats_list signal_counter = Counter(all_signals) total_signals = len(all_signals) top_signal, top_count = signal_counter.most_common(1)[0] short_pid = pid[:8] si_override = SI_NO_OVERRIDES.get(short_pid) gold_cat: str | None = None tier: str = "" reason: str = "" if si_override: gold_cat = si_override[0] tier = "T3-rule" reason = f"SI/NO override: {si_override[1]}" elif top_count >= 8 and total_signals >= 8: gold_cat = top_signal tier = "T1-super" reason = f"{top_count}/{total_signals} signals agree" elif h_cat_maj == m_cat_maj: gold_cat = h_cat_maj tier = "T2-cross" reason = "Human + model majority agree" elif m_cat_unanimous: gold_cat = m_cat_maj tier = "T4-model" h_count = Counter(h_cats).most_common(1)[0][1] reason = f"{len(m_cats_list)}/{len(m_cats_list)} models unanimous ({m_cat_maj}) vs human {h_count}/3 ({h_cat_maj})" else: # Check rule-based overrides t5_override = T5_CODEBOOK_OVERRIDES.get(short_pid) # Exp 2/3: Manual board-removal + committee-level test (with-board paragraphs) board_override = MANUAL_BOARD_OVERRIDES.get(short_pid) if (config.apply_board_removal or config.apply_committee_level) else None # Exp 4: Codebook tiebreaker overrides codebook_override = CODEBOOK_OVERRIDES.get(short_pid) if config.apply_idsi_tiebreaker else None if t5_override: gold_cat = t5_override[0] tier = "T3-rule" reason = f"T5 codebook override: {t5_override[1]}" elif board_override: gold_cat = board_override[0] tier = "T3-rule" reason = f"Board/committee test: {board_override[1]}" elif codebook_override: gold_cat = codebook_override[0] tier = "T3-rule" reason = f"Codebook tiebreaker: {codebook_override[1]}" else: t5_signals = list(all_signals) t5_total = total_signals suffix = "" # ── Exp 1: Gemini exclusion at T5 resolution only ───── if config.exclude_gemini_mr_rmp: gemini_cat = model_cats.get("Gemini", {}).get(pid) if gemini_cat == "Management Role": other_m_cats = [model_cats[m][pid] for m in TOP6 if m != "Gemini" and pid in model_cats[m]] other_m_maj = majority_vote(other_m_cats) if other_m_cats else None if other_m_maj != "Management Role": t5_signals = h_cats + other_m_cats t5_total = len(t5_signals) suffix += " [Gemini MR excluded]" # ── Exp 2b: No-board BG vote removal ───────────────── # If "board" (case-insensitive) doesn't appear in the paragraph # text, BG model votes are provably unsupported — the paragraph # can't be about board governance if it never mentions the board. # Remove those BG signals and recalculate plurality. if config.apply_no_board_bg_removal: para_texts = load_paragraph_texts() para_text = para_texts.get(pid, "") if "board" not in para_text.lower(): bg_count = sum(1 for s in t5_signals if s == "Board Governance") if bg_count > 0: t5_signals = [s for s in t5_signals if s != "Board Governance"] t5_total = len(t5_signals) if t5_signals: suffix += f" [BG removed: no 'board' in text, {bg_count} votes dropped]" if t5_signals: t5_counter = Counter(t5_signals) t5_top, t5_top_count = t5_counter.most_common(1)[0] else: t5_top, t5_top_count = top_signal, top_count gold_cat = t5_top tier = "T5-plurality" reason = f"Mixed: human={h_cat_maj}, model={m_cat_maj}, plurality={t5_top} ({t5_top_count}/{t5_total}){suffix}" # ── Specificity ─────────────────────────────────────────────── if config.apply_specificity_hybrid and not h_spec_unanimous: # Human split → use model majority m_specs = [model_specs[m][pid] for m in TOP6 if pid in model_specs[m]] if m_specs: gold_spec = majority_vote([str(s) for s in m_specs]) gold_spec = int(gold_spec) if gold_spec else h_spec_maj if gold_spec != h_spec_maj: result.spec_changes += 1 else: gold_spec = h_spec_maj else: gold_spec = h_spec_maj tier_counts[tier] += 1 row = { "paragraphId": pid, "gold_category": gold_cat, "gold_specificity": gold_spec, "tier": tier, "reason": reason, "human_majority": h_cat_maj, "model_majority": m_cat_maj, "human_votes": dict(Counter(h_cats)), "model_votes": dict(Counter(m_cats_list)), } result.results.append(row) if tier == "T5-plurality": axis = get_confusion_axis(dict(Counter(h_cats)), dict(Counter(m_cats_list))) result.t5_by_axis[axis] = result.t5_by_axis.get(axis, 0) + 1 if top_count <= 5: result.t5_weak_plurality += 1 result.total = len(result.results) result.tier_counts = dict(sorted(tier_counts.items())) result.flipped_from_human = sum(1 for r in result.results if r["gold_category"] != r["human_majority"]) result.category_dist = dict(Counter(r["gold_category"] for r in result.results)) result.human_maj_dist = dict(Counter(r["human_majority"] for r in result.results)) # Source accuracy vs gold gold_by_pid = {r["paragraphId"]: r["gold_category"] for r in result.results} # Human annotators annotator_names = sorted(set(l["annotator"] for labels in human_labels.values() for l in labels)) for ann in annotator_names: agree = total = 0 for pid, labels in human_labels.items(): for l in labels: if l["annotator"] == ann and pid in gold_by_pid: total += 1 if l["cat"] == gold_by_pid[pid]: agree += 1 if total > 0: result.source_accuracy[f"H:{ann}"] = agree / total # Models (v3.0 on full 1200) for m in TOP6: agree = total = 0 for pid in gold_by_pid: if pid in v30_cats.get(m, {}): total += 1 if v30_cats[m][pid] == gold_by_pid[pid]: agree += 1 if total > 0: result.source_accuracy[f"M:{m}"] = agree / total return result def print_result(r: ExperimentResult, baseline: ExperimentResult | None = None) -> None: """Print experiment results with optional delta from baseline.""" print(f"\n{'=' * 90}") print(f"EXPERIMENT: {r.config.name}") print(f" {r.config.description}") print(f"{'=' * 90}") print(f"\nTier distribution:") for tier in ["T1-super", "T2-cross", "T3-rule", "T4-model", "T5-plurality"]: count = r.tier_counts.get(tier, 0) pct = count / r.total * 100 delta = "" if baseline: bc = baseline.tier_counts.get(tier, 0) if count != bc: delta = f" (Δ {count - bc:+d})" print(f" {tier:<16} {count:>5} ({pct:.1f}%){delta}") print(f"\nGold ≠ human majority: {r.flipped_from_human} ({r.flipped_from_human / r.total:.1%})") if baseline and r.flipped_from_human != baseline.flipped_from_human: print(f" (Δ {r.flipped_from_human - baseline.flipped_from_human:+d})") if r.t5_by_axis: t5_total = sum(r.t5_by_axis.values()) print(f"\nT5 remaining ({t5_total} cases):") for axis, count in sorted(r.t5_by_axis.items(), key=lambda x: -x[1])[:10]: print(f" {axis:<60} {count:>3}") print(f" Weak plurality (4-5/9): {r.t5_weak_plurality}") print(f"\nCategory distribution (gold):") all_cats = sorted(set(list(r.category_dist.keys()) + list(r.human_maj_dist.keys()))) print(f" {'Category':<25} {'Gold':>6} {'H-Maj':>6} {'Δ':>5}", end="") if baseline: print(f" {'Prev':>6} {'ΔExp':>5}", end="") print() for cat in all_cats: g = r.category_dist.get(cat, 0) h = r.human_maj_dist.get(cat, 0) line = f" {cat:<25} {g:>6} {h:>6} {g - h:>+5}" if baseline: bg = baseline.category_dist.get(cat, 0) line += f" {bg:>6} {g - bg:>+5}" print(line) print(f"\nSource accuracy vs gold:") # Sort by accuracy descending for source, acc in sorted(r.source_accuracy.items(), key=lambda x: -x[1]): delta = "" if baseline and source in baseline.source_accuracy: ba = baseline.source_accuracy[source] diff = acc - ba if abs(diff) >= 0.0005: delta = f" (Δ {diff:+.1%})" print(f" {source:<16} {acc:.1%}{delta}") if r.config.apply_specificity_hybrid: print(f"\nSpecificity: {r.spec_changes} labels changed from human majority to model majority") def diff_results(a: ExperimentResult, b: ExperimentResult) -> list[dict]: """Find paragraphs where gold_category differs between two experiments.""" a_map = {r["paragraphId"]: r for r in a.results} b_map = {r["paragraphId"]: r for r in b.results} diffs = [] for pid in sorted(a_map.keys()): if a_map[pid]["gold_category"] != b_map[pid]["gold_category"]: diffs.append({ "paragraphId": pid, "before": a_map[pid]["gold_category"], "after": b_map[pid]["gold_category"], "before_tier": a_map[pid]["tier"], "after_tier": b_map[pid]["tier"], "human_majority": a_map[pid]["human_majority"], "reason_after": b_map[pid]["reason"], }) return diffs # ── PARAGRAPH TEXT LOADER (for text-based tests) ─────────────────────── _paragraph_texts: dict[str, str] | None = None def load_paragraph_texts() -> dict[str, str]: global _paragraph_texts if _paragraph_texts is None: _paragraph_texts = {} for r in load_jsonl(ROOT / "data/gold/paragraphs-holdout.jsonl"): _paragraph_texts[r["id"]] = r["text"] return _paragraph_texts EXPERIMENTS = { "baseline": ExperimentConfig( name="baseline", description="Current production adjudication (no changes)", ), "exp1_gemini": ExperimentConfig( name="exp1_gemini", description="Exclude Gemini from MR↔RMP axis when Gemini voted MR", exclude_gemini_mr_rmp=True, ), "exp2_board": ExperimentConfig( name="exp2_board", description="Board-removal test overrides for BG↔RMP T5 cases", apply_board_removal=True, ), "exp2b_noboard": ExperimentConfig( name="exp2b_noboard", description="Remove BG model votes when 'board' absent from paragraph text (automated, verifiable)", apply_no_board_bg_removal=True, ), "exp3_committee": ExperimentConfig( name="exp3_committee", description="Committee-level test overrides for BG↔MR T5 cases", apply_committee_level=True, ), "exp4_idsi": ExperimentConfig( name="exp4_idsi", description="ID↔SI volume-dominant tiebreaker", apply_idsi_tiebreaker=True, ), "exp5_spec": ExperimentConfig( name="exp5_spec", description="Specificity hybrid: human unanimous → human, split → model majority", apply_specificity_hybrid=True, ), "combined": ExperimentConfig( name="combined", description="All validated interventions: no-board BG removal + manual board overrides + codebook tiebreakers + specificity hybrid", apply_no_board_bg_removal=True, apply_board_removal=True, apply_idsi_tiebreaker=True, apply_specificity_hybrid=True, ), } def main() -> None: experiments_to_run = sys.argv[1:] if len(sys.argv) > 1 else ["all"] if "all" in experiments_to_run: experiments_to_run = list(EXPERIMENTS.keys()) # Always run baseline first if "baseline" not in experiments_to_run: experiments_to_run.insert(0, "baseline") results: dict[str, ExperimentResult] = {} baseline: ExperimentResult | None = None for exp_name in experiments_to_run: if exp_name not in EXPERIMENTS: print(f"Unknown experiment: {exp_name}") continue r = run_experiment(EXPERIMENTS[exp_name]) results[exp_name] = r if exp_name == "baseline": baseline = r print_result(r) else: print_result(r, baseline) # Show specific label changes if baseline: diffs = diff_results(baseline, r) if diffs: print(f"\n Label changes ({len(diffs)}):") for d in diffs: print(f" {d['paragraphId'][:8]}: {d['before']:<25} → {d['after']:<25} (H={d['human_majority']}) [{d['after_tier']}]") # ── Acceptance criteria check ───────────────────────────────────── if baseline and len(results) > 1: print(f"\n{'=' * 90}") print("ACCEPTANCE CRITERIA SUMMARY") print(f"{'=' * 90}") print(f"\nCriteria:") print(f" 1. T5 count decreases (fewer arbitrary resolutions)") print(f" 2. Source accuracy: no model/human drops >1% (intervention isn't distorting)") print(f" 3. Category distribution: no category shifts >±5% of its baseline count") print(f" 4. Changes are principled (each has documented codebook justification)") print() for exp_name, r in results.items(): if exp_name == "baseline": continue t5_base = baseline.tier_counts.get("T5-plurality", 0) t5_exp = r.tier_counts.get("T5-plurality", 0) t5_pass = t5_exp <= t5_base max_acc_drop = 0.0 for source in baseline.source_accuracy: if source in r.source_accuracy: drop = baseline.source_accuracy[source] - r.source_accuracy[source] max_acc_drop = max(max_acc_drop, drop) acc_pass = max_acc_drop < 0.01 max_cat_shift_pct = 0.0 for cat in baseline.category_dist: base_n = baseline.category_dist.get(cat, 0) exp_n = r.category_dist.get(cat, 0) if base_n > 0: shift = abs(exp_n - base_n) / base_n max_cat_shift_pct = max(max_cat_shift_pct, shift) cat_pass = max_cat_shift_pct < 0.05 status = "✓ PASS" if (t5_pass and acc_pass and cat_pass) else "✗ FAIL" print(f" {exp_name:<20} {status}") print(f" T5: {t5_base} → {t5_exp} (Δ {t5_exp - t5_base:+d}) {'✓' if t5_pass else '✗'}") print(f" Max accuracy drop: {max_acc_drop:.2%} {'✓' if acc_pass else '✗'}") print(f" Max category shift: {max_cat_shift_pct:.1%} {'✓' if cat_pass else '✗'}") if __name__ == "__main__": main()