From 9d41028517f91582da1e9440a7b06088ac481f0e Mon Sep 17 00:00:00 2001
From: Philip
Date: Fri, 22 May 2026 11:01:51 +1000
Subject: [PATCH] Add windows.lateralmove plugin - lateral movement detection
---
volatility3/plugins/windows/lateralmove.py | 399 ++++++++++++++++++
.../plugins/windows/test_lateralmove.py | 270 ++++++++++++
2 files changed, 669 insertions(+)
create mode 100644 volatility3/plugins/windows/lateralmove.py
create mode 100644 volatility3/plugins/windows/test_lateralmove.py
diff --git a/volatility3/plugins/windows/lateralmove.py b/volatility3/plugins/windows/lateralmove.py
new file mode 100644
index 0000000000..c1290d7f96
--- /dev/null
+++ b/volatility3/plugins/windows/lateralmove.py
@@ -0,0 +1,399 @@
+"""
+lateralmove.py - Volatility 3 plugin for lateral movement detection.
+
+Detects three categories of indicators:
+ Module 1 - Suspicious parent-child process relationships
+ (WMI, PsExec, DCOM, scheduled tasks, MMC, browser proxy tools)
+ Module 2 - Suspicious network connections
+ (SMB, RPC, WinRM, lsass outbound, non-browser HTTP/S)
+ Module 3 - Token and session anomalies
+ (session ID mismatches from non-bridge parents, privilege escalation)
+
+Usage:
+ python vol.py -f memory.dmp windows.lateralmove
+ python vol.py -f memory.dmp windows.lateralmove --lateralmove.config-file custom_combos.json
+
+Custom config format (JSON):
+ {
+ "winword.exe": [["cmd.exe", "powershell.exe"], "Macro execution"],
+ "excel.exe": [["cmd.exe", "powershell.exe"], "Macro execution"]
+ }
+
+Install:
+ Copy this file to: volatility3/volatility3/plugins/windows/lateralmove.py
+
+API verified against Volatility 2.27.0:
+ pslist.PsList._version = (3, 0, 1)
+ netscan.NetScan._version = (2, 0, 0)
+ PsList.list_processes params = (context, kernel_module_name, filter_func)
+ NetScan.scan params = (context, kernel_module_name, netscan_symbol_table)
+ NetScan.create_netscan_symbol_table = (context, kernel_module_name, config_path)
+
+Author: Philip Vieyra (github: cybernerdphil)
+"""
+
+import logging
+from typing import Dict, Iterator, List, Optional, Set, Tuple
+
+from volatility3.framework import exceptions, interfaces, renderers
+from volatility3.framework.configuration import requirements
+from volatility3.plugins.windows import netscan, pslist
+
+vollog = logging.getLogger(__name__)
+
+# ---------------------------------------------------------------------------
+# MITRE ATT&CK technique mappings
+# ---------------------------------------------------------------------------
+MITRE: Dict[str, str] = {
+ "WMI execution": "T1021.003",
+ "PsExec / remote service": "T1021.002",
+ "DCOM execution": "T1021.003",
+ "Scheduled task spawn": "T1053.005",
+ "MMC DCOM": "T1021.003",
+ "SMB connection": "T1021.002",
+ "WinRM connection": "T1021.006",
+ "RPC/DCOM port": "T1021.003",
+ "Suspicious lsass conn": "T1003.001",
+ "Non-browser HTTP/S": "T1071.001",
+ "Token impersonation": "T1134.001",
+ "Privilege escalation": "T1134.002",
+ "Proxy/tunnel tool": "T1090.001",
+}
+
+# Processes that legitimately bridge session 0 -> user sessions.
+# Session mismatch from these parents is normal Windows behaviour.
+SESSION_BRIDGE_PARENTS: Set[str] = {
+ "smss.exe", "services.exe", "svchost.exe", "wininit.exe",
+ "winlogon.exe", "csrss.exe", "lsass.exe", "taskschd.exe",
+}
+
+# Child process names that make a session mismatch suspicious
+SESSION_MISMATCH_SUSPICIOUS_CHILDREN: Set[str] = {
+ "cmd.exe", "powershell.exe", "pwsh.exe", "wscript.exe",
+ "cscript.exe", "mshta.exe", "rundll32.exe", "regsvr32.exe",
+}
+
+
+def _get_session(proc) -> int:
+ """Safely get session ID - attribute name varies by Windows version."""
+ try:
+ return int(proc.Session)
+ except AttributeError:
+ pass
+ try:
+ return int(proc.SessionId)
+ except AttributeError:
+ return -1
+
+
+class LateralMove(interfaces.plugins.PluginInterface):
+ """Detects lateral movement indicators: suspicious parent-child
+ relationships, network connections, and token anomalies."""
+
+ _required_framework_version = (2, 0, 0)
+ _version = (1, 0, 0)
+
+ DEFAULT_COMBOS: Dict[str, Tuple[List[str], str]] = {
+ "wmiprvse.exe": (
+ ["*"],
+ "WMI execution",
+ ),
+ "services.exe": (
+ ["cmd.exe", "powershell.exe", "pwsh.exe",
+ "wscript.exe", "cscript.exe", "mshta.exe"],
+ "PsExec / remote service",
+ ),
+ "mmc.exe": (
+ ["cmd.exe", "powershell.exe", "pwsh.exe"],
+ "MMC DCOM",
+ ),
+ "dllhost.exe": (
+ ["cmd.exe", "powershell.exe", "pwsh.exe", "wscript.exe"],
+ "DCOM execution",
+ ),
+ "taskeng.exe": (
+ ["cmd.exe", "powershell.exe", "pwsh.exe",
+ "wscript.exe", "cscript.exe"],
+ "Scheduled task spawn",
+ ),
+ "firefox.exe": (
+ ["tor.exe", "proxifier.exe", "stunnel.exe"],
+ "Proxy/tunnel tool",
+ ),
+ "chrome.exe": (
+ ["tor.exe", "proxifier.exe", "stunnel.exe"],
+ "Proxy/tunnel tool",
+ ),
+ "msedge.exe": (
+ ["tor.exe", "proxifier.exe", "stunnel.exe"],
+ "Proxy/tunnel tool",
+ ),
+ }
+
+ SVCHOST_SUSPICIOUS_CHILDREN: List[str] = [
+ "cmd.exe", "powershell.exe", "pwsh.exe",
+ "wscript.exe", "cscript.exe", "mshta.exe",
+ ]
+
+ LATERAL_PORTS: Dict[int, str] = {
+ 445: "SMB connection",
+ 139: "SMB connection",
+ 135: "RPC/DCOM port",
+ 5985: "WinRM connection",
+ 5986: "WinRM connection",
+ }
+
+ BROWSER_NAMES: Set[str] = {
+ "chrome.exe", "firefox.exe", "msedge.exe",
+ "iexplore.exe", "opera.exe", "brave.exe",
+ }
+
+ @classmethod
+ def get_requirements(cls) -> List[interfaces.configuration.RequirementInterface]:
+ return [
+ requirements.ModuleRequirement(
+ name="kernel",
+ description="Windows kernel",
+ architectures=["Intel32", "Intel64"],
+ ),
+ requirements.VersionRequirement(
+ name="pslist",
+ component=pslist.PsList,
+ version=(3, 0, 1),
+ ),
+ requirements.VersionRequirement(
+ name="netscan",
+ component=netscan.NetScan,
+ version=(2, 0, 0),
+ ),
+ requirements.StringRequirement(
+ name="config-file",
+ description="Path to JSON file with custom parent-child combos",
+ optional=True,
+ default=None,
+ ),
+ requirements.BooleanRequirement(
+ name="all-connections",
+ description="Include non-ESTABLISHED connections in Module 2",
+ optional=True,
+ default=False,
+ ),
+ ]
+
+ def _check_parent_child(
+ self,
+ proc_name: str,
+ parent_name: str,
+ combos: Dict,
+ ) -> Optional[str]:
+ parent_lower = parent_name.lower()
+ child_lower = proc_name.lower()
+
+ if parent_lower in combos:
+ children, technique = combos[parent_lower]
+ if "*" in children or child_lower in [c.lower() for c in children]:
+ return technique
+
+ if parent_lower == "svchost.exe":
+ if child_lower in [c.lower() for c in self.SVCHOST_SUSPICIOUS_CHILDREN]:
+ return "Scheduled task spawn"
+
+ return None
+
+ def _check_connections(
+ self,
+ pid: int,
+ proc_name: str,
+ connections: Dict[int, List],
+ ) -> List[Tuple[str, str]]:
+ findings: List[Tuple[str, str]] = []
+
+ for conn in connections.get(pid, []):
+ try:
+ foreign_port = int(conn.ForeignPort)
+
+ if proc_name.lower() == "lsass.exe" and foreign_port > 0:
+ findings.append(("Suspicious lsass conn", f"port {foreign_port}"))
+ continue
+
+ if foreign_port in self.LATERAL_PORTS:
+ findings.append((self.LATERAL_PORTS[foreign_port], f"port {foreign_port}"))
+
+ if foreign_port in (80, 443) and proc_name.lower() not in self.BROWSER_NAMES:
+ findings.append(("Non-browser HTTP/S", f"{proc_name} port {foreign_port}"))
+
+ except (exceptions.InvalidAddressException, AttributeError):
+ continue
+
+ return findings
+
+ def _check_token(
+ self,
+ proc,
+ proc_name: str,
+ parent_name: str,
+ parent_session: int,
+ ) -> List[str]:
+ findings: List[str] = []
+
+ try:
+ proc_session = _get_session(proc)
+
+ if (
+ proc_session != -1
+ and parent_session != -1
+ and proc_session != parent_session
+ and parent_name.lower() not in SESSION_BRIDGE_PARENTS
+ and proc_name.lower() in SESSION_MISMATCH_SUSPICIOUS_CHILDREN
+ ):
+ findings.append("Token impersonation")
+
+ try:
+ token = proc.Token.dereference().cast("_TOKEN")
+ groups = token.Groups
+ for i in range(int(token.GroupCount)):
+ group = groups[i]
+ attrs = int(group.Attributes)
+ if attrs & 0x20:
+ sid = group.Sid.dereference()
+ sub = int(sid.SubAuthority[0])
+ if sub >= 0x3000 and proc_session > 0:
+ findings.append("Privilege escalation")
+ break
+ except (exceptions.InvalidAddressException, AttributeError):
+ pass
+
+ except (exceptions.InvalidAddressException, AttributeError):
+ pass
+
+ return findings
+
+ def _load_combos(self) -> Dict:
+ combos = dict(self.DEFAULT_COMBOS)
+ config_path = self.config.get("config-file")
+
+ if config_path:
+ try:
+ import json
+
+ with open(config_path) as f:
+ user_combos = json.load(f)
+ combos.update({k.lower(): v for k, v in user_combos.items()})
+ vollog.info(
+ "Loaded %d custom combos from %s", len(user_combos), config_path
+ )
+ except Exception as exc:
+ vollog.warning("Could not load config file %s: %s", config_path, exc)
+
+ return combos
+
+ def _generator(self) -> Iterator[Tuple[int, Tuple]]:
+ kernel_name = self.config["kernel"]
+ combos = self._load_combos()
+
+ proc_map: Dict[int, Tuple[str, int]] = {}
+ all_procs = list(
+ pslist.PsList.list_processes(
+ context=self.context,
+ kernel_module_name=kernel_name,
+ )
+ )
+
+ for proc in all_procs:
+ try:
+ name = proc.ImageFileName.cast(
+ "string",
+ max_length=proc.ImageFileName.vol.count,
+ errors="replace",
+ )
+ proc_map[int(proc.UniqueProcessId)] = (name, _get_session(proc))
+ except exceptions.InvalidAddressException:
+ continue
+
+ connections: Dict[int, List] = {}
+ try:
+ netscan_table = netscan.NetScan.create_netscan_symbol_table(
+ context=self.context,
+ kernel_module_name=kernel_name,
+ config_path=self.config_path,
+ )
+ for conn in netscan.NetScan.scan(
+ context=self.context,
+ kernel_module_name=kernel_name,
+ netscan_symbol_table=netscan_table,
+ ):
+ try:
+ pid = int(conn.Owner.UniqueProcessId)
+ connections.setdefault(pid, []).append(conn)
+ except (exceptions.InvalidAddressException, AttributeError):
+ continue
+ except Exception as exc:
+ vollog.warning("NetScan unavailable, Module 2 skipped: %s", exc)
+
+ seen: Set[Tuple[int, str]] = set()
+
+ for proc in all_procs:
+ try:
+ pid = int(proc.UniqueProcessId)
+ ppid = int(proc.InheritedFromUniqueProcessId)
+
+ name = proc.ImageFileName.cast(
+ "string",
+ max_length=proc.ImageFileName.vol.count,
+ errors="replace",
+ )
+
+ try:
+ peb = proc.Peb.dereference()
+ cmdline = peb.ProcessParameters.CommandLine.get_string() or ""
+ except Exception:
+ cmdline = ""
+
+ parent_name, parent_session = proc_map.get(ppid, ("unknown", -1))
+ all_findings: List[Tuple[str, str]] = []
+
+ hit = self._check_parent_child(name, parent_name, combos)
+ if hit:
+ all_findings.append((hit, MITRE.get(hit, "-")))
+
+ for technique, _detail in self._check_connections(pid, name, connections):
+ all_findings.append((technique, MITRE.get(technique, "-")))
+
+ for technique in self._check_token(proc, name, parent_name, parent_session):
+ all_findings.append((technique, MITRE.get(technique, "-")))
+
+ for technique, mitre_id in all_findings:
+ key = (pid, technique)
+ if key in seen:
+ continue
+ seen.add(key)
+
+ yield (
+ 0,
+ (
+ pid,
+ ppid,
+ name,
+ parent_name,
+ cmdline[:120],
+ technique,
+ mitre_id,
+ ),
+ )
+
+ except exceptions.InvalidAddressException:
+ continue
+
+ def run(self):
+ return renderers.TreeGrid(
+ [
+ ("PID", int),
+ ("PPID", int),
+ ("Process", str),
+ ("Parent", str),
+ ("CmdLine", str),
+ ("Technique", str),
+ ("MITRE", str),
+ ],
+ self._generator(),
+ )
\ No newline at end of file
diff --git a/volatility3/plugins/windows/test_lateralmove.py b/volatility3/plugins/windows/test_lateralmove.py
new file mode 100644
index 0000000000..ac7b7b9bd8
--- /dev/null
+++ b/volatility3/plugins/windows/test_lateralmove.py
@@ -0,0 +1,270 @@
+import importlib.util
+import json
+import os
+import tempfile
+import unittest
+from unittest.mock import MagicMock, patch
+
+_PLUGIN_PATH = os.path.join(os.path.dirname(__file__), "lateralmove.py")
+_spec = importlib.util.spec_from_file_location("lateralmove", _PLUGIN_PATH)
+_mod = importlib.util.module_from_spec(_spec)
+_spec.loader.exec_module(_mod)
+
+LateralMove = _mod.LateralMove
+SESSION_BRIDGE_PARENTS = _mod.SESSION_BRIDGE_PARENTS
+SESSION_MISMATCH_SUSPICIOUS_CHILDREN = _mod.SESSION_MISMATCH_SUSPICIOUS_CHILDREN
+_get_session = _mod._get_session
+
+
+def _make_plugin():
+ plugin = LateralMove.__new__(LateralMove)
+ plugin._context = MagicMock()
+ mock_config = MagicMock()
+ mock_config.get = MagicMock(return_value=None)
+ mock_config.__getitem__ = MagicMock(return_value="kernel")
+ plugin._config = mock_config
+ plugin.config_path = "test.config"
+ return plugin
+
+
+class TestGetSession(unittest.TestCase):
+
+ def test_returns_session_attribute(self):
+ proc = MagicMock()
+ proc.Session = 2
+ self.assertEqual(_get_session(proc), 2)
+
+ def test_falls_back_to_session_id(self):
+ proc = MagicMock(spec=["SessionId"])
+ proc.SessionId = 1
+ self.assertEqual(_get_session(proc), 1)
+
+ def test_returns_minus_one_when_neither_exists(self):
+ proc = MagicMock(spec=[])
+ self.assertEqual(_get_session(proc), -1)
+
+
+class TestCheckParentChild(unittest.TestCase):
+
+ def setUp(self):
+ self.plugin = _make_plugin()
+ self.combos = self.plugin._load_combos()
+
+ def test_wmiprvse_any_child_flagged(self):
+ self.assertEqual(
+ self.plugin._check_parent_child("notepad.exe", "wmiprvse.exe", self.combos),
+ "WMI execution",
+ )
+
+ def test_wmiprvse_cmd_flagged(self):
+ self.assertEqual(
+ self.plugin._check_parent_child("cmd.exe", "wmiprvse.exe", self.combos),
+ "WMI execution",
+ )
+
+ def test_services_powershell_flagged(self):
+ self.assertEqual(
+ self.plugin._check_parent_child("powershell.exe", "services.exe", self.combos),
+ "PsExec / remote service",
+ )
+
+ def test_services_notepad_not_flagged(self):
+ self.assertIsNone(
+ self.plugin._check_parent_child("notepad.exe", "services.exe", self.combos)
+ )
+
+ def test_mmc_cmd_flagged(self):
+ self.assertEqual(
+ self.plugin._check_parent_child("cmd.exe", "mmc.exe", self.combos),
+ "MMC DCOM",
+ )
+
+ def test_dllhost_wscript_flagged(self):
+ self.assertEqual(
+ self.plugin._check_parent_child("wscript.exe", "dllhost.exe", self.combos),
+ "DCOM execution",
+ )
+
+ def test_svchost_cmd_flagged(self):
+ self.assertEqual(
+ self.plugin._check_parent_child("cmd.exe", "svchost.exe", self.combos),
+ "Scheduled task spawn",
+ )
+
+ def test_svchost_explorer_not_flagged(self):
+ self.assertIsNone(
+ self.plugin._check_parent_child("explorer.exe", "svchost.exe", self.combos)
+ )
+
+ def test_firefox_tor_flagged(self):
+ self.assertEqual(
+ self.plugin._check_parent_child("tor.exe", "firefox.exe", self.combos),
+ "Proxy/tunnel tool",
+ )
+
+ def test_firefox_child_not_flagged(self):
+ self.assertIsNone(
+ self.plugin._check_parent_child("firefox.exe", "firefox.exe", self.combos)
+ )
+
+ def test_case_insensitive(self):
+ self.assertEqual(
+ self.plugin._check_parent_child("CMD.EXE", "WmiPrvSE.exe", self.combos),
+ "WMI execution",
+ )
+
+ def test_benign_parent_none(self):
+ self.assertIsNone(
+ self.plugin._check_parent_child("cmd.exe", "explorer.exe", self.combos)
+ )
+
+
+class TestCheckConnections(unittest.TestCase):
+
+ def setUp(self):
+ self.plugin = _make_plugin()
+
+ def _run(self, proc_name, port):
+ conn = MagicMock()
+ conn.ForeignPort = port
+ results = self.plugin._check_connections(1, proc_name, {1: [conn]})
+ return [r[0] for r in results]
+
+ def test_smb_445(self):
+ self.assertIn("SMB connection", self._run("svchost.exe", 445))
+
+ def test_smb_139(self):
+ self.assertIn("SMB connection", self._run("svchost.exe", 139))
+
+ def test_winrm_5985(self):
+ self.assertIn("WinRM connection", self._run("svchost.exe", 5985))
+
+ def test_winrm_5986(self):
+ self.assertIn("WinRM connection", self._run("svchost.exe", 5986))
+
+ def test_rpc_135(self):
+ self.assertIn("RPC/DCOM port", self._run("svchost.exe", 135))
+
+ def test_lsass_outbound(self):
+ self.assertIn("Suspicious lsass conn", self._run("lsass.exe", 4444))
+
+ def test_lsass_port_zero_not_flagged(self):
+ self.assertEqual(self._run("lsass.exe", 0), [])
+
+ def test_non_browser_443(self):
+ self.assertIn("Non-browser HTTP/S", self._run("powershell.exe", 443))
+
+ def test_non_browser_80(self):
+ self.assertIn("Non-browser HTTP/S", self._run("powershell.exe", 80))
+
+ def test_browser_443_not_flagged(self):
+ self.assertNotIn("Non-browser HTTP/S", self._run("firefox.exe", 443))
+
+ def test_browser_80_not_flagged(self):
+ self.assertNotIn("Non-browser HTTP/S", self._run("chrome.exe", 80))
+
+ def test_no_connections_empty(self):
+ self.assertEqual(
+ self.plugin._check_connections(9999, "explorer.exe", {}), []
+ )
+
+ def test_benign_port_not_flagged(self):
+ self.assertEqual(self._run("svchost.exe", 8080), [])
+
+
+class TestCheckToken(unittest.TestCase):
+
+ def setUp(self):
+ self.plugin = _make_plugin()
+
+ def _proc(self, session):
+ proc = MagicMock()
+ proc.Session = session
+ proc.Token.dereference.side_effect = AttributeError("no token")
+ return proc
+
+ def test_mismatch_suspicious_child_flagged(self):
+ self.assertIn(
+ "Token impersonation",
+ self.plugin._check_token(self._proc(2), "cmd.exe", "explorer.exe", 0),
+ )
+
+ def test_mismatch_non_suspicious_child_not_flagged(self):
+ self.assertNotIn(
+ "Token impersonation",
+ self.plugin._check_token(self._proc(2), "notepad.exe", "explorer.exe", 0),
+ )
+
+ def test_mismatch_bridge_parent_not_flagged(self):
+ self.assertNotIn(
+ "Token impersonation",
+ self.plugin._check_token(self._proc(2), "cmd.exe", "svchost.exe", 0),
+ )
+
+ def test_same_session_not_flagged(self):
+ self.assertNotIn(
+ "Token impersonation",
+ self.plugin._check_token(self._proc(2), "cmd.exe", "explorer.exe", 2),
+ )
+
+ def test_unknown_parent_session_not_flagged(self):
+ self.assertNotIn(
+ "Token impersonation",
+ self.plugin._check_token(self._proc(2), "cmd.exe", "unknown", -1),
+ )
+
+
+class TestSessionBridgeParents(unittest.TestCase):
+
+ def test_svchost_in_set(self):
+ self.assertIn("svchost.exe", SESSION_BRIDGE_PARENTS)
+
+ def test_services_in_set(self):
+ self.assertIn("services.exe", SESSION_BRIDGE_PARENTS)
+
+ def test_smss_in_set(self):
+ self.assertIn("smss.exe", SESSION_BRIDGE_PARENTS)
+
+ def test_explorer_not_in_set(self):
+ self.assertNotIn("explorer.exe", SESSION_BRIDGE_PARENTS)
+
+
+class TestLoadCombos(unittest.TestCase):
+
+ def setUp(self):
+ self.plugin = _make_plugin()
+
+ def test_defaults_present(self):
+ combos = self.plugin._load_combos()
+ self.assertIn("wmiprvse.exe", combos)
+ self.assertIn("services.exe", combos)
+ self.assertIn("firefox.exe", combos)
+
+ def test_custom_combos_merged(self):
+ custom = {"winword.exe": [["cmd.exe"], "Macro execution"]}
+ with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f:
+ json.dump(custom, f)
+ tmp_path = f.name
+ try:
+ def patched_load():
+ import json as _json
+ combos = dict(LateralMove.DEFAULT_COMBOS)
+ with open(tmp_path) as fp:
+ user_combos = _json.load(fp)
+ combos.update({k.lower(): v for k, v in user_combos.items()})
+ return combos
+ self.plugin._load_combos = patched_load
+ combos = self.plugin._load_combos()
+ self.assertIn("winword.exe", combos)
+ self.assertIn("wmiprvse.exe", combos)
+ finally:
+ os.unlink(tmp_path)
+
+ def test_bad_config_path_uses_defaults(self):
+ self.plugin._config.get = MagicMock(return_value="/nonexistent/path.json")
+ combos = self.plugin._load_combos()
+ self.assertIn("wmiprvse.exe", combos)
+
+
+if __name__ == "__main__":
+ unittest.main(verbosity=2)