# Source file: SEC-cyBERT/python/scripts/dictionary_baseline.py
# (file-viewer header at capture time: 333 lines, 12 KiB, Python)

"""Keyword/dictionary baseline classifier.
A simple rule-based classifier built directly from the v2 codebook IS/NOT
lists. Serves as the "additional baseline" required by the A-grade rubric
and demonstrates how much of the task can be solved with hand-crafted rules
vs. the trained ModernBERT.
Category: keyword voting per category, with NOT-cyber filter for N/O.
Specificity: cascade matching the codebook decision test (L4 → L3 → L2 → L1).
Eval against the same proxy gold (GPT-5.4, Opus-4.6) as the trained model
on the 1,200-paragraph holdout. Reuses metric helpers from src.finetune.eval.
"""
import json
import re
from pathlib import Path
import numpy as np
from src.finetune.data import CAT2ID, CATEGORIES
from src.finetune.eval import (
SPEC_LABELS,
compute_all_metrics,
format_report,
load_holdout_data,
)
# ─── Input / output locations ───
# All paths are relative ("../..."); main() hands the three input locations to
# load_holdout_data and creates OUTPUT_DIR itself.
# NOTE(review): presumably the script is launched from the `python/` directory
# so that "../data" resolves correctly — confirm.
PARAGRAPHS_PATH = "../data/paragraphs/paragraphs-clean.patched.jsonl"
HOLDOUT_PATH = "../data/gold/v2-holdout-ids.json"

# Reference name -> annotation file. The keys double as the lookup keys into
# each record's "benchmark_labels" mapping and as report-file name stems.
BENCHMARK_PATHS = {
    "GPT-5.4": "../data/annotations/v2-bench/gpt-5.4.jsonl",
    "Opus-4.6": "../data/annotations/v2-bench/opus-4.6.jsonl",
}

# Directory receiving the per-reference text reports and metrics.json.
OUTPUT_DIR = Path("../results/eval/dictionary-baseline")
# ─── Category keyword dictionary ───
# Category name -> phrases drawn directly from the codebook "Key markers"
# lists. predict_category gives a category one vote per phrase found in the
# paragraph. Entries are lowercase because the paragraph is lowercased before
# matching. The trailing space on a few short acronyms ("cso ", "soc ")
# appears deliberate: it keeps the acronym from running into following letters.
CAT_KEYWORDS: dict[str, list[str]] = {
    "Board Governance": [
        "board of directors",
        "board oversees",
        "board oversight",
        "audit committee",
        "risk committee of the board",
        "board committee",
        "reports to the board",
        "report to the board",
        "briefings to the board",
        "briefed the board",
        "informs the board",
        "board-level",
        "board level",
        "directors oversee",
    ],
    "Management Role": [
        # Titles
        "ciso",
        "chief information security officer",
        "chief security officer",
        "cso ",
        "vp of information security",
        "vp of security",
        "vice president of information security",
        "information security officer",
        "director of information security",
        "director of cybersecurity",
        "head of information security",
        "head of cybersecurity",
        # Reporting lines
        "reports to the cio",
        "reports to the cfo",
        "reports to the ceo",
        # Experience and credentials
        "years of experience",
        "cissp",
        "cism",
        "crisc",
        "ceh",
        # Management-level committees
        "management committee",
        "steering committee",
    ],
    "Risk Management Process": [
        # Frameworks
        "nist csf",
        "nist cybersecurity framework",
        "iso 27001",
        "iso 27002",
        "cis controls",
        # Assessment and testing activities
        "vulnerability management",
        "vulnerability assessment",
        "vulnerability scanning",
        "penetration testing",
        "pen testing",
        "red team",
        "phishing simulation",
        "security awareness training",
        "threat intelligence",
        "threat hunting",
        "patch management",
        # Monitoring and response tooling
        "siem",
        "soc ",
        "security operations center",
        "edr",
        "xdr",
        "mdr",
        "endpoint detection",
        "incident response plan",
        "tabletop exercise",
        "intrusion detection",
        "intrusion prevention",
        # Controls and architecture principles
        "multi-factor authentication",
        "mfa",
        "zero trust",
        "defense in depth",
        "least privilege",
        "encryption",
        "network segmentation",
        "data loss prevention",
        "dlp",
        "identity and access management",
        "iam",
    ],
    "Third-Party Risk": [
        "third-party",
        "third party",
        "service provider",
        "service providers",
        "vendor risk",
        "vendor management",
        "supply chain",
        "soc 2",
        "soc 1",
        "soc 2 type",
        "contractual security",
        "contractual requirements",
        "supplier",
        "supplier risk",
        "outsourced",
    ],
    "Incident Disclosure": [
        "unauthorized access",
        "detected unauthorized",
        "we detected",
        "have detected",
        "we discovered",
        "data breach",
        "security breach",
        "forensic investigation",
        "engaged mandiant",
        "incident response was activated",
        "ransomware attack",
        "compromised",
        "exfiltrated",
        "exfiltration",
        "on or about",
        "began on",
        "discovered on",
        "notified law enforcement",
    ],
    "Strategy Integration": [
        "materially affected",
        "material effect",
        "reasonably likely to materially affect",
        "have not experienced any material",
        "cybersecurity insurance",
        "cyber insurance",
        "insurance coverage",
        "cybersecurity budget",
        "cybersecurity investment",
        "investment in cybersecurity",
    ],
    "None/Other": [
        "forward-looking statement",
        "forward looking statement",
        "see item 1a",
        "refer to item 1a",
        "special purpose acquisition",
        "no cybersecurity program",
    ],
}
# Cyber-mention gate used by predict_category: a paragraph containing NONE of
# these lowercase substrings is classified None/Other before any keyword
# voting happens.
CYBER_TERMS = [
    "cyber",
    "cybersecurity",
    "information security",
    "infosec",
    "data security",
    "network security",
    "it security",
    "data breach",
    "ransomware",
    "malware",
    "phishing",
    "hacker",
    "intrusion",
    "encryption",
    "vulnerability",
]
# ─── Specificity dictionaries (from codebook) ───
# Domain vocabulary: predict_specificity returns level index 1 when any of
# these appears and no higher-level pattern matched first. Entries are
# lowercase (the text is lowercased before matching); a trailing space marks
# a short acronym ("soc ", "ids ", "ips ").
DOMAIN_TERMS = [
    # Security practices
    "penetration testing",
    "pen testing",
    "vulnerability scanning",
    "vulnerability assessment",
    "vulnerability management",
    "red team",
    "phishing simulation",
    "security awareness training",
    "threat hunting",
    "threat intelligence",
    "patch management",
    # Controls and tooling
    "identity and access management",
    "iam",
    "data loss prevention",
    "dlp",
    "network segmentation",
    "siem",
    "security information and event management",
    "soc ",
    "security operations center",
    "edr",
    "xdr",
    "mdr",
    "waf",
    "web application firewall",
    "ids ",
    "ips ",
    "intrusion detection",
    "intrusion prevention",
    "mfa",
    "2fa",
    "multi-factor authentication",
    "two-factor authentication",
    # Architecture principles
    "zero trust",
    "defense in depth",
    "least privilege",
    # Frameworks, standards, regulations
    "nist csf",
    "nist cybersecurity framework",
    "iso 27001",
    "iso 27002",
    "soc 2",
    "cis controls",
    "cis benchmarks",
    "pci dss",
    "hipaa",
    "gdpr",
    "cobit",
    "mitre att&ck",
    # Threat vocabulary
    "ransomware",
    "malware",
    "phishing",
    "ddos",
    "supply chain attack",
    "supply chain compromise",
    "social engineering",
    "advanced persistent threat",
    "apt",
    "zero-day",
    "zero day",
]
# Firm-specific indicators: regexes (with explicit \b word boundaries) that
# predict_specificity searches in the lowercased text; any hit yields level
# index 2 unless a QV pattern matched first.
FIRM_SPECIFIC_PATTERNS = [
    # Named executive roles
    r"\bciso\b",
    r"\bcto\b",
    r"\bcio\b",
    r"\bchief information security officer\b",
    r"\bchief security officer\b",
    r"\bvp of (information )?security\b",
    r"\bvice president of (information )?security\b",
    r"\binformation security officer\b",
    r"\bdirector of (information )?security\b",
    r"\bdirector of cybersecurity\b",
    r"\bhead of (information )?security\b",
    # Named committees
    r"\bcybersecurity committee\b",
    r"\bcybersecurity steering committee\b",
    r"\btechnology committee\b",
    r"\brisk committee\b",
    # Named operational arrangements
    r"\b24/7\b",
    r"\bcyber incident response plan\b",
    r"\bcirp\b",
]
# QV-eligible facts: numbers, dates, named tools/firms and certifications.
# predict_specificity searches these regexes in the lowercased text (hence the
# lowercase month and vendor names); any hit yields the top level index 3.
QV_PATTERNS = [
    # Dollar amounts
    r"\$\d",
    # Percentages
    r"\b\d+(\.\d+)?\s?%",
    # Years of experience as a number
    r"\b\d+\+?\s+years",
    # Headcounts / team sizes
    r"\b(team|staff|employees|professionals|members)\s+of\s+\d+",
    r"\b\d+\s+(employees|professionals|engineers|analysts|members)",
    # Specific dates: "month d, yyyy" and ISO "yyyy-mm-dd"
    r"\b(january|february|march|april|may|june|july|august|september|october|november|december)\s+\d{1,2},?\s+\d{4}\b",
    r"\b\d{4}-\d{2}-\d{2}\b",
    # Named cybersecurity vendors/tools
    r"\bmandiant\b",
    r"\bcrowdstrike\b",
    r"\bsplunk\b",
    r"\bpalo alto\b",
    r"\bfortinet\b",
    r"\bdarktrace\b",
    r"\bsentinel\b",
    r"\bservicenow\b",
    r"\bdeloitte\b",
    r"\bkpmg\b",
    r"\bpwc\b",
    r"\bey\b",
    r"\baccenture\b",
    # Individual certifications
    r"\bcissp\b",
    r"\bcism\b",
    r"\bcrisc\b",
    r"\bceh\b",
    r"\bcompt(ia)?\b",
    # Company-held certifications (verifiable)
    r"\b(maintain|achieved|certified|completed)[^.]{0,40}\b(iso 27001|soc 2 type|fedramp)\b",
    # Universities (credential context)
    r"\b(ph\.?d|master'?s|bachelor'?s)\b[^.]{0,30}\b(university|institute)\b",
]
# Compiled keyword matchers, keyed by the raw CAT_KEYWORDS entry (built lazily).
_CATEGORY_KEYWORD_REGEXES: dict[str, re.Pattern[str]] = {}


def _category_keyword_regex(keyword: str) -> re.Pattern[str]:
    """Return the cached, boundary-aware regex for one CAT_KEYWORDS entry."""
    compiled = _CATEGORY_KEYWORD_REGEXES.get(keyword)
    if compiled is None:
        core = keyword.rstrip()
        # Entries such as "cso " / "soc " carry a trailing space meaning "the
        # acronym ends here". Honour that as a real word boundary so the
        # acronym also matches before punctuation or at the end of the text.
        tail = r"(?!\w)" if core != keyword else ""
        # The look-behind stops short acronyms from firing inside longer words
        # ("iam" in "william", "cism" in "criticism"). The right edge is
        # otherwise left open so plurals and "-ing" forms keep matching.
        compiled = re.compile(r"(?<!\w)" + re.escape(core) + tail)
        _CATEGORY_KEYWORD_REGEXES[keyword] = compiled
    return compiled


def predict_category(text: str) -> int:
    """Classify a paragraph into a category id by keyword voting.

    Steps:
      1. If the lowercased text contains none of CYBER_TERMS (plain substring
         test), return the None/Other id.
      2. Each CAT_KEYWORDS entry found at the start of a word adds one vote to
         its category.
      3. If only None/Other keywords fired, return None/Other.
      4. If nothing fired at all, fall back to Risk Management Process.
      5. Otherwise return the top-scoring category; ties are broken by the
         fixed priority order ID > BG > MR > TP > SI > RMP > N/O.

    Args:
        text: Raw paragraph text; matching is case-insensitive.

    Returns:
        The CAT2ID value of the chosen category.
    """
    text_l = text.lower()

    # N/O gate: no cybersecurity vocabulary at all means None/Other.
    if not any(term in text_l for term in CYBER_TERMS):
        return CAT2ID["None/Other"]

    scores: dict[str, int] = {c: 0 for c in CATEGORIES}
    for cat, kws in CAT_KEYWORDS.items():
        scores[cat] += sum(
            1 for kw in kws if _category_keyword_regex(kw).search(text_l)
        )

    # Strong N/O signal: an explicit N/O marker and no other category fires.
    votes_elsewhere = sum(scores.values()) - scores["None/Other"]
    if scores["None/Other"] > 0 and votes_elsewhere == 0:
        return CAT2ID["None/Other"]

    best_score = max(scores.values())
    if best_score == 0:
        # Cyber text with no marker hits: default to the broadest category.
        return CAT2ID["Risk Management Process"]

    # Tie-break by codebook rule order (more specific before more general).
    priority = [
        "Incident Disclosure", "Board Governance", "Management Role",
        "Third-Party Risk", "Strategy Integration", "Risk Management Process",
        "None/Other",
    ]
    for c in priority:
        if scores[c] == best_score:
            return CAT2ID[c]

    # Only reachable if the winner is a CATEGORIES entry missing from
    # `priority`; keep the same default as the zero-hit case.
    return CAT2ID["Risk Management Process"]
# Compiled domain-term matchers, keyed by the raw DOMAIN_TERMS entry (lazy).
_DOMAIN_TERM_REGEXES: dict[str, re.Pattern[str]] = {}


def _domain_term_regex(term: str) -> re.Pattern[str]:
    """Return the cached, boundary-aware regex for one DOMAIN_TERMS entry."""
    compiled = _DOMAIN_TERM_REGEXES.get(term)
    if compiled is None:
        core = term.rstrip()
        # A trailing space in the dictionary ("soc ", "ids ", "ips ") marks an
        # acronym that must end at a word boundary; this also lets it match
        # before punctuation or at the end of the text.
        tail = r"(?!\w)" if core != term else ""
        # Require a word start so acronyms do not fire inside ordinary words
        # ("ips " in "relationships ", "ids " in "avoids ", "apt" in "adapt").
        # The right edge stays open otherwise so plurals still match.
        compiled = re.compile(r"(?<!\w)" + re.escape(core) + tail)
        _DOMAIN_TERM_REGEXES[term] = compiled
    return compiled


def predict_specificity(text: str) -> int:
    """Assign a specificity level with a highest-level-first cascade.

    The lowercased text is tested against each tier in turn and the first
    tier that matches decides the result:

      * any QV_PATTERNS regex            -> 3 (Level 4)
      * any FIRM_SPECIFIC_PATTERNS regex -> 2 (Level 3)
      * any DOMAIN_TERMS entry at a word start -> 1 (Level 2)
      * otherwise                        -> 0 (Level 1, generic)

    Args:
        text: Raw paragraph text; matching is case-insensitive.

    Returns:
        The 0-indexed specificity level (0-3).
    """
    text_l = text.lower()

    # Level 4: any QV-eligible fact
    if any(re.search(pat, text_l) for pat in QV_PATTERNS):
        return 3

    # Level 3: any firm-specific pattern
    if any(re.search(pat, text_l) for pat in FIRM_SPECIFIC_PATTERNS):
        return 2

    # Level 2: any domain term
    if any(_domain_term_regex(term).search(text_l) for term in DOMAIN_TERMS):
        return 1

    # Level 1: generic
    return 0
def _unwrap_numpy_scalar(value):
    """Convert a numpy scalar to the equivalent builtin; pass others through."""
    return value.item() if isinstance(value, np.generic) else value


def main() -> None:
    """Evaluate the dictionary baseline against every reference in BENCHMARK_PATHS.

    Loads the holdout records, predicts category and specificity for each
    record's "text", and scores the predictions against each reference's
    labels (records lacking a label for that reference are skipped). For
    every reference a text report is printed and written to OUTPUT_DIR; a
    combined metrics.json holding the JSON-friendly metric values is written
    at the end.
    """
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    print("\n Dictionary baseline — keyword voting + cascade specificity")

    records = load_holdout_data(PARAGRAPHS_PATH, HOLDOUT_PATH, BENCHMARK_PATHS)
    n_records = len(records)
    print(f" Holdout paragraphs: {n_records}")

    # dtype=int keeps these usable as index arrays even if `records` is empty
    # (an empty list would otherwise become a float array).
    cat_preds_arr = np.array(
        [predict_category(r["text"]) for r in records], dtype=int
    )
    spec_preds_arr = np.array(
        [predict_specificity(r["text"]) for r in records], dtype=int
    )

    # One-hot "probabilities" for AUC/ECE machinery
    every_row = np.arange(n_records)
    cat_probs_arr = np.zeros((n_records, len(CATEGORIES)))
    cat_probs_arr[every_row, cat_preds_arr] = 1.0
    spec_probs_arr = np.zeros((n_records, len(SPEC_LABELS)))
    spec_probs_arr[every_row, spec_preds_arr] = 1.0

    all_results = {}
    for ref_name in BENCHMARK_PATHS:
        print(f"\n Evaluating dictionary baseline vs {ref_name}...")

        # Keep only the records this reference actually labelled.
        labelled = []
        for row, rec in enumerate(records):
            bench = rec["benchmark_labels"].get(ref_name)
            if bench is not None:
                labelled.append((row, bench))

        rows = np.array([row for row, _ in labelled], dtype=int)
        cat_labels = np.array([CAT2ID[bench["category"]] for _, bench in labelled])
        # Reference specificity is shifted by one to line up with the
        # 0-indexed levels returned by predict_specificity.
        spec_labels = np.array([bench["specificity"] - 1 for _, bench in labelled])
        c_preds, s_preds = cat_preds_arr[rows], spec_preds_arr[rows]
        c_probs, s_probs = cat_probs_arr[rows], spec_probs_arr[rows]

        cat_metrics = compute_all_metrics(
            c_preds, cat_labels, c_probs, CATEGORIES, "cat", is_ordinal=False
        )
        spec_metrics = compute_all_metrics(
            s_preds, spec_labels, s_probs, SPEC_LABELS, "spec", is_ordinal=True
        )

        # Placeholder timing block: no model inference is measured here.
        inference_stub = {
            "num_samples": len(cat_labels),
            "total_time_s": 0.0,
            "avg_ms_per_sample": 0.001,  # rules are essentially free
        }
        combined = {**cat_metrics, **spec_metrics, **inference_stub}
        combined["combined_macro_f1"] = (
            combined["cat_macro_f1"] + combined["spec_macro_f1"]
        ) / 2

        report = format_report("dictionary-baseline", ref_name, combined, inference_stub)
        print(report)

        report_stem = ref_name.lower().replace(' ', '_').replace('.', '')
        report_path = OUTPUT_DIR / f"report_{report_stem}.txt"
        # Explicit encoding: the default is locale-dependent and may not be
        # able to encode non-ASCII characters in the report.
        with open(report_path, "w", encoding="utf-8") as f:
            f.write(report)

        all_results[f"dictionary_vs_{ref_name}"] = combined

    # Keep only JSON-friendly values. Numpy scalars are unwrapped first so
    # that e.g. numpy integers are not silently dropped by the type filter.
    serializable = {}
    for run_name, metrics in all_results.items():
        kept = {}
        for metric_name, raw_value in metrics.items():
            value = _unwrap_numpy_scalar(raw_value)
            if isinstance(value, (int, float, str, list, bool)):
                kept[metric_name] = value
        serializable[run_name] = kept

    with open(OUTPUT_DIR / "metrics.json", "w", encoding="utf-8") as f:
        json.dump(serializable, f, indent=2, default=str)
    print(f"\n Results saved to {OUTPUT_DIR}")


if __name__ == "__main__":
    main()