From 1c0718f8b8b5bcb515a571c02b4eece05c8c6c2c Mon Sep 17 00:00:00 2001 From: Scott Brumley Date: Fri, 26 Jun 2026 12:58:42 -0400 Subject: [PATCH] feat(testing-harness): add schema mutation tests and DSS seed tooling - tools/schema_mutation_test.py: schema mutation test harness - tools/send_pan_dss_seed.sh: seed socfw identity source data into pan_dss_raw - tools/send_test_events.py: extend test-event sender - scenarios/h7_full_chain.yml: new full-chain scenario - scenarios/h8_full_chain.yml: scenario updates --- scenarios/h7_full_chain.yml | 15 ++ scenarios/h8_full_chain.yml | 7 +- tools/schema_mutation_test.py | 266 ++++++++++++++++++++++++++++++++++ tools/send_pan_dss_seed.sh | 31 ++++ tools/send_test_events.py | 64 ++++++-- 5 files changed, 368 insertions(+), 15 deletions(-) create mode 100644 scenarios/h7_full_chain.yml create mode 100644 tools/schema_mutation_test.py create mode 100755 tools/send_pan_dss_seed.sh mode change 100644 => 100755 tools/send_test_events.py diff --git a/scenarios/h7_full_chain.yml b/scenarios/h7_full_chain.yml new file mode 100644 index 00000000..ad920030 --- /dev/null +++ b/scenarios/h7_full_chain.yml @@ -0,0 +1,15 @@ +# H8: Full kill chain — Proofpoint + CrowdStrike EPP + CrowdStrike IDP +# Tests: Cross-source grouping, full NIST IR lifecycle E2E +scenario: Turla Carbon — Full Kill Chain (H8) +compress_window: 30m + +sources: + - name: ProofPoint TAP + file: input_tsv/ProofPoint-MITRE-Turla-Carbon-in-XSIAM.tsv + env: .env-brumxdr-proofpoint + - name: CrowdStrike + file: input_tsv/CrowdStrike-MITRE-Turla-Carbon-in-XSIAM.tsv + env: .env-brumxdr-crowdstrike + - name: CrowdStrike IDP + file: input_tsv/CrowdStrike-IDP-Turla-Carbon-Synthetic.tsv + env: .env-brumxdr-crowdstrike diff --git a/scenarios/h8_full_chain.yml b/scenarios/h8_full_chain.yml index ad920030..3ad292a8 100644 --- a/scenarios/h8_full_chain.yml +++ b/scenarios/h8_full_chain.yml @@ -6,10 +6,7 @@ compress_window: 30m sources: - name: ProofPoint TAP file: input_tsv/ProofPoint-MITRE-Turla-Carbon-in-XSIAM.tsv - env: .env-brumxdr-proofpoint + env: .env-skynet-proofpoint - name: CrowdStrike file: input_tsv/CrowdStrike-MITRE-Turla-Carbon-in-XSIAM.tsv - env: .env-brumxdr-crowdstrike - - name: CrowdStrike IDP - file: input_tsv/CrowdStrike-IDP-Turla-Carbon-Synthetic.tsv - env: .env-brumxdr-crowdstrike + env: .env-skynet-crowdstrike diff --git a/tools/schema_mutation_test.py b/tools/schema_mutation_test.py new file mode 100644 index 00000000..54d73a18 --- /dev/null +++ b/tools/schema_mutation_test.py @@ -0,0 +1,266 @@ +#!/usr/bin/env python3 +""" +schema_mutation_test.py — Empirically test how an XSIAM dataset reacts to +schema changes, without touching any production dataset. + +Point it at a DEDICATED test HTTP Collector (custom name -> custom dataset). +It sends a sequence of rows, each tagged with a unique `test_marker`, that +exercise every way data can change: add fields, drop fields, change types, +diverge field names, collide on id, and a volume/no-decay check. After each +step it prints the exact XQL to run so you can observe what happened to the +rows that already existed. + +NOTHING here can harm crowdstrike_falcon_event_raw or any real dataset — it +only writes to whatever dataset the test collector in your .env routes to. + +Usage: + python3 schema_mutation_test.py --env .env-brumxdr-schematest + python3 schema_mutation_test.py --env .env-brumxdr-schematest --test T3 + python3 schema_mutation_test.py --env .env-brumxdr-schematest --all + python3 schema_mutation_test.py --env .env-prod-schematest --all --dry-run + +The .env file must contain: + API_URL= + API_KEY= + +Recommended: name the collector/dataset something obvious like +`schema_mutation_test_raw` so it's unmistakable and easy to delete after. +""" + +import argparse +import json +import os +import ssl +import sys +import time +import urllib.request +import urllib.error +from datetime import datetime, timezone + + +# ───────────────────────────────────────────────────────────────────────────── +# env loading +# ───────────────────────────────────────────────────────────────────────────── + +def load_env(path): + """Read KEY=VALUE lines from an env file. Returns (api_url, api_key).""" + if not os.path.exists(path): + sys.exit(f"env file not found: {path}") + url = key = None + with open(path) as f: + for line in f: + line = line.strip() + if not line or line.startswith("#") or "=" not in line: + continue + k, v = line.split("=", 1) + k, v = k.strip(), v.strip().strip('"').strip("'") + if k == "API_URL": + url = v + elif k == "API_KEY": + key = v + if not url: + sys.exit(f"API_URL missing from {path}") + if not key: + sys.exit(f"API_KEY missing from {path}") + return url, key + + +def now_iso(): + return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.000Z") + + +# ───────────────────────────────────────────────────────────────────────────── +# sending +# ───────────────────────────────────────────────────────────────────────────── + +def send(url, key, rows, dry_run=False, ssl_ctx=None): + """POST a JSON array of event dicts to the HTTP collector.""" + payload = json.dumps(rows).encode("utf-8") + if dry_run: + print(f" [DRY RUN] would POST {len(rows)} row(s) to {url}") + print(f" sample: {json.dumps(rows[0])[:200]}") + return True + req = urllib.request.Request( + url, + data=payload, + method="POST", + headers={ + "Content-Type": "application/json", + "Authorization": key, + }, + ) + try: + with urllib.request.urlopen(req, timeout=30, context=ssl_ctx) as resp: + body = resp.read().decode("utf-8", "replace") + print(f" HTTP {resp.status} sent {len(rows)} row(s) resp: {body[:200]}") + return 200 <= resp.status < 300 + except urllib.error.HTTPError as e: + print(f" HTTP ERROR {e.code}: {e.read().decode('utf-8','replace')[:300]}") + return False + except urllib.error.URLError as e: + print(f" CONNECTION ERROR: {e.reason}") + return False + + +# ───────────────────────────────────────────────────────────────────────────── +# test rows — one builder per phase. Each row carries test_marker + _time. +# A stable id field `event_uid` lets us probe collision behavior in T6. +# ───────────────────────────────────────────────────────────────────────────── + +def base10(marker, uid): + """The baseline 10-field shape every later test diverges from.""" + return { + "test_marker": marker, + "event_uid": uid, + "_time": now_iso(), + "f_string": "alpha", + "f_int": 100, + "f_bool": True, + "f_user": "gunter@skt.local", + "f_host": "host-01", + "f_ip": "10.0.0.1", + "f_score": 42, + } + + +def build(test): + """Return (description, list-of-rows, post_query_hint) for a given phase.""" + if test == "T0": + rows = [base10("T0", "uid-T0-001")] + return ("Baseline control: 10 fields, 1 row.", rows, + 'filter test_marker = "T0" → expect exactly 1 row, 10 fields.') + + if test == "T1": + r = base10("T1", "uid-T1-001") + r["f_new_one"] = "added-field" # 10 -> 11 + return ("Add ONE field (10→11).", [r], + 'filter test_marker in ("T0","T1") → T0 still present; ' + 'f_new_one column exists and is null on T0.') + + if test == "T2": + r = base10("T2", "uid-T2-001") + for i in range(1, 10): # 11 -> 20 + r[f"f_bulk_{i}"] = f"v{i}" + return ("Add NINE fields at once (11→20).", [r], + 'filter test_marker in ("T0","T1","T2") → all prior rows intact.') + + if test == "T3": + # NARROWER row: only 5 of the original fields, rest omitted. + r = { + "test_marker": "T3", + "event_uid": "uid-T3-001", + "_time": now_iso(), + "f_string": "narrow", + "f_int": 5, + } + return ("Drop fields on NEW rows (send only 5).", [r], + 'filter test_marker in ("T0","T1","T2","T3") → CRITICAL: confirm ' + 'T0/T1/T2 still hold their f_user/f_host/etc values (NOT nulled). ' + 'T3 simply has nulls for omitted columns. ' + 'This is the "does narrow data strip old columns" test.') + + if test == "T4": + # TYPE COLLISION: f_int was integer; send it as a string now. + r = base10("T4", "uid-T4-001") + r["f_int"] = "not-a-number" + r["f_score"] = "high" # was 42 (int) + return ("Type change on existing fields (int→string).", [r], + 'filter test_marker in ("T0","T4") → both rows present. ' + 'Then query f_int / f_score across all rows and watch for QUERY ' + 'ERRORS or coercion. Row loss is the failure; query error is a caveat.') + + if test == "T5": + # FIELD-NAME DIVERGENCE: near-miss name vs f_user. + r = base10("T5", "uid-T5-001") + del r["f_user"] + r["fuser"] = "frieda@skt.local" # typo'd / diverged name + return ("Field-name divergence (f_user → fuser).", [r], + 'Confirm fuser is a NEW column; f_user on prior rows unchanged. ' + 'Proves a renamed/typo field does not overwrite the original column.') + + if test == "T6": + # ID COLLISION: reuse T0's event_uid with a changed payload. + r = base10("T6", "uid-T0-001") # SAME uid as T0 + r["f_string"] = "COLLISION-CHANGED" + r["f_int"] = 999 + return ("ID collision: reuse T0 event_uid with changed payload.", [r], + 'filter event_uid = "uid-T0-001" → THE KEY TEST: ' + 'append-only = TWO rows (original "alpha" + new "COLLISION-CHANGED"). ' + 'Upsert/overwrite = ONE row with the changed payload. ' + 'If you see overwrite, sending colliding ids is destructive.') + + if test == "T7": + # VOLUME / NO-DECAY: a batch, then you wait and re-query. + rows = [base10("T7", f"uid-T7-{i:03d}") for i in range(1, 21)] + return ("Volume + no-decay: send 20 rows.", rows, + 'Query count() by test_marker → all markers T0..T7 present. ' + 'Then WAIT 10-15 min, re-run → nothing decayed/disappeared.') + + sys.exit(f"unknown test: {test}") + + +ORDER = ["T0", "T1", "T2", "T3", "T4", "T5", "T6", "T7"] + + +# ───────────────────────────────────────────────────────────────────────────── +# main +# ───────────────────────────────────────────────────────────────────────────── + +def run_phase(test, url, key, dry_run, ssl_ctx=None): + desc, rows, hint = build(test) + print(f"\n=== {test}: {desc} ===") + ok = send(url, key, rows, dry_run=dry_run, ssl_ctx=ssl_ctx) + print(f" AFTER THIS STEP, run in XQL Search:") + print(f" dataset = ") + print(f" | {hint}") + return ok + + +def main(): + ap = argparse.ArgumentParser(description="XSIAM dataset schema-mutation test harness") + ap.add_argument("--env", required=True, help="path to .env file with API_URL + API_KEY") + ap.add_argument("--test", help="run a single phase (T0..T7)") + ap.add_argument("--all", action="store_true", help="run the full T0..T7 sequence") + ap.add_argument("--dry-run", action="store_true", help="show what would be sent, send nothing") + ap.add_argument("--pause", type=float, default=3.0, help="seconds between phases (default 3)") + ap.add_argument("--insecure", action="store_true", + help="skip TLS verification (use on SSL-inspected/corporate networks)") + ap.add_argument("--ca-bundle", + help="path to a CA bundle PEM to trust (preferred over --insecure)") + args = ap.parse_args() + + url, key = load_env(args.env) + + ssl_ctx = None + if args.ca_bundle: + ssl_ctx = ssl.create_default_context(cafile=args.ca_bundle) + elif args.insecure: + ssl_ctx = ssl.create_default_context() + ssl_ctx.check_hostname = False + ssl_ctx.verify_mode = ssl.CERT_NONE + + print(f"[*] env: {args.env}") + print(f"[*] collector: {url}") + print(f"[*] mode: {'DRY RUN' if args.dry_run else 'LIVE'}" + + (" [TLS verify OFF]" if args.insecure else "") + + (f" [CA: {args.ca_bundle}]" if args.ca_bundle else "")) + print(f"[!] This writes ONLY to the dataset the above collector routes to.") + print(f"[!] It cannot affect crowdstrike_falcon_event_raw or any other dataset.") + + if args.test: + run_phase(args.test, url, key, args.dry_run, ssl_ctx) + elif args.all: + for t in ORDER: + run_phase(t, url, key, args.dry_run, ssl_ctx) + if t != ORDER[-1]: + time.sleep(args.pause) + print("\n[*] Full matrix sent. Decision rule:") + print(" Any phase where a PRIOR test_marker row vanished or its values") + print(" changed = that operation is PROD-UNSAFE. Everything that left") + print(" prior rows intact is proven safe on this platform version.") + else: + sys.exit("specify --test T0 (single phase) or --all (full sequence)") + + +if __name__ == "__main__": + main() diff --git a/tools/send_pan_dss_seed.sh b/tools/send_pan_dss_seed.sh new file mode 100755 index 00000000..6e38af50 --- /dev/null +++ b/tools/send_pan_dss_seed.sh @@ -0,0 +1,31 @@ +#!/usr/bin/env bash +# Seed socfw identity source data into pan_dss_raw via the brumxdr HTTP Collector. +# Then run correlation rule 416 (SOC IdentityResolve) to build socfw_identity_map. +# +# Reads API_URL + API_KEY from the same env file the replay tool uses. +# Adjust ENV path if yours differs. +set -euo pipefail + +ENV="${1:-.env-brumxdr-pan-dss}" +SEED="${2:-pan_dss_raw_seed.json}" + +# Pull collector URL + token from the env file (KEY=VALUE lines). +API_URL="$(grep -E '^API_URL=' "$ENV" | cut -d= -f2-)" +API_KEY="$(grep -E '^API_KEY=' "$ENV" | cut -d= -f2-)" + +if [ -z "$API_URL" ] || [ -z "$API_KEY" ]; then + echo "API_URL or API_KEY missing from $ENV" >&2 + exit 1 +fi + +echo "[*] Posting $(python3 -c "import json;print(len(json.load(open('$SEED'))))") rows to $API_URL" + +curl --fail --silent --show-error \ + -X POST "$API_URL" \ + -H "Authorization: $API_KEY" \ + -H "Content-Type: application/json" \ + --data-binary "@$SEED" + +echo +echo "[*] Done. Now run rule 416 (SOC IdentityResolve) to build socfw_identity_map," +echo " then verify: dataset = socfw_identity_map | fields sid, upn, netbios_and_sam_account_name" diff --git a/tools/send_test_events.py b/tools/send_test_events.py old mode 100644 new mode 100755 index 5cf15ebf..979d8bc6 --- a/tools/send_test_events.py +++ b/tools/send_test_events.py @@ -195,24 +195,68 @@ def rebase_timestamps( # ---------- Sender (unchanged API) ---------- -def send_events(events, api_url: str, api_key: str): +def send_events(events, api_url: str, api_key: str, + batch_size: int = 5, max_body_bytes: int = 512_000): """ Pure sender: assumes events already have whatever timestamps you want. - """ - body = "\n".join(json.dumps(ev) for ev in events) + Batches the POST. A single large body (e.g. 138 CrowdStrike EPP events at + ~17KB each = 2.3MB) returns HTTP 200 but is silently dropped by the HTTP + collector — nothing lands in the dataset. Small sources (2-event Proofpoint) + work because the body is trivial. To avoid that, send in chunks. + + Two ceilings are enforced per POST, whichever is hit first: + - batch_size: max events per POST (default 5) + - max_body_bytes: max serialized body size per POST (default ~512KB) + + Events are NDJSON (one JSON object per line), unchanged from before. + """ headers = { "Authorization": api_key, # bare api_key as before "Content-Type": "application/json", } - print(f"[*] Sending {len(events)} events to {api_url}") - resp = requests.post(api_url, headers=headers, data=body, timeout=30) - print(f"[+] HTTP {resp.status_code}") - try: - print(resp.text) - except Exception: - pass + total = len(events) + print(f"[*] Sending {total} events to {api_url} " + f"(batch_size={batch_size}, max_body={max_body_bytes}B)") + + # Build batches honoring both the count cap and the byte cap. + batches = [] + cur, cur_bytes = [], 0 + for ev in events: + line = json.dumps(ev) + ln = len(line) + 1 # +1 for the newline join + # flush if adding this event would breach either ceiling + if cur and (len(cur) >= batch_size or cur_bytes + ln > max_body_bytes): + batches.append(cur) + cur, cur_bytes = [], 0 + cur.append(ev) + cur_bytes += ln + if cur: + batches.append(cur) + + sent = 0 + failures = 0 + for i, batch in enumerate(batches, 1): + body = "\n".join(json.dumps(ev) for ev in batch) + try: + resp = requests.post(api_url, headers=headers, data=body, timeout=30) + ok = 200 <= resp.status_code < 300 + sent += len(batch) if ok else 0 + failures += 0 if ok else 1 + print(f" batch {i}/{len(batches)}: {len(batch)} events " + f"({len(body)}B) → HTTP {resp.status_code} {resp.text.strip()[:80]}") + if not ok: + print(f" [!] batch {i} failed; stopping.") + break + except requests.RequestException as e: + failures += 1 + print(f" [!] batch {i} POST error: {e}") + break + + print(f"[+] Done: {sent}/{total} events accepted across " + f"{len(batches)} batch(es); {failures} failed batch(es).") + return failures == 0 # ---------- CLI ----------