diff --git a/docs/dependencies_table.rst b/docs/dependencies_table.rst
index 7ae600d..8558951 100644
--- a/docs/dependencies_table.rst
+++ b/docs/dependencies_table.rst
@@ -165,6 +165,8 @@
+-------------------------------+-------------------+--------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------+
| pyinstaller-hooks-contrib | 2026.0 | Apache Software License; GNU General Public License v2 (GPLv2) | Community maintained hooks for PyInstaller |
+-------------------------------+-------------------+--------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------+
+| PyMySQL | >=1.1.0 | MIT License | Pure-Python MySQL client library |
++-------------------------------+-------------------+--------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------+
| pylint | 4.0.4 | GPL-2.0-or-later | python code static checker |
+-------------------------------+-------------------+--------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------+
| pyproject_hooks | 1.2.0 | MIT License | Wrappers to call pyproject.toml-based build backend hooks. |
diff --git a/docs/developer_reference.rst b/docs/developer_reference.rst
index 4703625..53ab1dd 100644
--- a/docs/developer_reference.rst
+++ b/docs/developer_reference.rst
@@ -16,6 +16,7 @@ Documentation of PEAT's Python codebase and APIs.
device_api
device_modules
general_apis
+ mysql
heat_api
python_examples
elastic_implementation
diff --git a/docs/general_apis.rst b/docs/general_apis.rst
index e59f352..cdd883c 100644
--- a/docs/general_apis.rst
+++ b/docs/general_apis.rst
@@ -83,6 +83,12 @@ HTTP
:members:
:private-members:
+MySQL
+^^^^^
+.. automodule:: peat.protocols.mysql
+ :members:
+ :private-members:
+
SNMP
^^^^
.. automodule:: peat.protocols.snmp
diff --git a/docs/mysql.rst b/docs/mysql.rst
new file mode 100644
index 0000000..251ec22
--- /dev/null
+++ b/docs/mysql.rst
@@ -0,0 +1,62 @@
+*****
+MySQL
+*****
+
+MySQL and MariaDB are widely deployed relational database servers. In OT environments, MySQL/MariaDB instances are commonly found on historian servers, HMI backends, and engineering workstations.
+
+PEAT supports both unauthenticated fingerprinting (via the TCP greeting packet) and authenticated enumeration (via SQL queries) using the :class:`~peat.protocols.mysql.MySQL` protocol class.
+
+.. seealso::
+
+ :class:`~peat.protocols.mysql.MySQL`
+ PEAT protocol class for MySQL/MariaDB connections
+
+ `MySQL Documentation `__
+
+ `MariaDB Documentation `__
+
+ `PyMySQL `__
+ The underlying Python library used for authenticated connections
+
+Fingerprinting without credentials
+-----------------------------------
+
+MySQL sends an initial handshake packet immediately after a TCP connection is established, before any authentication takes place. This packet includes the server version string (e.g. ``8.0.32`` or ``10.6.12-MariaDB``).
+
+:meth:`~peat.protocols.mysql.MySQL.read_greeting` reads this packet over a raw TCP socket, allowing PEAT to identify a MySQL/MariaDB server and extract its version without credentials.
+
+Data collected
+--------------
+
+When credentials are available, PEAT can enumerate the following via SQL queries:
+
+- Server version string and parsed version tuple
+- Database names visible to the authenticated user
+- Table names per database
+- Approximate row counts per table (from ``information_schema``)
+- User accounts and their allowed hosts (from ``mysql.user``)
+- Grants for each user (``SHOW GRANTS``)
+- Global system variables (``SHOW GLOBAL VARIABLES``)
+- Active connections and queries (``SHOW FULL PROCESSLIST``)
+
+Configuration
+-------------
+
+Credentials and connection options are specified in the PEAT config file under the ``mysql`` key in ``device_options``:
+
+.. code-block:: yaml
+
+ device_options:
+ mysql:
+ credentials:
+ user: root
+ pass: secret
+ port: 3306
+ timeout: 10
+
+Developer notes
+---------------
+
+The :class:`~peat.protocols.mysql.MySQL` class uses a lazy connection pattern: the underlying PyMySQL connection is not established when the object is created, but on the first call to any query method. This avoids opening TCP connections to hosts that are later filtered or skipped before enumeration begins.
+
+The class is designed as a base class. The :meth:`~peat.protocols.mysql.MySQL.on_connected` and :meth:`~peat.protocols.mysql.MySQL.enumerate` hooks are intended to be overridden in device-specific subclasses. ``on_connected`` runs immediately after authentication and can be used to run setup queries or populate instance attributes. ``enumerate`` should return a dict of any device-specific table data — for example, a subclass targeting a historian might query proprietary tables that have no meaning in a generic MySQL context. The base class implementations are no-ops that return nothing and an empty dict respectively.
diff --git a/docs/tcp_ip_protocols_used_by_peat.csv b/docs/tcp_ip_protocols_used_by_peat.csv
index 44d09a4..0347ce0 100644
--- a/docs/tcp_ip_protocols_used_by_peat.csv
+++ b/docs/tcp_ip_protocols_used_by_peat.csv
@@ -9,6 +9,7 @@
"Modbus/TCP", "502", "TCP", ":class:`~peat.modules.schneider.m340.m340.M340`", ""
"UMAS", "502", "TCP", ":class:`~peat.modules.schneider.m340.m340.M340`", ":term:`UMAS` is a proprietary protocol built on top of Modbus/TCP. Used by Schneider Electric Modicon devices."
"ServLink/TCP", "666, 667", "TCP", ":class:`~peat.modules.woodward.easygen_3500xt.Easygen3500XT`", "ServLink is a Woodward-proprietary protocol for communicating with various devices over either TCP or Serial, including the easYgen 3500XT, 2301E, and potentially other devices. ServLink typically uses port 666, but it may use port 667."
+"MySQL", "3306", "TCP", ":class:`~peat.protocols.mysql.MySQL`", "MySQL/MariaDB initial handshake and query protocol. Used for unauthenticated server fingerprinting via the TCP greeting packet and authenticated enumeration of database server version, schemas, users, and configuration."
"postgres", "5432", "TCP", ":class:`~peat.modules.sel.sel_rtac.SELRTAC`", "PostgreSQL database communication. Commonly known as the 'wire protocol'."
"ION", "7700, 7701, 7702", "TCP", ":class:`~peat.modules.schneider.ion.ion.ION`", "Proprietary protocol for Schneider Electric PowerLogic ION power meters. Used for reading and updating device configuration, firmware updates, and general communications between ION Setup software and a ION meter. ION uses port 7700 by default, but can also use ports 7701 or 7702."
"CIP", "44818", "TCP, UDP", ":class:`~peat.modules.rockwell.controllogix.ControlLogix`", ":term:`CIP` (Common Industrial Protocol) is a Rockwell/Allen-Bradley-developed protocol used by Rockwell devices and others."
diff --git a/examples/peat-config-simple.yaml b/examples/peat-config-simple.yaml
index 7637a88..bb2c33d 100644
--- a/examples/peat-config-simple.yaml
+++ b/examples/peat-config-simple.yaml
@@ -258,6 +258,9 @@ device_options:
timeout: 5.0
servlink_serial:
timeout: 5.0
+ mysql:
+ port: 3306
+ timeout: 5.0
postgres:
port: 5432
timeout: 5.0
diff --git a/examples/peat-config.yaml b/examples/peat-config.yaml
index 5b32807..507cb13 100644
--- a/examples/peat-config.yaml
+++ b/examples/peat-config.yaml
@@ -744,6 +744,13 @@ device_options:
servlink_serial:
timeout: 5.0
+ # MySQL/MariaDB protocol
+ #
+ # "mysql" is not currently used by anyone, but is left here for future compatibility and documentation/reference.
+ mysql:
+ port: 3306
+ timeout: 5.0
+
# PostgreSQL protocol (used for communicating with PostgreSQL databases)
#
# "postgres" is used by the following modules: SELRTAC
diff --git a/newsfragments/59.feature b/newsfragments/59.feature
new file mode 100644
index 0000000..87fa6ab
--- /dev/null
+++ b/newsfragments/59.feature
@@ -0,0 +1 @@
+Add MySQL/MariaDB protocol class (``peat/protocols/mysql.py``) with unauthenticated server fingerprinting via TCP greeting packet, PyMySQL-backed query helpers, and ``on_connected``/``enumerate`` subclass hooks for device-specific extensions.
diff --git a/pdm.lock b/pdm.lock
index 20db3ba..e7a28e1 100644
--- a/pdm.lock
+++ b/pdm.lock
@@ -5,7 +5,7 @@
groups = ["default", "dev", "docs", "exe", "lint", "test"]
strategy = []
lock_version = "4.5.0"
-content_hash = "sha256:8ba4c680cbe5368a6f8da23412190c5467f8ff42199a397f9a3d4979b90ec4f3"
+content_hash = "sha256:4f379386fb19c3e81c88e4da33b12f36f362edbaa7854b0739c1362aa2f19953"
[[metadata.targets]]
requires_python = ">=3.11,<3.14"
@@ -321,19 +321,6 @@ files = [
{file = "check_sdist-1.3.2.tar.gz", hash = "sha256:9faaceca95c03ef9b8edb20db6df631e845d279b2ee6aa97d13a7c3743da7645"},
]
-[[package]]
-name = "click"
-version = "8.3.2"
-requires_python = ">=3.10"
-summary = "Composable command line interface toolkit"
-dependencies = [
- "colorama; platform_system == \"Windows\"",
-]
-files = [
- {file = "click-8.3.2-py3-none-any.whl", hash = "sha256:1924d2c27c5653561cd2cae4548d1406039cb79b858b747cfea24924bbc1616d"},
- {file = "click-8.3.2.tar.gz", hash = "sha256:14162b8b3b3550a7d479eafa77dfd3c38d9dc8951f6f69c78913a8f9a7540fd5"},
-]
-
[[package]]
name = "codespell"
version = "2.4.1"
@@ -1641,6 +1628,16 @@ files = [
{file = "pylint-4.0.4.tar.gz", hash = "sha256:d9b71674e19b1c36d79265b5887bf8e55278cbe236c9e95d22dc82cf044fdbd2"},
]
+[[package]]
+name = "pymysql"
+version = "1.1.2"
+requires_python = ">=3.8"
+summary = "Pure Python MySQL Driver"
+files = [
+ {file = "pymysql-1.1.2-py3-none-any.whl", hash = "sha256:e6b1d89711dd51f8f74b1631fe08f039e7d76cf67a42a323d3178f0f25762ed9"},
+ {file = "pymysql-1.1.2.tar.gz", hash = "sha256:4961d3e165614ae65014e361811a724e2044ad3ea3739de9903ae7c21f539f03"},
+]
+
[[package]]
name = "pynacl"
version = "1.6.2"
@@ -2356,23 +2353,6 @@ files = [
{file = "tomlkit-0.14.0.tar.gz", hash = "sha256:cf00efca415dbd57575befb1f6634c4f42d2d87dbba376128adb42c121b87064"},
]
-[[package]]
-name = "towncrier"
-version = "25.8.0"
-requires_python = ">=3.9"
-summary = "Building newsfiles for your project."
-dependencies = [
- "click",
- "importlib-metadata>=4.6; python_version < \"3.10\"",
- "importlib-resources>=5; python_version < \"3.10\"",
- "jinja2",
- "tomli; python_version < \"3.11\"",
-]
-files = [
- {file = "towncrier-25.8.0-py3-none-any.whl", hash = "sha256:b953d133d98f9aeae9084b56a3563fd2519dfc6ec33f61c9cd2c61ff243fb513"},
- {file = "towncrier-25.8.0.tar.gz", hash = "sha256:eef16d29f831ad57abb3ae32a0565739866219f1ebfbdd297d32894eb9940eb1"},
-]
-
[[package]]
name = "types-pytz"
version = "2025.2.0.20251108"
diff --git a/peat/data/default_options.py b/peat/data/default_options.py
index cc481fd..82cd014 100644
--- a/peat/data/default_options.py
+++ b/peat/data/default_options.py
@@ -39,6 +39,7 @@
"modbus_tcp": {"port": 502, "timeout": config.DEFAULT_TIMEOUT},
"servlink_tcp": {"port": 666, "timeout": config.DEFAULT_TIMEOUT},
"servlink_serial": {"timeout": config.DEFAULT_TIMEOUT},
+ "mysql": {"port": 3306, "timeout": config.DEFAULT_TIMEOUT},
"postgres": {"port": 5432, "timeout": config.DEFAULT_TIMEOUT},
"ion_protocol": {
"port": 7700,
diff --git a/peat/protocols/__init__.py b/peat/protocols/__init__.py
index 3e53e23..5372f23 100644
--- a/peat/protocols/__init__.py
+++ b/peat/protocols/__init__.py
@@ -14,6 +14,7 @@
from .http import HTTP
from .interfaces import *
from .ip import *
+from .mysql import MySQL
from .serial import *
from .snmp import *
from .ssh import SSH
diff --git a/peat/protocols/mysql.py b/peat/protocols/mysql.py
new file mode 100644
index 0000000..d1cbfdc
--- /dev/null
+++ b/peat/protocols/mysql.py
@@ -0,0 +1,311 @@
+"""MySQL/MariaDB connection protocol for PEAT."""
+
+from __future__ import annotations
+
+import socket
+from typing import Any
+
+import pymysql
+import pymysql.cursors
+
+from peat import CommError, log
+
+
+class MySQL:
+ """
+ MySQL/MariaDB connection wrapper.
+
+ A thin wrapper around PyMySQL for connecting to and querying
+ MySQL/MariaDB servers. The underlying connection is established
+ lazily on first use via the :attr:`conn` property.
+
+ Args:
+ ip: Server hostname or IP address.
+ port: MySQL port (default 3306).
+ username: MySQL username.
+ password: MySQL password.
+ database: Default database to connect to (optional).
+ timeout: Connection and query timeout in seconds.
+ """
+
+ def __init__(
+ self,
+ ip: str,
+ port: int = 3306,
+ username: str = "root",
+ password: str = "",
+ database: str = "",
+ timeout: float = 10.0,
+ ) -> None:
+ self.ip = ip
+ self.port = port
+ self.username = username
+ self.password = password
+ self.database = database
+ self.timeout = timeout
+ self._conn = None
+ self.server_info: str = ""
+ self.server_version: tuple[int, ...] = ()
+
+ self.log = log.bind(
+ classname=self.__class__.__name__,
+ target=f"{self.ip}:{self.port}",
+ )
+
+ def __enter__(self) -> "MySQL":
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb) -> None:
+ self.disconnect()
+
+ @staticmethod
+ def read_greeting(ip: str, port: int = 3306, timeout: float = 5.0) -> str | None:
+ """
+ Read the MySQL/MariaDB initial handshake packet over a raw TCP connection.
+
+ MySQL sends a greeting immediately after TCP connect, before any
+ authentication. This allows fingerprinting the server without credentials.
+
+ The packet layout (Protocol v10, used since MySQL 4.1):
+
+ - bytes 0-2: payload length (little-endian uint24)
+ - byte 3: sequence number (``0x00``)
+ - byte 4: protocol version (``0x0a`` = 10)
+ - bytes 5-N: server version string, null-terminated
+
+ Args:
+ ip: Server IP address or hostname.
+ port: MySQL port (default 3306).
+ timeout: Seconds to wait for the greeting.
+
+ Returns:
+ The null-terminated version string (e.g. ``"8.0.32"`` or
+ ``"10.6.12-MariaDB"``), or ``None`` if the host did not respond
+ with a valid MySQL greeting.
+ """
+ try:
+ with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
+ sock.settimeout(timeout)
+ sock.connect((ip, port))
+ data = sock.recv(256)
+ except Exception as exc:
+ log.debug(f"MySQL greeting read failed ({ip}:{port}): {exc}")
+ return None
+
+ if len(data) < 6 or data[4] != 0x0A:
+ return None
+
+ try:
+ null_pos = data.index(b"\x00", 5)
+ return data[5:null_pos].decode("ascii", errors="replace")
+ except ValueError:
+ return None
+
+ @property
+ def connected(self) -> bool:
+ """True if currently connected to a MySQL server."""
+ return self._conn is not None
+
+ @property
+ def conn(self):
+ """
+ PyMySQL connection, established lazily on first access.
+
+ Raises:
+ CommError: If the connection attempt fails.
+ """
+ if self._conn is not None:
+ return self._conn
+
+ try:
+ self._conn = pymysql.connect(
+ host=self.ip,
+ port=self.port,
+ user=self.username,
+ password=self.password,
+ database=self.database or None,
+ connect_timeout=int(self.timeout),
+ read_timeout=int(self.timeout),
+ write_timeout=int(self.timeout),
+ autocommit=True,
+ )
+
+ self.server_info = self._conn.get_server_info()
+
+ version_str = self.server_info.split("-")[0]
+ try:
+ self.server_version = tuple(int(x) for x in version_str.split("."))
+ except ValueError:
+ self.log.trace(f"Could not parse server version from {self.server_info!r}")
+ self.server_version = ()
+
+ self.on_connected()
+
+ except Exception as exc:
+ self._conn = None
+ raise CommError(f"MySQL connect failed ({self.ip}:{self.port}): {exc}") from exc
+
+ return self._conn
+
+ def connect(self) -> bool:
+ """
+ Explicitly establish an authenticated connection to the MySQL server.
+
+ Calling this is optional — the connection is also established
+ automatically on first use of any query method.
+
+ Returns:
+ True if the connection succeeded, False otherwise.
+ """
+ try:
+ _ = self.conn
+ return True
+ except CommError as exc:
+ self.log.debug(str(exc))
+ return False
+
+ def disconnect(self) -> None:
+ """Close the MySQL connection."""
+ if self._conn is not None:
+ try:
+ self._conn.close()
+ except Exception:
+ pass
+
+ self._conn = None
+
+ def on_connected(self) -> None:
+ """
+ Hook called immediately after a successful authenticated connection.
+
+ Override in subclasses to run device-specific setup queries or
+ populate additional instance attributes before enumeration begins.
+ """
+
+ def enumerate(self) -> dict[str, Any]:
+ """
+ Hook for device-specific enumeration queries.
+
+ Override in subclasses to run additional queries and return the
+ results as a dict. The returned dict is merged into the pull result
+ under the key ``"extra_enumeration"``.
+
+ Returns:
+ Dict of additional enumeration results, or empty dict.
+ """
+ return {}
+
+ def query(self, sql: str, args: tuple | None = None) -> list[dict[str, Any]]:
+ """
+ Execute a SQL statement and return all rows as a list of dicts.
+
+ Args:
+ sql: SQL statement to execute.
+ args: Optional tuple of arguments for parameterized queries.
+
+ Returns:
+ List of row dicts, or empty list on error or failed connection.
+ """
+ try:
+ with self.conn.cursor(pymysql.cursors.DictCursor) as cursor:
+ cursor.execute(sql, args)
+ return list(cursor.fetchall())
+ except CommError:
+ return []
+ except Exception as exc:
+ self.log.debug(f"MySQL query failed: {exc}")
+ return []
+
+ def get_databases(self) -> list[str]:
+ """
+ Return names of all databases visible to the current user.
+
+ Returns:
+ List of database name strings.
+ """
+ rows = self.query("SHOW DATABASES")
+ return [r["Database"] for r in rows]
+
+ def get_tables(self, database: str) -> list[str]:
+ """
+ Return table names for the given database.
+
+ Args:
+ database: Name of the database to list tables from.
+
+ Returns:
+ List of table name strings.
+ """
+ rows = self.query(f"SHOW TABLES FROM `{database}`")
+ key = f"Tables_in_{database}"
+ return [r[key] for r in rows if key in r]
+
+ def get_table_row_count(self, database: str, table: str) -> int | None:
+ """
+ Return the approximate row count for a table from information_schema.
+
+ Args:
+ database: Database the table belongs to.
+ table: Table name.
+
+ Returns:
+ Approximate row count as an integer, or ``None`` if unavailable.
+ """
+ rows = self.query(
+ "SELECT TABLE_ROWS FROM information_schema.TABLES "
+ "WHERE TABLE_SCHEMA = %s AND TABLE_NAME = %s",
+ (database, table),
+ )
+ if rows and rows[0].get("TABLE_ROWS") is not None:
+ return int(rows[0]["TABLE_ROWS"])
+ return None
+
+ def get_users(self) -> list[dict[str, str]]:
+ """
+ Return all MySQL user accounts from mysql.user.
+
+ Returns:
+ List of dicts with ``"user"`` and ``"host"`` keys.
+ """
+ rows = self.query("SELECT User, Host FROM mysql.user ORDER BY User, Host")
+ return [{"user": r["User"], "host": r["Host"]} for r in rows]
+
+ def get_grants(self, user: str, host: str) -> list[str]:
+ """
+ Return SHOW GRANTS output lines for a specific user@host.
+
+ Args:
+ user: MySQL username.
+ host: MySQL host the user connects from.
+
+ Returns:
+ List of GRANT statement strings, or empty list if none found.
+ """
+ rows = self.query(f"SHOW GRANTS FOR '{user}'@'{host}'")
+ if not rows:
+ return []
+
+ key = next(iter(rows[0]))
+ return [r[key] for r in rows]
+
+ def get_global_variables(self, like: str = "%") -> dict[str, str]:
+ """
+ Return global system variables whose names match a LIKE pattern.
+
+ Args:
+ like: SQL LIKE pattern to filter variable names (default ``"%"``).
+
+ Returns:
+ Dict mapping variable name to value.
+ """
+ rows = self.query("SHOW GLOBAL VARIABLES LIKE %s", (like,))
+ return {r["Variable_name"]: r["Value"] for r in rows}
+
+ def get_process_list(self) -> list[dict[str, Any]]:
+ """
+ Return active MySQL connections and queries from SHOW FULL PROCESSLIST.
+
+ Returns:
+ List of process dicts as returned by MySQL.
+ """
+ return self.query("SHOW FULL PROCESSLIST")
diff --git a/pyproject.toml b/pyproject.toml
index 06cab60..71550bd 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -202,6 +202,10 @@ dependencies = [
# NOTE: "psycopg2-binary" provides pre-built binary wheels
"psycopg2-binary==2.9.10",
+ # MySQL/MariaDB database interaction (pure Python, no binary build required)
+ # Required for: MySQL protocol
+ "PyMySQL>=1.1.0",
+
# Library to load kernel modules for PEAT Pillage
"kmodpy~=0.1.13;platform_system=='Linux'",
diff --git a/tests/protocols/test_mysql.py b/tests/protocols/test_mysql.py
new file mode 100644
index 0000000..8399e68
--- /dev/null
+++ b/tests/protocols/test_mysql.py
@@ -0,0 +1,273 @@
+from unittest.mock import MagicMock
+
+import pytest
+
+from peat.protocols.mysql import MySQL
+
+
+def _make_greeting(version: bytes) -> bytes:
+ """Build a minimal MySQL Initial Handshake Packet for the given version string."""
+ payload = b"\x0a" + version + b"\x00" + b"\x00" * 20
+ length = len(payload).to_bytes(3, "little")
+ return length + b"\x00" + payload
+
+
+MYSQL_GREETING = _make_greeting(b"8.0.32")
+MARIADB_GREETING = _make_greeting(b"10.6.12-MariaDB")
+NOT_MYSQL = b"HTTP/1.1 200 OK\r\n\r\n"
+TOO_SHORT = b"\x00\x00"
+
+
+def _mock_socket(data: bytes):
+ sock = MagicMock()
+ sock.recv.return_value = data
+ sock.__enter__ = lambda s: s
+ sock.__exit__ = MagicMock(return_value=False)
+ return sock
+
+
+def _mock_pymysql(mocker, server_info="8.0.32"):
+ mock_conn = MagicMock()
+ mock_conn.get_server_info.return_value = server_info
+ mocker.patch("pymysql.connect", return_value=mock_conn)
+ return mock_conn
+
+
+@pytest.fixture
+def connected_mysql(mocker):
+ def _make(rows=None):
+ mock_cursor = MagicMock()
+ mock_cursor.__enter__ = lambda s: s
+ mock_cursor.__exit__ = MagicMock(return_value=False)
+ mock_cursor.fetchall.return_value = rows or []
+
+ mock_conn = MagicMock()
+ mock_conn.get_server_info.return_value = "8.0.32"
+ mock_conn.cursor.return_value = mock_cursor
+ mocker.patch("pymysql.connect", return_value=mock_conn)
+ mocker.patch("pymysql.cursors.DictCursor", MagicMock())
+
+ m = MySQL("127.0.0.1")
+ m.connect()
+ return m, mock_cursor
+
+ return _make
+
+
+def test_read_greeting_mysql_version_returned(mocker):
+ mocker.patch("socket.socket", return_value=_mock_socket(MYSQL_GREETING))
+ assert MySQL.read_greeting("127.0.0.1") == "8.0.32"
+
+
+def test_read_greeting_mariadb_version_returned(mocker):
+ mocker.patch("socket.socket", return_value=_mock_socket(MARIADB_GREETING))
+ assert MySQL.read_greeting("127.0.0.1") == "10.6.12-MariaDB"
+
+
+def test_read_greeting_non_mysql_response_returns_none(mocker):
+ mocker.patch("socket.socket", return_value=_mock_socket(NOT_MYSQL))
+ assert MySQL.read_greeting("127.0.0.1") is None
+
+
+def test_read_greeting_too_short_returns_none(mocker):
+ mocker.patch("socket.socket", return_value=_mock_socket(TOO_SHORT))
+ assert MySQL.read_greeting("127.0.0.1") is None
+
+
+def test_read_greeting_connection_error_returns_none(mocker):
+ sock = MagicMock()
+ sock.connect.side_effect = TimeoutError("timed out")
+ sock.__enter__ = lambda s: s
+ sock.__exit__ = MagicMock(return_value=False)
+ mocker.patch("socket.socket", return_value=sock)
+ assert MySQL.read_greeting("127.0.0.1", timeout=0.01) is None
+
+
+def test_read_greeting_custom_port_and_timeout_passed(mocker):
+ mock_sock = _mock_socket(MYSQL_GREETING)
+ mocker.patch("socket.socket", return_value=mock_sock)
+ MySQL.read_greeting("10.0.0.1", port=3307, timeout=2.0)
+ mock_sock.settimeout.assert_called_once_with(2.0)
+ mock_sock.connect.assert_called_once_with(("10.0.0.1", 3307))
+
+
+def test_init_defaults():
+ m = MySQL("192.168.1.1")
+ assert m.ip == "192.168.1.1"
+ assert m.port == 3306
+ assert m.username == "root"
+ assert m.password == ""
+ assert m.database == ""
+ assert m.timeout == 10.0
+ assert not m.connected
+ assert m.server_info == ""
+ assert m.server_version == ()
+
+
+def test_init_custom_args():
+ m = MySQL("10.0.0.1", port=3307, username="admin", password="secret", timeout=5.0)
+ assert m.port == 3307
+ assert m.username == "admin"
+ assert m.password == "secret"
+ assert m.timeout == 5.0
+
+
+def test_connect_success(mocker):
+ _mock_pymysql(mocker)
+ m = MySQL("127.0.0.1")
+ assert m.connect()
+ assert m.connected
+ assert m.server_info == "8.0.32"
+ assert m.server_version == (8, 0, 32)
+
+
+def test_connect_parses_mariadb_version(mocker):
+ _mock_pymysql(mocker, server_info="10.6.12-MariaDB")
+ m = MySQL("127.0.0.1")
+ assert m.connect()
+ assert m.server_version == (10, 6, 12)
+
+
+def test_connect_failure_returns_false(mocker):
+ mocker.patch("pymysql.connect", side_effect=Exception("refused"))
+ m = MySQL("127.0.0.1")
+ assert not m.connect()
+ assert not m.connected
+
+
+def test_on_connected_hook_called(mocker):
+ _mock_pymysql(mocker)
+ m = MySQL("127.0.0.1")
+ m.on_connected = MagicMock()
+ m.connect()
+ m.on_connected.assert_called_once()
+
+
+def test_disconnect_clears_connection(mocker):
+ _mock_pymysql(mocker)
+ m = MySQL("127.0.0.1")
+ m.connect()
+ assert m.connected
+ m.disconnect()
+ assert not m.connected
+
+
+def test_disconnect_when_not_connected():
+ m = MySQL("127.0.0.1")
+ m.disconnect()
+ assert not m.connected
+
+
+def test_query_returns_rows(connected_mysql):
+ rows = [{"id": 1}, {"id": 2}]
+ m, _ = connected_mysql(rows)
+ assert m.query("SELECT 1") == rows
+
+
+def test_query_without_connection_returns_empty():
+ m = MySQL("127.0.0.1")
+ assert m.query("SELECT 1") == []
+
+
+def test_get_databases(connected_mysql):
+ rows = [{"Database": "app"}, {"Database": "logs"}]
+ m, _ = connected_mysql(rows)
+ assert m.get_databases() == ["app", "logs"]
+
+
+def test_get_tables(connected_mysql):
+ rows = [{"Tables_in_app": "users"}, {"Tables_in_app": "events"}]
+ m, _ = connected_mysql(rows)
+ assert m.get_tables("app") == ["users", "events"]
+
+
+def test_get_table_row_count(connected_mysql):
+ rows = [{"TABLE_ROWS": 42}]
+ m, _ = connected_mysql(rows)
+ assert m.get_table_row_count("app", "users") == 42
+
+
+def test_get_table_row_count_none(connected_mysql):
+ rows = [{"TABLE_ROWS": None}]
+ m, _ = connected_mysql(rows)
+ assert m.get_table_row_count("app", "users") is None
+
+
+def test_get_users(connected_mysql):
+ rows = [{"User": "root", "Host": "localhost"}, {"User": "app", "Host": "%"}]
+ m, _ = connected_mysql(rows)
+ assert m.get_users() == [
+ {"user": "root", "host": "localhost"},
+ {"user": "app", "host": "%"},
+ ]
+
+
+def test_get_grants(connected_mysql):
+ rows = [{"Grants for root@localhost": "GRANT ALL PRIVILEGES ON *.* TO 'root'@'localhost'"}]
+ m, _ = connected_mysql(rows)
+ result = m.get_grants("root", "localhost")
+ assert len(result) == 1
+ assert "GRANT ALL" in result[0]
+
+
+def test_get_grants_empty(connected_mysql):
+ m, _ = connected_mysql(rows=[])
+ assert m.get_grants("nobody", "localhost") == []
+
+
+def test_get_global_variables(connected_mysql):
+ rows = [{"Variable_name": "version", "Value": "8.0.32"}]
+ m, _ = connected_mysql(rows)
+ assert m.get_global_variables(like="version") == {"version": "8.0.32"}
+
+
+def test_get_process_list(connected_mysql):
+ rows = [{"Id": 1, "User": "root", "Command": "Query"}]
+ m, _ = connected_mysql(rows)
+ assert m.get_process_list() == rows
+
+
+def test_on_connected_default_is_noop():
+ m = MySQL("127.0.0.1")
+ m.on_connected()
+
+
+def test_enumerate_default_returns_empty_dict():
+ m = MySQL("127.0.0.1")
+ assert m.enumerate() == {}
+
+
+def test_subclass_on_connected_called(mocker):
+ called = []
+
+ class CustomMySQL(MySQL):
+ def on_connected(self):
+ called.append(True)
+
+ mock_conn = MagicMock()
+ mock_conn.get_server_info.return_value = "8.0.32"
+ mocker.patch("pymysql.connect", return_value=mock_conn)
+
+ c = CustomMySQL("127.0.0.1")
+ c.connect()
+ assert called == [True]
+
+
+def test_subclass_enumerate_returned(mocker):
+ class CustomMySQL(MySQL):
+ def enumerate(self):
+ return {"custom_table": [{"row": 1}]}
+
+ mock_cursor = MagicMock()
+ mock_cursor.__enter__ = lambda s: s
+ mock_cursor.__exit__ = MagicMock(return_value=False)
+ mock_cursor.fetchall.return_value = []
+ mock_conn = MagicMock()
+ mock_conn.get_server_info.return_value = "8.0.32"
+ mock_conn.cursor.return_value = mock_cursor
+ mocker.patch("pymysql.connect", return_value=mock_conn)
+ mocker.patch("pymysql.cursors.DictCursor", MagicMock())
+
+ c = CustomMySQL("127.0.0.1")
+ c.connect()
+ assert c.enumerate() == {"custom_table": [{"row": 1}]}