"""Keyword/dictionary baseline classifier.

A simple rule-based classifier built directly from the v2 codebook IS/NOT
lists. Serves as the "additional baseline" required by the A-grade rubric
and demonstrates how much of the task can be solved with hand-crafted rules
vs. the trained ModernBERT.

Category: keyword voting per category, with NOT-cyber filter for N/O.
Specificity: cascade matching the codebook decision test (L4 → L3 → L2 → L1).

Eval against the same proxy gold (GPT-5.4, Opus-4.6) as the trained model
on the 1,200-paragraph holdout. Reuses metric helpers from src.finetune.eval.
"""

import json
import re
from pathlib import Path

import numpy as np

from src.finetune.data import CAT2ID, CATEGORIES
from src.finetune.eval import (
    SPEC_LABELS,
    compute_all_metrics,
    format_report,
    load_holdout_data,
)


PARAGRAPHS_PATH = "../data/paragraphs/paragraphs-clean.patched.jsonl"
|
|
HOLDOUT_PATH = "../data/gold/v2-holdout-ids.json"
|
|
BENCHMARK_PATHS = {
|
|
"GPT-5.4": "../data/annotations/v2-bench/gpt-5.4.jsonl",
|
|
"Opus-4.6": "../data/annotations/v2-bench/opus-4.6.jsonl",
|
|
}
|
|
OUTPUT_DIR = Path("../results/eval/dictionary-baseline")
|
|
|
|
|
|
# ─── Category keywords (lowercased; word-boundary matched) ───
|
|
# Drawn directly from codebook "Key markers" lists.
|
|
|
|
CAT_KEYWORDS: dict[str, list[str]] = {
|
|
"Board Governance": [
|
|
"board of directors", "board oversees", "board oversight",
|
|
"audit committee", "risk committee of the board",
|
|
"board committee", "reports to the board", "report to the board",
|
|
"briefings to the board", "briefed the board", "informs the board",
|
|
"board-level", "board level", "directors oversee",
|
|
],
|
|
"Management Role": [
|
|
"ciso", "chief information security officer",
|
|
"chief security officer", "cso ",
|
|
"vp of information security", "vp of security",
|
|
"vice president of information security",
|
|
"information security officer",
|
|
"director of information security", "director of cybersecurity",
|
|
"head of information security", "head of cybersecurity",
|
|
"reports to the cio", "reports to the cfo", "reports to the ceo",
|
|
"years of experience", "cissp", "cism", "crisc", "ceh",
|
|
"management committee", "steering committee",
|
|
],
|
|
"Risk Management Process": [
|
|
"nist csf", "nist cybersecurity framework",
|
|
"iso 27001", "iso 27002", "cis controls",
|
|
"vulnerability management", "vulnerability assessment",
|
|
"vulnerability scanning", "penetration testing", "pen testing",
|
|
"red team", "phishing simulation", "security awareness training",
|
|
"threat intelligence", "threat hunting", "patch management",
|
|
"siem", "soc ", "security operations center",
|
|
"edr", "xdr", "mdr", "endpoint detection",
|
|
"incident response plan", "tabletop exercise",
|
|
"intrusion detection", "intrusion prevention",
|
|
"multi-factor authentication", "mfa",
|
|
"zero trust", "defense in depth", "least privilege",
|
|
"encryption", "network segmentation",
|
|
"data loss prevention", "dlp",
|
|
"identity and access management", "iam",
|
|
],
|
|
"Third-Party Risk": [
|
|
"third-party", "third party", "service provider", "service providers",
|
|
"vendor risk", "vendor management", "supply chain",
|
|
"soc 2", "soc 1", "soc 2 type",
|
|
"contractual security", "contractual requirements",
|
|
"supplier", "supplier risk", "outsourced",
|
|
],
|
|
"Incident Disclosure": [
|
|
"unauthorized access", "detected unauthorized",
|
|
"we detected", "have detected", "we discovered",
|
|
"data breach", "security breach",
|
|
"forensic investigation", "engaged mandiant",
|
|
"incident response was activated", "ransomware attack",
|
|
"compromised", "exfiltrated", "exfiltration",
|
|
"on or about", "began on", "discovered on",
|
|
"notified law enforcement",
|
|
],
|
|
"Strategy Integration": [
|
|
"materially affected", "material effect",
|
|
"reasonably likely to materially affect",
|
|
"have not experienced any material",
|
|
"cybersecurity insurance", "cyber insurance",
|
|
"insurance coverage", "cybersecurity budget",
|
|
"cybersecurity investment", "investment in cybersecurity",
|
|
],
|
|
"None/Other": [
|
|
"forward-looking statement", "forward looking statement",
|
|
"see item 1a", "refer to item 1a",
|
|
"special purpose acquisition",
|
|
"no cybersecurity program",
|
|
],
|
|
}
|
|
|
|
# Cyber-mention test for N/O fallback: if NONE of these appear, → N/O
|
|
CYBER_TERMS = [
|
|
"cyber", "cybersecurity", "information security", "infosec",
|
|
"data security", "network security", "it security", "data breach",
|
|
"ransomware", "malware", "phishing", "hacker", "intrusion",
|
|
"encryption", "vulnerability",
|
|
]
|
|
|
|
|
|
# ─── Specificity dictionaries (from codebook) ───
|
|
|
|
DOMAIN_TERMS = [
|
|
"penetration testing", "pen testing", "vulnerability scanning",
|
|
"vulnerability assessment", "vulnerability management",
|
|
"red team", "phishing simulation", "security awareness training",
|
|
"threat hunting", "threat intelligence", "patch management",
|
|
"identity and access management", "iam",
|
|
"data loss prevention", "dlp", "network segmentation",
|
|
"siem", "security information and event management",
|
|
"soc ", "security operations center",
|
|
"edr", "xdr", "mdr", "waf", "web application firewall",
|
|
"ids ", "ips ", "intrusion detection", "intrusion prevention",
|
|
"mfa", "2fa", "multi-factor authentication", "two-factor authentication",
|
|
"zero trust", "defense in depth", "least privilege",
|
|
"nist csf", "nist cybersecurity framework",
|
|
"iso 27001", "iso 27002", "soc 2", "cis controls", "cis benchmarks",
|
|
"pci dss", "hipaa", "gdpr", "cobit", "mitre att&ck",
|
|
"ransomware", "malware", "phishing", "ddos",
|
|
"supply chain attack", "supply chain compromise",
|
|
"social engineering", "advanced persistent threat", "apt",
|
|
"zero-day", "zero day",
|
|
]
|
|
|
|
# IS firm-specific patterns (regex with word boundaries)
|
|
FIRM_SPECIFIC_PATTERNS = [
|
|
r"\bciso\b", r"\bcto\b", r"\bcio\b",
|
|
r"\bchief information security officer\b",
|
|
r"\bchief security officer\b",
|
|
r"\bvp of (information )?security\b",
|
|
r"\bvice president of (information )?security\b",
|
|
r"\binformation security officer\b",
|
|
r"\bdirector of (information )?security\b",
|
|
r"\bdirector of cybersecurity\b",
|
|
r"\bhead of (information )?security\b",
|
|
r"\bcybersecurity committee\b",
|
|
r"\bcybersecurity steering committee\b",
|
|
r"\btechnology committee\b",
|
|
r"\brisk committee\b",
|
|
r"\b24/7\b",
|
|
r"\bcyber incident response plan\b",
|
|
r"\bcirp\b",
|
|
]
|
|
|
|
# QV-eligible: numbers + dates + named tools/firms + certifications
|
|
QV_PATTERNS = [
|
|
# Dollar amounts
|
|
r"\$\d",
|
|
# Percentages
|
|
r"\b\d+(\.\d+)?\s?%",
|
|
# Years of experience as a number
|
|
r"\b\d+\+?\s+years",
|
|
# Headcounts / team sizes
|
|
r"\b(team|staff|employees|professionals|members)\s+of\s+\d+",
|
|
r"\b\d+\s+(employees|professionals|engineers|analysts|members)",
|
|
# Specific dates
|
|
r"\b(january|february|march|april|may|june|july|august|september|october|november|december)\s+\d{1,2},?\s+\d{4}\b",
|
|
r"\b\d{4}-\d{2}-\d{2}\b",
|
|
# Named cybersecurity vendors/tools
|
|
r"\bmandiant\b", r"\bcrowdstrike\b", r"\bsplunk\b",
|
|
r"\bpalo alto\b", r"\bfortinet\b", r"\bdarktrace\b",
|
|
r"\bsentinel\b", r"\bservicenow\b", r"\bdeloitte\b",
|
|
r"\bkpmg\b", r"\bpwc\b", r"\bey\b", r"\baccenture\b",
|
|
# Individual certifications
|
|
r"\bcissp\b", r"\bcism\b", r"\bcrisc\b", r"\bceh\b", r"\bcompt(ia)?\b",
|
|
# Company-held certifications (verifiable)
|
|
r"\b(maintain|achieved|certified|completed)[^.]{0,40}\b(iso 27001|soc 2 type|fedramp)\b",
|
|
# Universities (credential context)
|
|
r"\b(ph\.?d|master'?s|bachelor'?s)\b[^.]{0,30}\b(university|institute)\b",
|
|
]
|
|
|
|
|
|
def predict_category(text: str) -> int:
|
|
"""Vote-based keyword classifier. Falls back to N/O if no cyber terms."""
|
|
text_l = text.lower()
|
|
|
|
# N/O fallback: if no cybersecurity terms present, it's N/O
|
|
if not any(term in text_l for term in CYBER_TERMS):
|
|
return CAT2ID["None/Other"]
|
|
|
|
scores: dict[str, int] = {c: 0 for c in CATEGORIES}
|
|
for cat, kws in CAT_KEYWORDS.items():
|
|
for kw in kws:
|
|
if kw in text_l:
|
|
scores[cat] += 1
|
|
|
|
# Strong N/O signal: explicit forward-looking + no other category fires
|
|
if scores["None/Other"] > 0 and sum(scores.values()) - scores["None/Other"] == 0:
|
|
return CAT2ID["None/Other"]
|
|
|
|
# Pick the highest-scoring category. Tie-break by codebook rule order:
|
|
# ID > BG > MR > TP > SI > RMP > N/O (more specific > general)
|
|
priority = [
|
|
"Incident Disclosure", "Board Governance", "Management Role",
|
|
"Third-Party Risk", "Strategy Integration", "Risk Management Process",
|
|
"None/Other",
|
|
]
|
|
best_score = max(scores.values())
|
|
if best_score == 0:
|
|
return CAT2ID["Risk Management Process"] # fallback for cyber text with no marker hits
|
|
for c in priority:
|
|
if scores[c] == best_score:
|
|
return CAT2ID[c]
|
|
|
|
return CAT2ID["Risk Management Process"]
|
|
|
|
|
|
def predict_specificity(text: str) -> int:
|
|
"""Cascade matching the codebook decision test. Returns 0-indexed level."""
|
|
text_l = text.lower()
|
|
|
|
# Level 4: any QV-eligible fact
|
|
for pat in QV_PATTERNS:
|
|
if re.search(pat, text_l):
|
|
return 3
|
|
|
|
# Level 3: any firm-specific pattern
|
|
for pat in FIRM_SPECIFIC_PATTERNS:
|
|
if re.search(pat, text_l):
|
|
return 2
|
|
|
|
# Level 2: any domain term
|
|
for term in DOMAIN_TERMS:
|
|
if term in text_l:
|
|
return 1
|
|
|
|
# Level 1: generic
|
|
return 0
|
|
|
|
|
|
def main() -> None:
|
|
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
|
|
|
|
print("\n Dictionary baseline — keyword voting + cascade specificity")
|
|
records = load_holdout_data(PARAGRAPHS_PATH, HOLDOUT_PATH, BENCHMARK_PATHS)
|
|
print(f" Holdout paragraphs: {len(records)}")
|
|
|
|
cat_preds_arr = np.array([predict_category(r["text"]) for r in records])
|
|
spec_preds_arr = np.array([predict_specificity(r["text"]) for r in records])
|
|
|
|
# One-hot "probabilities" for AUC/ECE machinery
|
|
cat_probs_arr = np.zeros((len(records), len(CATEGORIES)))
|
|
cat_probs_arr[np.arange(len(records)), cat_preds_arr] = 1.0
|
|
spec_probs_arr = np.zeros((len(records), len(SPEC_LABELS)))
|
|
spec_probs_arr[np.arange(len(records)), spec_preds_arr] = 1.0
|
|
|
|
all_results = {}
|
|
|
|
for ref_name in BENCHMARK_PATHS:
|
|
print(f"\n Evaluating dictionary baseline vs {ref_name}...")
|
|
|
|
cat_labels, spec_labels = [], []
|
|
c_preds, s_preds = [], []
|
|
c_probs, s_probs = [], []
|
|
|
|
for i, rec in enumerate(records):
|
|
bench = rec["benchmark_labels"].get(ref_name)
|
|
if bench is None:
|
|
continue
|
|
cat_labels.append(CAT2ID[bench["category"]])
|
|
spec_labels.append(bench["specificity"] - 1)
|
|
c_preds.append(cat_preds_arr[i])
|
|
s_preds.append(spec_preds_arr[i])
|
|
c_probs.append(cat_probs_arr[i])
|
|
s_probs.append(spec_probs_arr[i])
|
|
|
|
cat_labels = np.array(cat_labels)
|
|
spec_labels = np.array(spec_labels)
|
|
c_preds = np.array(c_preds)
|
|
s_preds = np.array(s_preds)
|
|
c_probs = np.array(c_probs)
|
|
s_probs = np.array(s_probs)
|
|
|
|
cat_metrics = compute_all_metrics(
|
|
c_preds, cat_labels, c_probs, CATEGORIES, "cat", is_ordinal=False
|
|
)
|
|
spec_metrics = compute_all_metrics(
|
|
s_preds, spec_labels, s_probs, SPEC_LABELS, "spec", is_ordinal=True
|
|
)
|
|
|
|
inference_stub = {
|
|
"num_samples": len(cat_labels),
|
|
"total_time_s": 0.0,
|
|
"avg_ms_per_sample": 0.001, # rules are essentially free
|
|
}
|
|
|
|
combined = {**cat_metrics, **spec_metrics, **inference_stub}
|
|
combined["combined_macro_f1"] = (combined["cat_macro_f1"] + combined["spec_macro_f1"]) / 2
|
|
|
|
report = format_report("dictionary-baseline", ref_name, combined, inference_stub)
|
|
print(report)
|
|
|
|
report_path = OUTPUT_DIR / f"report_{ref_name.lower().replace(' ', '_').replace('.', '')}.txt"
|
|
with open(report_path, "w") as f:
|
|
f.write(report)
|
|
|
|
all_results[f"dictionary_vs_{ref_name}"] = combined
|
|
|
|
serializable = {}
|
|
for k, v in all_results.items():
|
|
serializable[k] = {
|
|
mk: mv for mk, mv in v.items()
|
|
if isinstance(mv, (int, float, str, list, bool))
|
|
}
|
|
with open(OUTPUT_DIR / "metrics.json", "w") as f:
|
|
json.dump(serializable, f, indent=2, default=str)
|
|
|
|
print(f"\n Results saved to {OUTPUT_DIR}")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|