"""
Show carefully selected hard-case paragraphs from the holdout set for each confusion axis.

Displays full paragraph text + compact 13-signal label table + vote tally.

Run: uv run --with numpy scripts/show-hard-examples.py
"""
|
|
|
|
import json
|
|
import os
|
|
from collections import Counter, defaultdict
|
|
from pathlib import Path
|
|
from textwrap import fill
|
|
|
|
import numpy as np
|
|
|
|
ROOT = Path(__file__).resolve().parent.parent
|
|
|
|
# ── Category abbreviation map ──────────────────────────────────────────────
# Full content-category names (as they appear in the annotation files) →
# the short codes used in the compact signal tables below.
FULL_TO_ABBR = {
    "Board Governance": "BG",
    "Incident Disclosure": "ID",
    "Management Role": "MR",
    "None/Other": "N/O",
    "Risk Management Process": "RMP",
    "Strategy Integration": "SI",
    "Third-Party Risk": "TPR",
}

# ── Short source-name helpers ──────────────────────────────────────────────
# Stage-1 annotation model ids → short display names (shown with an "S1:"
# prefix in the signal tables).
S1_MODEL_SHORT = {
    "google/gemini-3.1-flash-lite-preview": "gemini-lite",
    "x-ai/grok-4.1-fast": "grok-fast",
    "xiaomi/mimo-v2-flash": "mimo-flash",
}

# Benchmark file stems → short display names used as signal-table columns.
BENCH_FILE_SHORT = {
    "gpt-5.4": "gpt-5.4",
    "gemini-3.1-pro-preview": "gemini-pro",
    "glm-5:exacto": "glm-5",
    "kimi-k2.5": "kimi",
    "mimo-v2-pro:exacto": "mimo-pro",
    "minimax-m2.7:exacto": "minimax",
}

# Benchmark annotation file stems under data/annotations/bench-holdout/.
# The ordering here fixes the benchmark-column order of the signal table.
BENCH_FILES = [
    "gpt-5.4",
    "gemini-3.1-pro-preview",
    "glm-5:exacto",
    "kimi-k2.5",
    "mimo-v2-pro:exacto",
    "minimax-m2.7:exacto",
]
|
|
|
|
|
|
def load_jsonl(path: str | Path) -> list[dict]:
|
|
rows = []
|
|
with open(path) as f:
|
|
for line in f:
|
|
line = line.strip()
|
|
if line:
|
|
rows.append(json.loads(line))
|
|
return rows
|
|
|
|
|
|
# ── Load data ──────────────────────────────────────────────────────────────
print("Loading data...")
# Holdout gold paragraphs: build id → full paragraph record (text + metadata),
# and keep the id set for filtering the other label sources.
paragraphs_raw = load_jsonl(ROOT / "data/gold/paragraphs-holdout.jsonl")
para_map: dict[str, dict] = {p["id"]: p for p in paragraphs_raw}
holdout_pids = set(para_map.keys())

# Label sources: raw human annotations, Opus golden labels, and the patched
# stage-1 model annotations (stage-1 covers more than the holdout set and is
# filtered to holdout ids below).
human_raw = load_jsonl(ROOT / "data/gold/human-labels-raw.jsonl")
opus_raw = load_jsonl(ROOT / "data/annotations/golden/opus.jsonl")
stage1_raw = load_jsonl(ROOT / "data/annotations/stage1.patched.jsonl")
|
|
|
|
# ── Build signal matrix: pid → {source_label: category_abbr} ─────────────
# One entry per (paragraph, label source). Four source families are merged:
# human annotators, Opus, stage-1 models, and benchmark models.
signals: dict[str, dict[str, str]] = defaultdict(dict)

# 1) Human annotators — keyed "H:<annotator name>".
for row in human_raw:
    pid = row["paragraphId"]
    name = row["annotatorName"]
    # .get fallback keeps an unknown category name verbatim instead of dropping it.
    cat = FULL_TO_ABBR.get(row["contentCategory"], row["contentCategory"])
    signals[pid][f"H:{name}"] = cat

# 2) Opus — single golden-label source, keyed "Opus".
for row in opus_raw:
    pid = row["paragraphId"]
    cat = FULL_TO_ABBR.get(row["label"]["content_category"], row["label"]["content_category"])
    signals[pid]["Opus"] = cat

# 3) Stage 1 (filter to holdout PIDs) — keyed "S1:<short model name>".
for row in stage1_raw:
    pid = row["paragraphId"]
    if pid not in holdout_pids:
        continue
    model_id = row["provenance"]["modelId"]
    short = S1_MODEL_SHORT.get(model_id, model_id)
    source = f"S1:{short}"
    cat = FULL_TO_ABBR.get(row["label"]["content_category"], row["label"]["content_category"])
    signals[pid][source] = cat

# 4) Benchmark models — one annotation file per model, keyed by short name.
for bench_name in BENCH_FILES:
    path = ROOT / f"data/annotations/bench-holdout/{bench_name}.jsonl"
    short = BENCH_FILE_SHORT[bench_name]
    rows = load_jsonl(path)
    for row in rows:
        pid = row["paragraphId"]
        cat = FULL_TO_ABBR.get(row["label"]["content_category"], row["label"]["content_category"])
        signals[pid][short] = cat
|
|
|
|
# ── Ordered source list (for display) ─────────────────────────────────────
# Fixed column order for the per-paragraph signal tables: human annotators
# (alphabetical), then Opus, then stage-1 models, then benchmark models.
HUMAN_NAMES = sorted({r["annotatorName"] for r in human_raw})
ORDERED_SOURCES = (
    [f"H:{n}" for n in HUMAN_NAMES]
    + ["Opus"]
    + [f"S1:{S1_MODEL_SHORT[m]}" for m in sorted(S1_MODEL_SHORT)]
    + [BENCH_FILE_SHORT[b] for b in BENCH_FILES]
)
|
|
|
|
# ── Utility: compute axis stats ───────────────────────────────────────────


def axis_candidates(cat_a: str, cat_b: str, extra_cat: str | None = None) -> list[tuple[str, dict, Counter]]:
    """Find PIDs where both cat_a and cat_b appear among the 13 signals.

    Returns list of (pid, signals_dict, vote_counter) sorted by closeness of split."""
    scored = []
    for pid, sigs in signals.items():
        if pid not in holdout_pids:
            continue
        tally = Counter(sigs.values())
        if cat_a not in tally or cat_b not in tally:
            continue
        # Optional third category must also be present when requested.
        if extra_cat is not None and extra_cat not in tally:
            continue
        # Closeness = min(count_a, count_b) / total — higher means a more
        # even (i.e. harder) split between the two categories.
        n_votes = sum(tally.values())
        closeness = min(tally[cat_a], tally[cat_b]) / n_votes
        scored.append((closeness, n_votes, pid, sigs, tally))
    # Closest splits first; more total signals breaks ties.
    scored.sort(key=lambda entry: (-entry[0], -entry[1]))
    return [(pid, sigs, tally) for _, _, pid, sigs, tally in scored]
|
|
|
|
|
|
def print_example(pid: str, sigs: dict, counts: Counter, sub_pattern: str, note: str = ""):
    """Print one example paragraph with signals.

    Renders a boxed report: header (company/filing/sub-pattern), the full
    wrapped paragraph text, the per-source signal line, the vote tally, and
    an optional reviewer note.

    Args:
        pid: Paragraph id, looked up in module-level ``para_map``.
        sigs: source label → category abbreviation for this paragraph.
        counts: Counter of category abbreviations across all sources.
        sub_pattern: Sub-pattern name shown in the box header.
        note: Optional note rendered at the bottom (may be multi-line).
    """
    para = para_map.get(pid)
    if not para:
        print(f"  [paragraph {pid} not found]")
        return

    print(f"  ┌─ Paragraph {pid}")
    print(f"  │ Company: {para.get('companyName', '?')} | Filing: {para.get('filingType', '?')} {para.get('filingDate', '?')}")
    print(f"  │ Sub-pattern: {sub_pattern}")
    print(f"  │")

    # Full text — wrap at 100 chars, indent; wrap each original line
    # separately so intentional line breaks survive.
    text = para["text"]
    for line in text.split("\n"):
        wrapped = fill(line, width=100, initial_indent="  │ ", subsequent_indent="  │ ")
        print(wrapped)
    print(f"  │")

    # Signal table — compact single line, in fixed ORDERED_SOURCES order,
    # skipping sources that did not label this paragraph.
    parts = []
    for src in ORDERED_SOURCES:
        if src in sigs:
            parts.append(f"{src}={sigs[src]}")
    print(f"  │ Signals: {', '.join(parts)}")

    # Vote tally, most common category first.
    tally_parts = [f"{cat}: {n}" for cat, n in counts.most_common()]
    print(f"  │ Tally: {', '.join(tally_parts)} (out of {sum(counts.values())})")

    if note:
        print(f"  │")
        for line in note.split("\n"):
            wrapped = fill(line, width=100, initial_indent="  │ ▸ ", subsequent_indent="  │ ")
            print(wrapped)

    print(f"  └{'─' * 78}")
    print()
|
|
|
|
|
|
def pick_diverse(candidates: list[tuple[str, dict, Counter]], n: int, min_signals: int = 10) -> list[tuple[str, dict, Counter]]:
    """Pick n diverse examples from candidates (different companies, prefer many signals)."""
    if len(candidates) <= n:
        return candidates

    # Prefer candidates with at least `min_signals` votes so the signal
    # table is meaningful; fall back to the full pool when too few qualify.
    rich = [item for item in candidates if sum(item[2].values()) >= min_signals]
    if len(rich) < n:
        rich = candidates

    chosen: list[tuple[str, dict, Counter]] = []
    companies_used: set[str] = set()
    for item in rich:
        company = para_map.get(item[0], {}).get("companyName", "")
        # Skip repeat companies only when the pool is comfortably large.
        if company in companies_used and len(rich) > n * 2:
            continue
        chosen.append(item)
        companies_used.add(company)
        if len(chosen) >= n * 3:
            break
    return chosen[:n]
|
|
|
|
|
|
# ══════════════════════════════════════════════════════════════════════════
# AXIS 1: MR ↔ RMP
# ══════════════════════════════════════════════════════════════════════════
print("\n" + "=" * 80)
print(" AXIS 1: MR ↔ RMP — Management Role vs. Risk Management Process")
print("=" * 80)

# All holdout paragraphs where the 13 signals split between MR and RMP.
mr_rmp = axis_candidates("MR", "RMP")
print(f"\n Total paragraphs with both MR and RMP in signals: {len(mr_rmp)}\n")
|
|
|
|
|
|
def classify_mr_rmp_subpattern(text: str) -> str:
    """Heuristic to guess sub-pattern for MR↔RMP confusion.

    Splits *text* into rough sentences on '.' and counts how many sentences
    open (first 80 lowercase chars — a crude proxy for the grammatical
    subject) with a person-role keyword vs. a process keyword.

    Returns one of "person-subject", "process-subject", "mixed", "other".
    """
    # Normalize newlines to spaces, then split on periods; drop empties.
    sentences = [s.strip() for s in text.replace("\n", " ").split(".") if s.strip()]

    person_keywords = [
        "ciso", "chief information security", "chief information officer",
        "cio", "vp ", "vice president", "director", "officer", "head of",
        "manager", "leader", "executive", "cto", "chief technology",
    ]
    process_keywords = [
        "program", "framework", "process", "policy", "policies",
        "procedures", "controls", "assessment", "monitoring",
        "risk management", "incident response", "vulnerability",
    ]

    person_subject_sentences = 0
    process_subject_sentences = 0

    for sent in sentences:
        # Sentences are already stripped; only inspect the opening 80 chars.
        head = sent.lower()[:80]
        if any(kw in head for kw in person_keywords):
            person_subject_sentences += 1
        if any(kw in head for kw in process_keywords):
            process_subject_sentences += 1

    if person_subject_sentences > 0 and process_subject_sentences == 0:
        return "person-subject"
    elif process_subject_sentences > 0 and person_subject_sentences == 0:
        return "process-subject"
    elif person_subject_sentences > 0 and process_subject_sentences > 0:
        return "mixed"
    else:
        return "other"
|
|
|
|
|
|
# Bucket candidates by sub-pattern (keys mirror classify_mr_rmp_subpattern's
# possible return values).
buckets: dict[str, list] = {"person-subject": [], "process-subject": [], "mixed": [], "other": []}
for pid, sigs, counts in mr_rmp:
    text = para_map.get(pid, {}).get("text", "")
    sp = classify_mr_rmp_subpattern(text)
    buckets[sp].append((pid, sigs, counts))

print(f" Sub-pattern distribution: person-subject={len(buckets['person-subject'])}, "
      f"process-subject={len(buckets['process-subject'])}, mixed={len(buckets['mixed'])}, "
      f"other={len(buckets['other'])}")
print()
|
|
|
# (a) Person is grammatical subject
print(" ── (a) Person is the grammatical subject, doing process-like things ──\n")
for pid, sigs, counts in pick_diverse(buckets["person-subject"], 2):
    # Subject test note
    note = "SUBJECT TEST → MR (person is the main subject)"
    print_example(pid, sigs, counts, "Person as subject doing process-like things", note)

# (b) Process/framework is subject
print(" ── (b) Process/framework is the subject, person mentioned as responsible ──\n")
for pid, sigs, counts in pick_diverse(buckets["process-subject"], 2):
    note = "SUBJECT TEST → RMP (process/framework is the main subject)"
    print_example(pid, sigs, counts, "Process as subject, person mentioned", note)

# (c) Mixed
print(" ── (c) Mixed — both person and process are subjects ──\n")
for pid, sigs, counts in pick_diverse(buckets["mixed"], 2):
    note = "SUBJECT TEST → AMBIGUOUS (both person and process serve as subjects)"
    print_example(pid, sigs, counts, "Mixed subjects", note)

# (d) Edge cases — closest splits from "other" or overall closest
print(" ── (d) Edge cases — genuinely hard to call ──\n")
# Take from overall closest that aren't already shown.  Note: shown_pids
# approximates what was displayed above — pick_diverse may have chosen
# other entries, so this is a heuristic de-duplication.
shown_pids = set()
for bucket in buckets.values():
    for pid, _, _ in bucket[:2]:
        shown_pids.add(pid)
edge_cases = [(p, s, c) for p, s, c in mr_rmp if p not in shown_pids][:20]
for pid, sigs, counts in pick_diverse(edge_cases, 2):
    mr_count = counts.get("MR", 0)
    rmp_count = counts.get("RMP", 0)
    note = f"SUBJECT TEST → unclear; split is {mr_count}-{rmp_count} MR-RMP"
    print_example(pid, sigs, counts, "Edge case", note)
|
|
|
|
|
|
# ══════════════════════════════════════════════════════════════════════════
# AXIS 2: BG ↔ MR
# ══════════════════════════════════════════════════════════════════════════
print("\n" + "=" * 80)
print(" AXIS 2: BG ↔ MR — Board Governance vs. Management Role")
print("=" * 80)

# All holdout paragraphs where the signals split between BG and MR.
bg_mr = axis_candidates("BG", "MR")
print(f"\n Total paragraphs with both BG and MR in signals: {len(bg_mr)}\n")
|
|
|
|
|
|
def classify_bg_mr_subpattern(text: str) -> str:
    """Keyword heuristic for the BG↔MR axis.

    Returns "board-actor" (board language only), "mgmt-reporting-to-board"
    (both board and management language), "mgmt-only" (management language
    only), or "mixed-governance" (neither keyword family matched).
    """
    lowered = text.lower()
    board_words = ["board", "committee", "audit committee", "directors"]
    mgmt_words = ["ciso", "chief information", "officer", "vp", "vice president",
                  "director of", "head of", "reports to", "briefing", "briefs",
                  "presents to", "reporting"]

    mentions_board = any(w in lowered for w in board_words)
    mentions_mgmt = any(w in lowered for w in mgmt_words)

    if mentions_board and mentions_mgmt:
        return "mgmt-reporting-to-board"
    if mentions_board:
        return "board-actor"
    if mentions_mgmt:
        return "mgmt-only"
    return "mixed-governance"
|
|
|
|
|
|
# Bucket BG↔MR candidates by sub-pattern (keys come from
# classify_bg_mr_subpattern's return values).
buckets_bg: dict[str, list] = defaultdict(list)
for pid, sigs, counts in bg_mr:
    sp = classify_bg_mr_subpattern(para_map.get(pid, {}).get("text", ""))
    buckets_bg[sp].append((pid, sigs, counts))

print(f" Sub-pattern distribution: {dict((k, len(v)) for k, v in buckets_bg.items())}")
print()

# (a) Board/committee is clearly the actor
print(" ── (a) Board/committee is clearly the actor ──\n")
# Fall back to "mixed-governance" if no pure board-actor cases exist.
pool = buckets_bg.get("board-actor", []) or buckets_bg.get("mixed-governance", [])
for pid, sigs, counts in pick_diverse(pool, 2):
    print_example(pid, sigs, counts, "Board as actor")

# (b) Management officer reporting TO the board
print(" ── (b) Management officer reporting TO/briefing the board ──\n")
pool = buckets_bg.get("mgmt-reporting-to-board", [])
for pid, sigs, counts in pick_diverse(pool, 2):
    note = "KEY QUESTION: Is this BG (board receiving info) or MR (officer doing the briefing)?"
    print_example(pid, sigs, counts, "Management reporting to board", note)

# (c) Mixed governance — anything not already shown in (a)/(b) above
# (first two entries of each bucket approximate what was displayed).
print(" ── (c) Mixed governance language ──\n")
remaining = [x for x in bg_mr if x[0] not in {p for bucket in buckets_bg.values() for p, _, _ in bucket[:2]}]
for pid, sigs, counts in pick_diverse(remaining, 2):
    note = "Could be BG, MR, or RMP depending on interpretation"
    print_example(pid, sigs, counts, "Mixed governance", note)
|
|
|
|
|
|
# ══════════════════════════════════════════════════════════════════════════
# AXIS 3: SI ↔ N/O
# ══════════════════════════════════════════════════════════════════════════
print("\n" + "=" * 80)
print(" AXIS 3: SI ↔ N/O — Strategy Integration vs. None/Other")
print("=" * 80)

# All holdout paragraphs where the signals split between SI and N/O.
si_no = axis_candidates("SI", "N/O")
print(f"\n Total paragraphs with both SI and N/O in signals: {len(si_no)}\n")
|
|
|
|
|
|
def classify_si_no_subpattern(text: str) -> str:
    """Keyword heuristic for the SI↔N/O axis.

    Classification priority: an incident mention without negation wins;
    then any negation ("have not experienced ..."); then hypothetical
    language without concrete specifics; then concrete specifics; else
    "other".
    """
    lowered = text.lower()

    incident_words = ["incident", "breach", "attack", "compromised", "unauthorized access",
                      "data breach", "ransomware", "phishing"]
    negative_words = ["have not experienced", "not experienced", "no material",
                      "has not been materially", "not been the subject",
                      "not aware of any", "no known", "have not had"]
    hypothetical_words = ["could", "may", "might", "would", "if ", "potential",
                          "face threats", "subject to"]
    specific_words = ["$", "million", "vendor", "contract", "insurance",
                      "specific", "particular", "named"]

    def _matches(words: list[str]) -> bool:
        # Plain substring matching against the lowercased paragraph.
        return any(w in lowered for w in words)

    negated = _matches(negative_words)
    if _matches(incident_words) and not negated:
        return "actual-incident"
    if negated:
        return "negative-assertion"
    concrete = _matches(specific_words)
    if _matches(hypothetical_words) and not concrete:
        return "hypothetical"
    if concrete:
        return "specific-no-incident"
    return "other"
|
|
|
|
|
|
# Bucket SI↔N/O candidates by sub-pattern (keys come from
# classify_si_no_subpattern's return values).
buckets_si: dict[str, list] = defaultdict(list)
for pid, sigs, counts in si_no:
    sp = classify_si_no_subpattern(para_map.get(pid, {}).get("text", ""))
    buckets_si[sp].append((pid, sigs, counts))

print(f" Sub-pattern distribution: {dict((k, len(v)) for k, v in buckets_si.items())}")
print()

# Also find the 23 cases where humans=SI but GenAI=N/O
# (directional disagreement: human majority SI, model majority N/O).
human_si_genai_no = []
for pid, sigs, counts in si_no:
    # Split each paragraph's signals into human ("H:" prefix) vs. model votes.
    human_cats = [sigs.get(f"H:{n}") for n in HUMAN_NAMES if f"H:{n}" in sigs]
    genai_cats = [v for k, v in sigs.items() if not k.startswith("H:")]
    human_si = sum(1 for c in human_cats if c == "SI")
    human_no = sum(1 for c in human_cats if c == "N/O")
    genai_si = sum(1 for c in genai_cats if c == "SI")
    genai_no = sum(1 for c in genai_cats if c == "N/O")
    if human_si > human_no and genai_no > genai_si:
        human_si_genai_no.append((pid, sigs, counts))

print(f" Cases where humans lean SI but GenAI leans N/O: {len(human_si_genai_no)}")
print()
|
|
|
|
# (a) Clear actual incident
print(" ── (a) Clear actual incident described ──\n")
for pid, sigs, counts in pick_diverse(buckets_si.get("actual-incident", []), 2):
    print_example(pid, sigs, counts, "Actual incident")

# (b) Negative assertion
print(" ── (b) Negative assertion — 'we have not experienced material incidents' ──\n")
neg_pool = buckets_si.get("negative-assertion", [])
# Prefer ones in the human-SI-genAI-NO set (the directional-disagreement cases).
neg_human_si = [x for x in neg_pool if x[0] in {p for p, _, _ in human_si_genai_no}]
neg_other = [x for x in neg_pool if x[0] not in {p for p, _, _ in human_si_genai_no}]
pool = neg_human_si[:2] if len(neg_human_si) >= 2 else (neg_human_si + neg_other)[:2]
for pid, sigs, counts in pool:
    human_cats = [sigs.get(f"H:{n}") for n in HUMAN_NAMES if f"H:{n}" in sigs]
    genai_cats = [v for k, v in sigs.items() if not k.startswith("H:")]
    note = (f"CRUX: Humans keyed on the materiality assessment language. "
            f"Human votes: {Counter(human_cats).most_common()}, "
            f"GenAI votes: {Counter(genai_cats).most_common()}")
    print_example(pid, sigs, counts, "Negative assertion", note)

# (c) Hypothetical/conditional
print(" ── (c) Hypothetical/conditional language ──\n")
for pid, sigs, counts in pick_diverse(buckets_si.get("hypothetical", []), 2):
    print_example(pid, sigs, counts, "Hypothetical/conditional")

# (d) Specific programs/vendors/amounts but no incident
print(" ── (d) Specific programs/vendors/amounts but no incident ──\n")
spec_pool = buckets_si.get("specific-no-incident", [])
# Top up with the "other" bucket if too few specific-no-incident cases exist.
if len(spec_pool) < 2:
    spec_pool += buckets_si.get("other", [])
for pid, sigs, counts in pick_diverse(spec_pool, 2):
    note = "SI because specific details? Or N/O because no event/strategy content?"
    print_example(pid, sigs, counts, "Specific but no incident", note)

# Extra: show human-SI / genAI-N/O cases not already shown
# (first two entries per bucket approximate what was displayed above).
shown_si = set()
for bucket in buckets_si.values():
    for p, _, _ in bucket[:2]:
        shown_si.add(p)
extra_human_si = [x for x in human_si_genai_no if x[0] not in shown_si]
if extra_human_si:
    print(" ── (extra) Additional human=SI, GenAI=N/O cases ──\n")
    for pid, sigs, counts in pick_diverse(extra_human_si, 2):
        human_cats = [sigs.get(f"H:{n}") for n in HUMAN_NAMES if f"H:{n}" in sigs]
        genai_cats = [v for k, v in sigs.items() if not k.startswith("H:")]
        note = (f"Humans: {Counter(human_cats).most_common()}, "
                f"GenAI: {Counter(genai_cats).most_common()}")
        print_example(pid, sigs, counts, "Human=SI, GenAI=N/O", note)
|
|
|
|
|
|
# ══════════════════════════════════════════════════════════════════════════
# AXIS 4: Three-way BG ↔ MR ↔ RMP
# ══════════════════════════════════════════════════════════════════════════
print("\n" + "=" * 80)
print(" AXIS 4: Three-way BG ↔ MR ↔ RMP")
print("=" * 80)

# Collect holdout paragraphs where all three categories received votes.
three_way = []
for pid, sigs in signals.items():
    if pid not in holdout_pids:
        continue
    counts = Counter(sigs.values())
    if "BG" in counts and "MR" in counts and "RMP" in counts:
        # Score by how evenly split the three are (1.0 = perfectly even).
        vals = [counts["BG"], counts["MR"], counts["RMP"]]
        evenness = min(vals) / max(vals) if max(vals) > 0 else 0
        three_way.append((pid, sigs, counts, evenness))

# Most even splits first; more total signals breaks ties.
three_way.sort(key=lambda x: (-x[3], -sum(x[2].values())))
print(f"\n Total paragraphs with all three of BG, MR, RMP: {len(three_way)}\n")
|
|
|
|
# Pick diverse examples with enough signals: at least 10 votes, one
# paragraph per company, stop after 3 selections.
seen_co: set[str] = set()
three_way_selected = []
for pid, sigs, counts, evenness in three_way:
    if sum(counts.values()) < 10:
        continue
    co = para_map.get(pid, {}).get("companyName", "")
    if co in seen_co:
        continue
    seen_co.add(co)
    three_way_selected.append((pid, sigs, counts, evenness))
    if len(three_way_selected) >= 3:
        break

for pid, sigs, counts, evenness in three_way_selected:
    bg_c, mr_c, rmp_c = counts["BG"], counts["MR"], counts["RMP"]
    note = (f"Three-way split: BG={bg_c}, MR={mr_c}, RMP={rmp_c}. "
            f"This paragraph intertwines governance, management roles, and process descriptions.")
    print_example(pid, sigs, counts, "Three-way BG/MR/RMP", note)
|
|
|
|
|
|
# ── Summary statistics ────────────────────────────────────────────────────
# One-line recap per axis of how many split paragraphs were found above.
print("\n" + "=" * 80)
print(" SUMMARY")
print("=" * 80)
print(f"""
 Axis 1 (MR↔RMP): {len(mr_rmp)} paragraphs with split signals
 Axis 2 (BG↔MR): {len(bg_mr)} paragraphs with split signals
 Axis 3 (SI↔N/O): {len(si_no)} paragraphs with split signals
 Axis 4 (BG↔MR↔RMP): {len(three_way)} paragraphs with three-way split
 Human=SI/GenAI=N/O: {len(human_si_genai_no)} cases (directional asymmetry)
""")
|