diff --git a/volatility3/plugins/windows/lateralmove.py b/volatility3/plugins/windows/lateralmove.py new file mode 100644 index 0000000000..c1290d7f96 --- /dev/null +++ b/volatility3/plugins/windows/lateralmove.py @@ -0,0 +1,399 @@ +""" +lateralmove.py - Volatility 3 plugin for lateral movement detection. + +Detects three categories of indicators: + Module 1 - Suspicious parent-child process relationships + (WMI, PsExec, DCOM, scheduled tasks, MMC, browser proxy tools) + Module 2 - Suspicious network connections + (SMB, RPC, WinRM, lsass outbound, non-browser HTTP/S) + Module 3 - Token and session anomalies + (session ID mismatches from non-bridge parents, privilege escalation) + +Usage: + python vol.py -f memory.dmp windows.lateralmove + python vol.py -f memory.dmp windows.lateralmove --lateralmove.config-file custom_combos.json + +Custom config format (JSON): + { + "winword.exe": [["cmd.exe", "powershell.exe"], "Macro execution"], + "excel.exe": [["cmd.exe", "powershell.exe"], "Macro execution"] + } + +Install: + Copy this file to: volatility3/volatility3/plugins/windows/lateralmove.py + +API verified against Volatility 2.27.0: + pslist.PsList._version = (3, 0, 1) + netscan.NetScan._version = (2, 0, 0) + PsList.list_processes params = (context, kernel_module_name, filter_func) + NetScan.scan params = (context, kernel_module_name, netscan_symbol_table) + NetScan.create_netscan_symbol_table = (context, kernel_module_name, config_path) + +Author: Philip Vieyra (github: cybernerdphil) +""" + +import logging +from typing import Dict, Iterator, List, Optional, Set, Tuple + +from volatility3.framework import exceptions, interfaces, renderers +from volatility3.framework.configuration import requirements +from volatility3.plugins.windows import netscan, pslist + +vollog = logging.getLogger(__name__) + +# --------------------------------------------------------------------------- +# MITRE ATT&CK technique mappings +# --------------------------------------------------------------------------- +MITRE: Dict[str, str] = { + "WMI execution": "T1021.003", + "PsExec / remote service": "T1021.002", + "DCOM execution": "T1021.003", + "Scheduled task spawn": "T1053.005", + "MMC DCOM": "T1021.003", + "SMB connection": "T1021.002", + "WinRM connection": "T1021.006", + "RPC/DCOM port": "T1021.003", + "Suspicious lsass conn": "T1003.001", + "Non-browser HTTP/S": "T1071.001", + "Token impersonation": "T1134.001", + "Privilege escalation": "T1134.002", + "Proxy/tunnel tool": "T1090.001", +} + +# Processes that legitimately bridge session 0 -> user sessions. +# Session mismatch from these parents is normal Windows behaviour. +SESSION_BRIDGE_PARENTS: Set[str] = { + "smss.exe", "services.exe", "svchost.exe", "wininit.exe", + "winlogon.exe", "csrss.exe", "lsass.exe", "taskschd.exe", +} + +# Child process names that make a session mismatch suspicious +SESSION_MISMATCH_SUSPICIOUS_CHILDREN: Set[str] = { + "cmd.exe", "powershell.exe", "pwsh.exe", "wscript.exe", + "cscript.exe", "mshta.exe", "rundll32.exe", "regsvr32.exe", +} + + +def _get_session(proc) -> int: + """Safely get session ID - attribute name varies by Windows version.""" + try: + return int(proc.Session) + except AttributeError: + pass + try: + return int(proc.SessionId) + except AttributeError: + return -1 + + +class LateralMove(interfaces.plugins.PluginInterface): + """Detects lateral movement indicators: suspicious parent-child + relationships, network connections, and token anomalies.""" + + _required_framework_version = (2, 0, 0) + _version = (1, 0, 0) + + DEFAULT_COMBOS: Dict[str, Tuple[List[str], str]] = { + "wmiprvse.exe": ( + ["*"], + "WMI execution", + ), + "services.exe": ( + ["cmd.exe", "powershell.exe", "pwsh.exe", + "wscript.exe", "cscript.exe", "mshta.exe"], + "PsExec / remote service", + ), + "mmc.exe": ( + ["cmd.exe", "powershell.exe", "pwsh.exe"], + "MMC DCOM", + ), + "dllhost.exe": ( + ["cmd.exe", "powershell.exe", "pwsh.exe", "wscript.exe"], + "DCOM execution", + ), + "taskeng.exe": ( + ["cmd.exe", "powershell.exe", "pwsh.exe", + "wscript.exe", "cscript.exe"], + "Scheduled task spawn", + ), + "firefox.exe": ( + ["tor.exe", "proxifier.exe", "stunnel.exe"], + "Proxy/tunnel tool", + ), + "chrome.exe": ( + ["tor.exe", "proxifier.exe", "stunnel.exe"], + "Proxy/tunnel tool", + ), + "msedge.exe": ( + ["tor.exe", "proxifier.exe", "stunnel.exe"], + "Proxy/tunnel tool", + ), + } + + SVCHOST_SUSPICIOUS_CHILDREN: List[str] = [ + "cmd.exe", "powershell.exe", "pwsh.exe", + "wscript.exe", "cscript.exe", "mshta.exe", + ] + + LATERAL_PORTS: Dict[int, str] = { + 445: "SMB connection", + 139: "SMB connection", + 135: "RPC/DCOM port", + 5985: "WinRM connection", + 5986: "WinRM connection", + } + + BROWSER_NAMES: Set[str] = { + "chrome.exe", "firefox.exe", "msedge.exe", + "iexplore.exe", "opera.exe", "brave.exe", + } + + @classmethod + def get_requirements(cls) -> List[interfaces.configuration.RequirementInterface]: + return [ + requirements.ModuleRequirement( + name="kernel", + description="Windows kernel", + architectures=["Intel32", "Intel64"], + ), + requirements.VersionRequirement( + name="pslist", + component=pslist.PsList, + version=(3, 0, 1), + ), + requirements.VersionRequirement( + name="netscan", + component=netscan.NetScan, + version=(2, 0, 0), + ), + requirements.StringRequirement( + name="config-file", + description="Path to JSON file with custom parent-child combos", + optional=True, + default=None, + ), + requirements.BooleanRequirement( + name="all-connections", + description="Include non-ESTABLISHED connections in Module 2", + optional=True, + default=False, + ), + ] + + def _check_parent_child( + self, + proc_name: str, + parent_name: str, + combos: Dict, + ) -> Optional[str]: + parent_lower = parent_name.lower() + child_lower = proc_name.lower() + + if parent_lower in combos: + children, technique = combos[parent_lower] + if "*" in children or child_lower in [c.lower() for c in children]: + return technique + + if parent_lower == "svchost.exe": + if child_lower in [c.lower() for c in self.SVCHOST_SUSPICIOUS_CHILDREN]: + return "Scheduled task spawn" + + return None + + def _check_connections( + self, + pid: int, + proc_name: str, + connections: Dict[int, List], + ) -> List[Tuple[str, str]]: + findings: List[Tuple[str, str]] = [] + + for conn in connections.get(pid, []): + try: + foreign_port = int(conn.ForeignPort) + + if proc_name.lower() == "lsass.exe" and foreign_port > 0: + findings.append(("Suspicious lsass conn", f"port {foreign_port}")) + continue + + if foreign_port in self.LATERAL_PORTS: + findings.append((self.LATERAL_PORTS[foreign_port], f"port {foreign_port}")) + + if foreign_port in (80, 443) and proc_name.lower() not in self.BROWSER_NAMES: + findings.append(("Non-browser HTTP/S", f"{proc_name} port {foreign_port}")) + + except (exceptions.InvalidAddressException, AttributeError): + continue + + return findings + + def _check_token( + self, + proc, + proc_name: str, + parent_name: str, + parent_session: int, + ) -> List[str]: + findings: List[str] = [] + + try: + proc_session = _get_session(proc) + + if ( + proc_session != -1 + and parent_session != -1 + and proc_session != parent_session + and parent_name.lower() not in SESSION_BRIDGE_PARENTS + and proc_name.lower() in SESSION_MISMATCH_SUSPICIOUS_CHILDREN + ): + findings.append("Token impersonation") + + try: + token = proc.Token.dereference().cast("_TOKEN") + groups = token.Groups + for i in range(int(token.GroupCount)): + group = groups[i] + attrs = int(group.Attributes) + if attrs & 0x20: + sid = group.Sid.dereference() + sub = int(sid.SubAuthority[0]) + if sub >= 0x3000 and proc_session > 0: + findings.append("Privilege escalation") + break + except (exceptions.InvalidAddressException, AttributeError): + pass + + except (exceptions.InvalidAddressException, AttributeError): + pass + + return findings + + def _load_combos(self) -> Dict: + combos = dict(self.DEFAULT_COMBOS) + config_path = self.config.get("config-file") + + if config_path: + try: + import json + + with open(config_path) as f: + user_combos = json.load(f) + combos.update({k.lower(): v for k, v in user_combos.items()}) + vollog.info( + "Loaded %d custom combos from %s", len(user_combos), config_path + ) + except Exception as exc: + vollog.warning("Could not load config file %s: %s", config_path, exc) + + return combos + + def _generator(self) -> Iterator[Tuple[int, Tuple]]: + kernel_name = self.config["kernel"] + combos = self._load_combos() + + proc_map: Dict[int, Tuple[str, int]] = {} + all_procs = list( + pslist.PsList.list_processes( + context=self.context, + kernel_module_name=kernel_name, + ) + ) + + for proc in all_procs: + try: + name = proc.ImageFileName.cast( + "string", + max_length=proc.ImageFileName.vol.count, + errors="replace", + ) + proc_map[int(proc.UniqueProcessId)] = (name, _get_session(proc)) + except exceptions.InvalidAddressException: + continue + + connections: Dict[int, List] = {} + try: + netscan_table = netscan.NetScan.create_netscan_symbol_table( + context=self.context, + kernel_module_name=kernel_name, + config_path=self.config_path, + ) + for conn in netscan.NetScan.scan( + context=self.context, + kernel_module_name=kernel_name, + netscan_symbol_table=netscan_table, + ): + try: + pid = int(conn.Owner.UniqueProcessId) + connections.setdefault(pid, []).append(conn) + except (exceptions.InvalidAddressException, AttributeError): + continue + except Exception as exc: + vollog.warning("NetScan unavailable, Module 2 skipped: %s", exc) + + seen: Set[Tuple[int, str]] = set() + + for proc in all_procs: + try: + pid = int(proc.UniqueProcessId) + ppid = int(proc.InheritedFromUniqueProcessId) + + name = proc.ImageFileName.cast( + "string", + max_length=proc.ImageFileName.vol.count, + errors="replace", + ) + + try: + peb = proc.Peb.dereference() + cmdline = peb.ProcessParameters.CommandLine.get_string() or "" + except Exception: + cmdline = "" + + parent_name, parent_session = proc_map.get(ppid, ("unknown", -1)) + all_findings: List[Tuple[str, str]] = [] + + hit = self._check_parent_child(name, parent_name, combos) + if hit: + all_findings.append((hit, MITRE.get(hit, "-"))) + + for technique, _detail in self._check_connections(pid, name, connections): + all_findings.append((technique, MITRE.get(technique, "-"))) + + for technique in self._check_token(proc, name, parent_name, parent_session): + all_findings.append((technique, MITRE.get(technique, "-"))) + + for technique, mitre_id in all_findings: + key = (pid, technique) + if key in seen: + continue + seen.add(key) + + yield ( + 0, + ( + pid, + ppid, + name, + parent_name, + cmdline[:120], + technique, + mitre_id, + ), + ) + + except exceptions.InvalidAddressException: + continue + + def run(self): + return renderers.TreeGrid( + [ + ("PID", int), + ("PPID", int), + ("Process", str), + ("Parent", str), + ("CmdLine", str), + ("Technique", str), + ("MITRE", str), + ], + self._generator(), + ) \ No newline at end of file diff --git a/volatility3/plugins/windows/test_lateralmove.py b/volatility3/plugins/windows/test_lateralmove.py new file mode 100644 index 0000000000..ac7b7b9bd8 --- /dev/null +++ b/volatility3/plugins/windows/test_lateralmove.py @@ -0,0 +1,270 @@ +import importlib.util +import json +import os +import tempfile +import unittest +from unittest.mock import MagicMock, patch + +_PLUGIN_PATH = os.path.join(os.path.dirname(__file__), "lateralmove.py") +_spec = importlib.util.spec_from_file_location("lateralmove", _PLUGIN_PATH) +_mod = importlib.util.module_from_spec(_spec) +_spec.loader.exec_module(_mod) + +LateralMove = _mod.LateralMove +SESSION_BRIDGE_PARENTS = _mod.SESSION_BRIDGE_PARENTS +SESSION_MISMATCH_SUSPICIOUS_CHILDREN = _mod.SESSION_MISMATCH_SUSPICIOUS_CHILDREN +_get_session = _mod._get_session + + +def _make_plugin(): + plugin = LateralMove.__new__(LateralMove) + plugin._context = MagicMock() + mock_config = MagicMock() + mock_config.get = MagicMock(return_value=None) + mock_config.__getitem__ = MagicMock(return_value="kernel") + plugin._config = mock_config + plugin.config_path = "test.config" + return plugin + + +class TestGetSession(unittest.TestCase): + + def test_returns_session_attribute(self): + proc = MagicMock() + proc.Session = 2 + self.assertEqual(_get_session(proc), 2) + + def test_falls_back_to_session_id(self): + proc = MagicMock(spec=["SessionId"]) + proc.SessionId = 1 + self.assertEqual(_get_session(proc), 1) + + def test_returns_minus_one_when_neither_exists(self): + proc = MagicMock(spec=[]) + self.assertEqual(_get_session(proc), -1) + + +class TestCheckParentChild(unittest.TestCase): + + def setUp(self): + self.plugin = _make_plugin() + self.combos = self.plugin._load_combos() + + def test_wmiprvse_any_child_flagged(self): + self.assertEqual( + self.plugin._check_parent_child("notepad.exe", "wmiprvse.exe", self.combos), + "WMI execution", + ) + + def test_wmiprvse_cmd_flagged(self): + self.assertEqual( + self.plugin._check_parent_child("cmd.exe", "wmiprvse.exe", self.combos), + "WMI execution", + ) + + def test_services_powershell_flagged(self): + self.assertEqual( + self.plugin._check_parent_child("powershell.exe", "services.exe", self.combos), + "PsExec / remote service", + ) + + def test_services_notepad_not_flagged(self): + self.assertIsNone( + self.plugin._check_parent_child("notepad.exe", "services.exe", self.combos) + ) + + def test_mmc_cmd_flagged(self): + self.assertEqual( + self.plugin._check_parent_child("cmd.exe", "mmc.exe", self.combos), + "MMC DCOM", + ) + + def test_dllhost_wscript_flagged(self): + self.assertEqual( + self.plugin._check_parent_child("wscript.exe", "dllhost.exe", self.combos), + "DCOM execution", + ) + + def test_svchost_cmd_flagged(self): + self.assertEqual( + self.plugin._check_parent_child("cmd.exe", "svchost.exe", self.combos), + "Scheduled task spawn", + ) + + def test_svchost_explorer_not_flagged(self): + self.assertIsNone( + self.plugin._check_parent_child("explorer.exe", "svchost.exe", self.combos) + ) + + def test_firefox_tor_flagged(self): + self.assertEqual( + self.plugin._check_parent_child("tor.exe", "firefox.exe", self.combos), + "Proxy/tunnel tool", + ) + + def test_firefox_child_not_flagged(self): + self.assertIsNone( + self.plugin._check_parent_child("firefox.exe", "firefox.exe", self.combos) + ) + + def test_case_insensitive(self): + self.assertEqual( + self.plugin._check_parent_child("CMD.EXE", "WmiPrvSE.exe", self.combos), + "WMI execution", + ) + + def test_benign_parent_none(self): + self.assertIsNone( + self.plugin._check_parent_child("cmd.exe", "explorer.exe", self.combos) + ) + + +class TestCheckConnections(unittest.TestCase): + + def setUp(self): + self.plugin = _make_plugin() + + def _run(self, proc_name, port): + conn = MagicMock() + conn.ForeignPort = port + results = self.plugin._check_connections(1, proc_name, {1: [conn]}) + return [r[0] for r in results] + + def test_smb_445(self): + self.assertIn("SMB connection", self._run("svchost.exe", 445)) + + def test_smb_139(self): + self.assertIn("SMB connection", self._run("svchost.exe", 139)) + + def test_winrm_5985(self): + self.assertIn("WinRM connection", self._run("svchost.exe", 5985)) + + def test_winrm_5986(self): + self.assertIn("WinRM connection", self._run("svchost.exe", 5986)) + + def test_rpc_135(self): + self.assertIn("RPC/DCOM port", self._run("svchost.exe", 135)) + + def test_lsass_outbound(self): + self.assertIn("Suspicious lsass conn", self._run("lsass.exe", 4444)) + + def test_lsass_port_zero_not_flagged(self): + self.assertEqual(self._run("lsass.exe", 0), []) + + def test_non_browser_443(self): + self.assertIn("Non-browser HTTP/S", self._run("powershell.exe", 443)) + + def test_non_browser_80(self): + self.assertIn("Non-browser HTTP/S", self._run("powershell.exe", 80)) + + def test_browser_443_not_flagged(self): + self.assertNotIn("Non-browser HTTP/S", self._run("firefox.exe", 443)) + + def test_browser_80_not_flagged(self): + self.assertNotIn("Non-browser HTTP/S", self._run("chrome.exe", 80)) + + def test_no_connections_empty(self): + self.assertEqual( + self.plugin._check_connections(9999, "explorer.exe", {}), [] + ) + + def test_benign_port_not_flagged(self): + self.assertEqual(self._run("svchost.exe", 8080), []) + + +class TestCheckToken(unittest.TestCase): + + def setUp(self): + self.plugin = _make_plugin() + + def _proc(self, session): + proc = MagicMock() + proc.Session = session + proc.Token.dereference.side_effect = AttributeError("no token") + return proc + + def test_mismatch_suspicious_child_flagged(self): + self.assertIn( + "Token impersonation", + self.plugin._check_token(self._proc(2), "cmd.exe", "explorer.exe", 0), + ) + + def test_mismatch_non_suspicious_child_not_flagged(self): + self.assertNotIn( + "Token impersonation", + self.plugin._check_token(self._proc(2), "notepad.exe", "explorer.exe", 0), + ) + + def test_mismatch_bridge_parent_not_flagged(self): + self.assertNotIn( + "Token impersonation", + self.plugin._check_token(self._proc(2), "cmd.exe", "svchost.exe", 0), + ) + + def test_same_session_not_flagged(self): + self.assertNotIn( + "Token impersonation", + self.plugin._check_token(self._proc(2), "cmd.exe", "explorer.exe", 2), + ) + + def test_unknown_parent_session_not_flagged(self): + self.assertNotIn( + "Token impersonation", + self.plugin._check_token(self._proc(2), "cmd.exe", "unknown", -1), + ) + + +class TestSessionBridgeParents(unittest.TestCase): + + def test_svchost_in_set(self): + self.assertIn("svchost.exe", SESSION_BRIDGE_PARENTS) + + def test_services_in_set(self): + self.assertIn("services.exe", SESSION_BRIDGE_PARENTS) + + def test_smss_in_set(self): + self.assertIn("smss.exe", SESSION_BRIDGE_PARENTS) + + def test_explorer_not_in_set(self): + self.assertNotIn("explorer.exe", SESSION_BRIDGE_PARENTS) + + +class TestLoadCombos(unittest.TestCase): + + def setUp(self): + self.plugin = _make_plugin() + + def test_defaults_present(self): + combos = self.plugin._load_combos() + self.assertIn("wmiprvse.exe", combos) + self.assertIn("services.exe", combos) + self.assertIn("firefox.exe", combos) + + def test_custom_combos_merged(self): + custom = {"winword.exe": [["cmd.exe"], "Macro execution"]} + with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f: + json.dump(custom, f) + tmp_path = f.name + try: + def patched_load(): + import json as _json + combos = dict(LateralMove.DEFAULT_COMBOS) + with open(tmp_path) as fp: + user_combos = _json.load(fp) + combos.update({k.lower(): v for k, v in user_combos.items()}) + return combos + self.plugin._load_combos = patched_load + combos = self.plugin._load_combos() + self.assertIn("winword.exe", combos) + self.assertIn("wmiprvse.exe", combos) + finally: + os.unlink(tmp_path) + + def test_bad_config_path_uses_defaults(self): + self.plugin._config.get = MagicMock(return_value="/nonexistent/path.json") + combos = self.plugin._load_combos() + self.assertIn("wmiprvse.exe", combos) + + +if __name__ == "__main__": + unittest.main(verbosity=2)