diff --git a/distribution/peat.spec b/distribution/peat.spec index 5b5c9ed..bc31112 100644 --- a/distribution/peat.spec +++ b/distribution/peat.spec @@ -69,8 +69,8 @@ hidden_imports = [ excludes = [ - 'matplotlib', 'IPython', 'jedi', 'Cryptodome', - 'zmq', 'pydoc', 'setuptools', 'pycryptodomex', + 'matplotlib', 'IPython', 'jedi', + 'zmq', 'pydoc', 'setuptools', 'pygments', 'traitlets', 'tcl', 'Tkinter', 'Cython', 'cython', 'ipython', 'tox', 'wheel', 'virtualenv', 'requests-toolbelt', 'fire', 'coverage', 'mypy', 'pip', 'PyGObject', diff --git a/distribution/sneakypeat.spec b/distribution/sneakypeat.spec index 0d73ec9..68cb5c6 100644 --- a/distribution/sneakypeat.spec +++ b/distribution/sneakypeat.spec @@ -55,8 +55,8 @@ hidden_imports = [ excludes = [ - 'matplotlib', 'IPython', 'jedi', 'Cryptodome', - 'zmq', 'pydoc', 'setuptools', 'pycryptodomex', + 'matplotlib', 'IPython', 'jedi', + 'zmq', 'pydoc', 'setuptools', 'pygments', 'traitlets', 'tcl', 'Tkinter', 'Cython', 'cython', 'ipython', 'tox', 'wheel', 'virtualenv', 'requests-toolbelt', 'fire', 'coverage', 'mypy', 'pip', 'PyGObject', diff --git a/docs/operate.rst b/docs/operate.rst index 05a67fd..4dcf811 100644 --- a/docs/operate.rst +++ b/docs/operate.rst @@ -9,6 +9,8 @@ PEAT's primary interface is a command-line program with sub-commands for each fu - ``push``: push firmware, logic, or configuration to a device - ``pillage``: search for :term:`OT` device-specific configuration and project files on a host machine - ``heat``: extract and parse device artifacts from network traffic captures (PCAPs) +- ``encrypt-results``: encrypt a PEAT results directory into a password-protected zip archive +- ``decrypt-results``: decrypt a PEAT encrypted results archive into a results directory Basics ====== @@ -28,6 +30,8 @@ Basics peat parse -h peat push -h peat pillage -h + peat encrypt-results --help + peat decrypt-results -h # Examples peat scan --examples @@ -171,6 +175,8 @@ The output directory structure generally looks like this: temp/ ... +Encrypted result archives created with ``peat encrypt-results`` are written separately from the run directory structure shown above. By default, the archive is created in the current working directory as ``encrypted_.zip``. + Viewing the results ------------------- @@ -193,6 +199,48 @@ Examples and helpful commands for inspecting the file results. # Filtering memory and event entries from device results for 192.168.3.200 using 'jq' cat peat_results/example_pull/devices/192.168.3.200/device-data-full.json | jq 'del(.memory,.event)' +Encrypting results +------------------ +PEAT can encrypt a PEAT results run directory into a password-protected zip archive using the ``encrypt-results`` command. This is useful when PEAT results need to be stored or shared more securely after a scan, pull, parse, or other run. + +The command expects the path to a PEAT results directory, such as ``./examples/encryption/example_peat_results/``. By default, the encrypted archive is written to the current working directory and named ``encrypted_.zip``. + +Use ``-w`` (``--write-file``) to choose where the encrypted archive is written. Use ``-p`` (``--password``) to provide the archive password on the command line. If ``-p`` is not provided, PEAT will prompt interactively for a password. + +.. warning:: + PEAT does not save the password for the encrypted archive. If the password is lost, the encrypted results cannot be recovered. + +.. code-block:: bash + + # Encrypt the example PEAT results directory and write the archive + # to the current directory + peat encrypt-results -f ./examples/encryption/example_peat_results + + # Encrypt the example PEAT results directory into examples/encryption/ + peat encrypt-results -f ./examples/encryption/example_peat_results -w ./examples/encryption/ + + # Encrypt a PEAT run directory with an explicit password + peat encrypt-results -f ./examples/encryption/example_peat_results -w ./examples/encryption/ -p example-password + +Decrypting results +------------------ +PEAT can decrypt an archive previously created with ``encrypt-results`` using the ``decrypt-results`` command and the same password that was used when the archive was created. + +The command expects the path to an encrypted PEAT archive, such as ``./examples/encryption/encrypted_example_peat_results.zip``. Use ``-w`` (``--write-file``) to choose where the decrypted results directory is written. Use ``-p`` (``--password``) to provide the archive password on the command line. If ``-p`` is not provided, PEAT will prompt interactively for a password. + +By default, the decrypted directory is written to the current working directory and named after the original run directory. + +.. code-block:: bash + + # Decrypt an encrypted archive into the current directory + peat decrypt-results -f ./examples/encryption/encrypted_example_peat_results.zip + + # Decrypt an encrypted archive into a specific output directory + peat decrypt-results -f ./examples/encryption/encrypted_example_peat_results.zip -w ./restored_results/ + + # Decrypt an encrypted archive with an explicit password + peat decrypt-results -f ./examples/encryption/encrypted_example_peat_results.zip -w ./restored_results/ -p example-password + Device-specific results ----------------------- .. warning:: diff --git a/examples/encryption/encrypted_example_peat_results.zip b/examples/encryption/encrypted_example_peat_results.zip new file mode 100644 index 0000000..6df220c Binary files /dev/null and b/examples/encryption/encrypted_example_peat_results.zip differ diff --git a/examples/encryption/example_peat_results/README.md b/examples/encryption/example_peat_results/README.md new file mode 100644 index 0000000..eb6add1 --- /dev/null +++ b/examples/encryption/example_peat_results/README.md @@ -0,0 +1,2 @@ +This directory contains a minimal example of PEAT-style results for the +``encrypt-results`` and ``decrypt-results`` documentation examples. diff --git a/examples/encryption/example_peat_results/devices/192.0.2.10/device-data-summary.json b/examples/encryption/example_peat_results/devices/192.0.2.10/device-data-summary.json new file mode 100644 index 0000000..1c1a486 --- /dev/null +++ b/examples/encryption/example_peat_results/devices/192.0.2.10/device-data-summary.json @@ -0,0 +1,6 @@ +{ + "ip": "192.0.2.10", + "device_type": "example-device", + "hostname": "example-plc", + "pull_success": true +} diff --git a/examples/encryption/example_peat_results/logs/peat.log b/examples/encryption/example_peat_results/logs/peat.log new file mode 100644 index 0000000..aa025dc --- /dev/null +++ b/examples/encryption/example_peat_results/logs/peat.log @@ -0,0 +1 @@ +INFO example PEAT run log for encryption documentation diff --git a/examples/encryption/example_peat_results/summaries/pull-summary.json b/examples/encryption/example_peat_results/summaries/pull-summary.json new file mode 100644 index 0000000..55a8b5c --- /dev/null +++ b/examples/encryption/example_peat_results/summaries/pull-summary.json @@ -0,0 +1,11 @@ +{ + "command": "pull", + "device_count": 1, + "devices": [ + { + "ip": "192.0.2.10", + "device_type": "example-device", + "pull_success": true + } + ] +} diff --git a/pdm.lock b/pdm.lock index 20db3ba..c353bdf 100644 --- a/pdm.lock +++ b/pdm.lock @@ -5,7 +5,7 @@ groups = ["default", "dev", "docs", "exe", "lint", "test"] strategy = [] lock_version = "4.5.0" -content_hash = "sha256:8ba4c680cbe5368a6f8da23412190c5467f8ff42199a397f9a3d4979b90ec4f3" +content_hash = "sha256:6c5927fe258faad045b9a3fba4988a063f9c78f34827e501f3c4db21d472644b" [[metadata.targets]] requires_python = ">=3.11,<3.14" @@ -321,19 +321,6 @@ files = [ {file = "check_sdist-1.3.2.tar.gz", hash = "sha256:9faaceca95c03ef9b8edb20db6df631e845d279b2ee6aa97d13a7c3743da7645"}, ] -[[package]] -name = "click" -version = "8.3.2" -requires_python = ">=3.10" -summary = "Composable command line interface toolkit" -dependencies = [ - "colorama; platform_system == \"Windows\"", -] -files = [ - {file = "click-8.3.2-py3-none-any.whl", hash = "sha256:1924d2c27c5653561cd2cae4548d1406039cb79b858b747cfea24924bbc1616d"}, - {file = "click-8.3.2.tar.gz", hash = "sha256:14162b8b3b3550a7d479eafa77dfd3c38d9dc8951f6f69c78913a8f9a7540fd5"}, -] - [[package]] name = "codespell" version = "2.4.1" @@ -1520,6 +1507,37 @@ files = [ {file = "pycparser-3.0.tar.gz", hash = "sha256:600f49d217304a5902ac3c37e1281c9fe94e4d0489de643a9504c5cdfdfc6b29"}, ] +[[package]] +name = "pycryptodomex" +version = "3.23.0" +requires_python = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,!=3.5.*,!=3.6.*,>=2.7" +summary = "Cryptographic library for Python" +files = [ + {file = "pycryptodomex-3.23.0-cp313-cp313t-macosx_10_13_universal2.whl", hash = "sha256:7b37e08e3871efe2187bc1fd9320cc81d87caf19816c648f24443483005ff886"}, + {file = "pycryptodomex-3.23.0-cp313-cp313t-macosx_10_13_x86_64.whl", hash = "sha256:91979028227543010d7b2ba2471cf1d1e398b3f183cb105ac584df0c36dac28d"}, + {file = "pycryptodomex-3.23.0-cp313-cp313t-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:6b8962204c47464d5c1c4038abeadd4514a133b28748bcd9fa5b6d62e3cec6fa"}, + {file = "pycryptodomex-3.23.0-cp313-cp313t-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:a33986a0066860f7fcf7c7bd2bc804fa90e434183645595ae7b33d01f3c91ed8"}, + {file = "pycryptodomex-3.23.0-cp313-cp313t-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:c7947ab8d589e3178da3d7cdeabe14f841b391e17046954f2fbcd941705762b5"}, + {file = "pycryptodomex-3.23.0-cp313-cp313t-musllinux_1_2_aarch64.whl", hash = "sha256:c25e30a20e1b426e1f0fa00131c516f16e474204eee1139d1603e132acffc314"}, + {file = "pycryptodomex-3.23.0-cp313-cp313t-musllinux_1_2_i686.whl", hash = "sha256:da4fa650cef02db88c2b98acc5434461e027dce0ae8c22dd5a69013eaf510006"}, + {file = "pycryptodomex-3.23.0-cp313-cp313t-musllinux_1_2_x86_64.whl", hash = "sha256:58b851b9effd0d072d4ca2e4542bf2a4abcf13c82a29fd2c93ce27ee2a2e9462"}, + {file = "pycryptodomex-3.23.0-cp313-cp313t-win32.whl", hash = "sha256:a9d446e844f08299236780f2efa9898c818fe7e02f17263866b8550c7d5fb328"}, + {file = "pycryptodomex-3.23.0-cp313-cp313t-win_amd64.whl", hash = "sha256:bc65bdd9fc8de7a35a74cab1c898cab391a4add33a8fe740bda00f5976ca4708"}, + {file = "pycryptodomex-3.23.0-cp313-cp313t-win_arm64.whl", hash = "sha256:c885da45e70139464f082018ac527fdaad26f1657a99ee13eecdce0f0ca24ab4"}, + {file = "pycryptodomex-3.23.0-cp37-abi3-macosx_10_9_universal2.whl", hash = "sha256:06698f957fe1ab229a99ba2defeeae1c09af185baa909a31a5d1f9d42b1aaed6"}, + {file = "pycryptodomex-3.23.0-cp37-abi3-macosx_10_9_x86_64.whl", hash = "sha256:b2c2537863eccef2d41061e82a881dcabb04944c5c06c5aa7110b577cc487545"}, + {file = "pycryptodomex-3.23.0-cp37-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:43c446e2ba8df8889e0e16f02211c25b4934898384c1ec1ec04d7889c0333587"}, + {file = "pycryptodomex-3.23.0-cp37-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:f489c4765093fb60e2edafdf223397bc716491b2b69fe74367b70d6999257a5c"}, + {file = "pycryptodomex-3.23.0-cp37-abi3-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:bdc69d0d3d989a1029df0eed67cc5e8e5d968f3724f4519bd03e0ec68df7543c"}, + {file = "pycryptodomex-3.23.0-cp37-abi3-musllinux_1_2_aarch64.whl", hash = "sha256:6bbcb1dd0f646484939e142462d9e532482bc74475cecf9c4903d4e1cd21f003"}, + {file = "pycryptodomex-3.23.0-cp37-abi3-musllinux_1_2_i686.whl", hash = "sha256:8a4fcd42ccb04c31268d1efeecfccfd1249612b4de6374205376b8f280321744"}, + {file = "pycryptodomex-3.23.0-cp37-abi3-musllinux_1_2_x86_64.whl", hash = "sha256:55ccbe27f049743a4caf4f4221b166560d3438d0b1e5ab929e07ae1702a4d6fd"}, + {file = "pycryptodomex-3.23.0-cp37-abi3-win32.whl", hash = "sha256:189afbc87f0b9f158386bf051f720e20fa6145975f1e76369303d0f31d1a8d7c"}, + {file = "pycryptodomex-3.23.0-cp37-abi3-win_amd64.whl", hash = "sha256:52e5ca58c3a0b0bd5e100a9fbc8015059b05cffc6c66ce9d98b4b45e023443b9"}, + {file = "pycryptodomex-3.23.0-cp37-abi3-win_arm64.whl", hash = "sha256:02d87b80778c171445d67e23d1caef279bf4b25c3597050ccd2e13970b57fd51"}, + {file = "pycryptodomex-3.23.0.tar.gz", hash = "sha256:71909758f010c82bc99b0abf4ea12012c98962fbf0583c2164f8b84533c2e4da"}, +] + [[package]] name = "pydantic" version = "1.10.22" @@ -1928,6 +1946,19 @@ files = [ {file = "pyyaml-6.0.2.tar.gz", hash = "sha256:d584d9ec91ad65861cc08d42e834324ef890a082e591037abe114850ff7bbc3e"}, ] +[[package]] +name = "pyzipper" +version = "0.4.0" +requires_python = ">=3.4" +summary = "AES encryption for zipfile." +dependencies = [ + "pycryptodomex", +] +files = [ + {file = "pyzipper-0.4.0-py3-none-any.whl", hash = "sha256:aa7b8a0fe741d67aac36ead85f6e735af107b72f84e0775f2ed565fc0d3a2f02"}, + {file = "pyzipper-0.4.0.tar.gz", hash = "sha256:a4b96afcac04c5589d5abdc6158dd362166374e3cc6810aa441e65f8a17cb9e3"}, +] + [[package]] name = "requests" version = "2.29.0" @@ -2356,23 +2387,6 @@ files = [ {file = "tomlkit-0.14.0.tar.gz", hash = "sha256:cf00efca415dbd57575befb1f6634c4f42d2d87dbba376128adb42c121b87064"}, ] -[[package]] -name = "towncrier" -version = "25.8.0" -requires_python = ">=3.9" -summary = "Building newsfiles for your project." -dependencies = [ - "click", - "importlib-metadata>=4.6; python_version < \"3.10\"", - "importlib-resources>=5; python_version < \"3.10\"", - "jinja2", - "tomli; python_version < \"3.11\"", -] -files = [ - {file = "towncrier-25.8.0-py3-none-any.whl", hash = "sha256:b953d133d98f9aeae9084b56a3563fd2519dfc6ec33f61c9cd2c61ff243fb513"}, - {file = "towncrier-25.8.0.tar.gz", hash = "sha256:eef16d29f831ad57abb3ae32a0565739866219f1ebfbdd297d32894eb9940eb1"}, -] - [[package]] name = "types-pytz" version = "2025.2.0.20251108" diff --git a/peat/__init__.py b/peat/__init__.py index 9d43d70..54f10a8 100644 --- a/peat/__init__.py +++ b/peat/__init__.py @@ -110,4 +110,9 @@ def emit(self, record: logging.LogRecord) -> None: from .api.push_api import push from .api.heat_api import heat_main from .api.config_builder_api import generate_simple_config, generate_full_config -from .api.crypto_api import encrypt, decrypt +from .api.crypto_api import ( + encrypt_config_api, + decrypt_config_api, + encrypt_results_api, + decrypt_results_api, +) diff --git a/peat/api/crypto_api.py b/peat/api/crypto_api.py index 88c082d..c879fdc 100644 --- a/peat/api/crypto_api.py +++ b/peat/api/crypto_api.py @@ -2,10 +2,10 @@ import yaml -from peat import config_crypto, log +from peat import config_crypto, log, results_crypto -def encrypt(config_path: str, user_password: str | None = None) -> bool: +def encrypt_config_api(config_path: str, user_password: str | None = None) -> bool: """ PEAT CLI functionality to encrypt a file @@ -14,6 +14,10 @@ def encrypt(config_path: str, user_password: str | None = None) -> bool: user_password: (Optional) password for encryption specified by CLI, defaults to None. If none is given by CLI command, user will be asked to input one """ + if config_path is None: + log.error("No config specified") + return False + fp = Path(config_path) result = config_crypto.encrypt_config(fp, user_password) if result: @@ -23,7 +27,7 @@ def encrypt(config_path: str, user_password: str | None = None) -> bool: return False -def decrypt( +def decrypt_config_api( config_path: str, output_path: str | None = None, new_filename: str | None = "decrypted_config.yaml", @@ -41,6 +45,10 @@ def decrypt( user_password: (Optional) password for encryption specified by CLI, defaults to None. If none is given by CLI command, user will be asked to input one """ + if config_path is None: + log.error("No config specified") + return False + fp = Path(config_path) decrypted_str = config_crypto.decrypt_config(fp, user_password=user_password) if not decrypted_str: @@ -63,3 +71,57 @@ def decrypt( yaml.dump(yaml_data, file, default_flow_style=False, sort_keys=False) log.info(f"Encrypted config saved to current directory as {new_filename}") return True + + +def encrypt_results_api( + results_dir_path: str, write_path: str | None = None, user_password: str | None = None +) -> bool: + """ + API for CLI -> Encrypt results function + + Args: + results_dir_path: PEAT results directory + write_path: Path to write encrypted archive + user_password: password to encrypt archive with + Returns: + bool + """ + if results_dir_path is None: + log.error("No directory specified") + return False + + results_dir_path = Path(results_dir_path) + + if write_path is None: + write_path = Path("./") + else: + write_path = Path(write_path) + + return results_crypto.zip_encrypt_results(results_dir_path, write_path, user_password) + + +def decrypt_results_api( + encrypted_dir_path: str, write_path: str | None = None, user_password: str | None = None +) -> bool: + """ + API for CLI -> Decrypt archive function + + Args: + encrypted_dir_path: Encrypted zip archive path + write_path: Path to write extracted PEAT results + user_password: password to decrypt archive with + Returns: + bool + """ + if encrypted_dir_path is None: + log.error("No archive specified") + return False + + encrypted_dir_path = Path(encrypted_dir_path) + + if write_path is None: + write_path = Path("./") + else: + write_path = Path(write_path) + + return results_crypto.unzip_decrypt_results(encrypted_dir_path, write_path, user_password) diff --git a/peat/cli_args.py b/peat/cli_args.py index 72cf588..3110012 100644 --- a/peat/cli_args.py +++ b/peat/cli_args.py @@ -511,42 +511,81 @@ def build_argument_parser(version: str = "0.0.0") -> argparse.ArgumentParser: ) config_builder_parser.set_defaults(func="config-builder") - # Encrypt command - encrypt_description = ( + # Encrypt config command + encrypt_config_description = ( "Encrypt a config file using PEAT's built in encryption capability. Must specify the file path to the config file using the -f flag. " "The encrypted file will be saved to the same directory as the original unencrypted config. " "The new file will be named the same as the unencrypted file, but will have 'encrypted_' added to the beginning of the filename. " "WARNING: PEAT will not save the encrypted file's password for you, it is up to you to remember it" ) - encrypt_parser = subparsers.add_parser( - name="encrypt", + encrypt_config_parser = subparsers.add_parser( + name="encrypt-config", + aliases=["encrypt"], # for legacy users # help: displayed next to the sub-command when running "peat --help" - help=encrypt_description, + help=encrypt_config_description, # description: displayed before the arguments when running "peat --help" - description=encrypt_description, + description=encrypt_config_description, ) - encrypt_parser.set_defaults(func="encrypt") + encrypt_config_parser.set_defaults(func="encrypt_config") - decrypt_description = ( + decrypt_config_description = ( "Decrypt a config file using PEAT's built in decryption capability. Must specify the file path to the config file. " "The decrypted file will be saved to the same directory as the original encrypted config. " "IMPORTANT: PEAT will only decrypt configs that have previously been encrypted by PEAT, and upon receiving the correct password" ) - decrypt_parser = subparsers.add_parser( - name="decrypt", + decrypt_config_parser = subparsers.add_parser( + name="decrypt-config", + aliases=["decrypt"], # help: displayed next to the sub-command when running "peat --help" - help=decrypt_description, + help=decrypt_config_description, # description: displayed before the arguments when running "peat --help" - description=decrypt_description, + description=decrypt_config_description, ) - decrypt_parser.set_defaults(func="decrypt") + decrypt_config_parser.set_defaults(func="decrypt_config") + + # Encrypt results command + encrypt_results_description = ( + "Encrypt the results directory using PEAT's built in encryption capability. Must specify the directory path if not using the default location using -f flag. " + "The encrypted zip file will be saved to the same directory you are running from" + "The new file will be named the same as the unencrypted file, but will have 'encrypted_' added to the beginning of the filename. " + "WARNING: PEAT will not save the encrypted archive's password for you, it is up to you to remember it" + ) + encrypt_results_parser = subparsers.add_parser( + name="encrypt-results", + # help: displayed next to the sub-command when running "peat --help" + help=encrypt_results_description, + # description: displayed before the arguments when running "peat --help" + description=encrypt_results_description, + ) + encrypt_results_parser.set_defaults(func="encrypt_results") + + decrypt_results_description = ( + "Decrypt the results zip file using PEAT's built in decryption capability. Must specify the file path to the zip file " + "The decrypted directory will be saved to the directory you run from" + "IMPORTANT: PEAT will only decrypt zips that have previously been encrypted by PEAT, and upon receiving the correct password" + ) + decrypt_results_parser = subparsers.add_parser( + name="decrypt-results", + # help: displayed next to the sub-command when running "peat --help" + help=decrypt_results_description, + # description: displayed before the arguments when running "peat --help" + description=decrypt_results_description, + ) + decrypt_results_parser.set_defaults(func="decrypt_results") # Add arguments that we want specified after a command to all subparsers. # This is where any "general" peat arguments go (e.g "verbose"). # NOTE: We add these before the command-specific # arguments to order the usage list properly. # NOTE: "default=None" means "use the default in peat.config or elsewhere" + seen_subp = ( + set() + ) # helps resolve conflict issues when parsers are added twice due to alias, etc. for _, subp in subparsers.choices.items(): + if id(subp) in seen_subp: + continue + seen_subp.add(id(subp)) + group = subp.add_argument_group("general arguments") group.add_argument( "-c", @@ -1046,8 +1085,8 @@ def build_argument_parser(version: str = "0.0.0") -> argparse.ArgumentParser: ) add_list_module_args(pillage_parser) # Hack to add "--list-*" commands - # Encrypt command - encrypt_parser.add_argument( + # Encrypt Config command + encrypt_config_parser.add_argument( "-f", "--file-path", type=str, @@ -1057,7 +1096,7 @@ def build_argument_parser(version: str = "0.0.0") -> argparse.ArgumentParser: help="File path for config file to encrypt", ) - encrypt_parser.add_argument( + encrypt_config_parser.add_argument( "-p", "--password", type=str, @@ -1066,9 +1105,9 @@ def build_argument_parser(version: str = "0.0.0") -> argparse.ArgumentParser: dest="user-password", help="Specify password to use to encrypt/decrypt file", ) - add_list_module_args(encrypt_parser) # Hack to add "--list-*" commands + add_list_module_args(encrypt_config_parser) # Hack to add "--list-*" commands - decrypt_parser.add_argument( + decrypt_config_parser.add_argument( "-f", "--file-path", type=str, @@ -1077,7 +1116,7 @@ def build_argument_parser(version: str = "0.0.0") -> argparse.ArgumentParser: dest="filepath", help="File path for config file to decrypt", ) - decrypt_parser.add_argument( + decrypt_config_parser.add_argument( "-w", "--write-file", type=str, @@ -1087,7 +1126,7 @@ def build_argument_parser(version: str = "0.0.0") -> argparse.ArgumentParser: help="File path to save decrypted file to", ) - decrypt_parser.add_argument( + decrypt_config_parser.add_argument( "-p", "--password", type=str, @@ -1096,7 +1135,66 @@ def build_argument_parser(version: str = "0.0.0") -> argparse.ArgumentParser: dest="user-password", help="Specify password to use to encrypt/decrypt file", ) - add_list_module_args(decrypt_parser) # Hack to add "--list-*" commands + add_list_module_args(decrypt_config_parser) # Hack to add "--list-*" commands + + # Encrypt Results + encrypt_results_parser.add_argument( + "-f", + "--file-path", + type=str, + metavar="PATH", + default=None, + dest="filepath", + help="File path for results directory to encrypt", + ) + encrypt_results_parser.add_argument( + "-w", + "--write-file", + type=str, + metavar="DIR", + default=None, + dest="write-path", + help="File path to save encrypted archive to", + ) + encrypt_results_parser.add_argument( + "-p", + "--password", + type=str, + metavar="USER_PASS", + default=None, + dest="user-password", + help="Specify password to use to encrypt/decrypt zip", + ) + add_list_module_args(encrypt_results_parser) # Hack to add "--list-*" commands + + decrypt_results_parser.add_argument( + "-f", + "--file-path", + type=str, + metavar="PATH", + default=None, + dest="filepath", + help="File path for encrypted results zip", + ) + decrypt_results_parser.add_argument( + "-w", + "--write-file", + type=str, + metavar="DIR", + default=None, + dest="write-path", + help="File path to save decrypted zip to", + ) + decrypt_results_parser.add_argument( + "-p", + "--password", + type=str, + metavar="USER_PASS", + default=None, + dest="user-password", + help="Specify password to use to encrypt/decrypt zip", + ) + add_list_module_args(decrypt_results_parser) # Hack to add "--list-*" commands return parser diff --git a/peat/cli_main.py b/peat/cli_main.py index f0e5196..948acb1 100644 --- a/peat/cli_main.py +++ b/peat/cli_main.py @@ -15,8 +15,10 @@ config, consts, datastore, - decrypt, - encrypt, + decrypt_config_api, + decrypt_results_api, + encrypt_config_api, + encrypt_results_api, exit_handler, heat_main, initialize_peat, @@ -183,8 +185,8 @@ def oneshot_main(args: dict[str, Any]) -> bool: if args["func"] == "heat": return heat_main() - if args["func"] == "encrypt": - result = encrypt(args["filepath"], args["user-password"]) + if args["func"] == "encrypt_config": + result = encrypt_config_api(args["filepath"], args["user-password"]) if result: log.info("Done encrypting file, exiting...") sys.exit(0) @@ -192,8 +194,8 @@ def oneshot_main(args: dict[str, Any]) -> bool: log.critical("Error encountered while encrypting file, exiting...") sys.exit(1) - if args["func"] == "decrypt": - result = decrypt( + if args["func"] == "decrypt_config": + result = decrypt_config_api( config_path=args["filepath"], output_path=args["write-path"], user_password=args["user-password"], @@ -205,6 +207,30 @@ def oneshot_main(args: dict[str, Any]) -> bool: log.critical("Error encountered while decrypting file, exiting...") sys.exit(1) + if args["func"] == "encrypt_results": + filepath = args["filepath"] + writepath = args["write-path"] + user_password = args["user-password"] + result = encrypt_results_api(filepath, writepath, user_password) + if result: + log.info("Done encrypting results, exiting...") + sys.exit(0) + else: + log.critical("Error encountered while encrypting results, exiting...") + sys.exit(1) + + if args["func"] == "decrypt_results": + filepath = args["filepath"] + writepath = args["write-path"] + user_password = args["user-password"] + result = decrypt_results_api(filepath, writepath, user_password) + if result: + log.info("Done decrypting results archive, exiting...") + sys.exit(0) + else: + log.critical("Error encountered while decrypting results archive, exiting...") + sys.exit(1) + targets = [] # type: list[str] device_types = set() # type: set[str] diff --git a/peat/results_crypto.py b/peat/results_crypto.py new file mode 100644 index 0000000..68c3240 --- /dev/null +++ b/peat/results_crypto.py @@ -0,0 +1,108 @@ +""" +Functionality for encrypting/decrypting the results directory +""" + +import getpass +import os +from pathlib import Path + +import pyzipper + +from peat import log + + +def zip_encrypt_results( + results_dir_path: Path, write_path: Path = Path("./"), user_password: str | None = None +) -> bool: + """ + zip and encrypted the target 'peat_results' directory + + Args: + results_dir_path: target peat results directory + write_path: where to write archive + user_password: password to encrypt the zip + + Returns: + bool + """ + + if not results_dir_path.is_dir(): + log.error(f"directory path {results_dir_path} does not exist") + return False + + results_dir_path = results_dir_path.resolve() + write_path = write_path.resolve() + + zip_name = write_path / f"encrypted_{results_dir_path.name}.zip" + + try: + # pyzipper needs a password + if user_password is None: + user_password = getpass.getpass( + "WARNING: PEAT will not save the encrypted archive's password for you," + "it is up to you to remember it\n" + "Please provide a password for the archive: " + ) + + log.info("Zippping and Encrypting Results...") + with pyzipper.AESZipFile( + zip_name, + "w", + compression=pyzipper.ZIP_DEFLATED, + encryption=pyzipper.WZ_AES, + ) as zf: + zf.setpassword(user_password.encode("utf-8")) + + for dirpath, _, filenames in os.walk(results_dir_path): + dirpath = Path(dirpath) + for filename in filenames: + full_path = dirpath / filename + arcname = full_path.relative_to(results_dir_path) + zf.write(full_path, arcname=str(arcname)) + log.info(f"encrypted results written: {zip_name}") + return True + + except Exception as e: + log.error(f"Issue zipping and encrypting results: {e}") + return False + + +def unzip_decrypt_results( + encrypted_dir_path: Path, write_path: Path = Path("./"), user_password: str | None = None +) -> bool: + """ + Decrypt target archive with provided password and write to target path + + Args: + encrypted_dir_path: target encrypted archive + write_path: where to write extracted dir + user_password: password to decrypt the zip + + Returns: + bool + """ + + if not pyzipper.is_zipfile(encrypted_dir_path): + log.error(f"archive path {encrypted_dir_path} does not exist") + return False + + write_path = write_path.resolve() + encrypted_dir_path = encrypted_dir_path.resolve() + + extract_dir = write_path / encrypted_dir_path.stem.replace("encrypted_", "", 1) + + try: + if user_password is None: + user_password = getpass.getpass("Please provide a password for the archive: ") + + log.info("Unzipping and Decrypting Results...") + with pyzipper.AESZipFile(encrypted_dir_path, "r") as zf: + zf.setpassword(user_password.encode("utf-8")) + zf.extractall(path=extract_dir) + + log.info(f"decrypted results written: {write_path}") + return True + + except Exception as e: + log.error(f"Issue unzipping and decrypting results: {e}") + return False diff --git a/pyproject.toml b/pyproject.toml index 06cab60..c6fe26d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -223,6 +223,7 @@ dependencies = [ # Required for: Fortigate "scp>=0.15.0", "textual>=6.1.0", + "pyzipper>=0.4.0", ] [project.scripts] diff --git a/tests/conftest.py b/tests/conftest.py index 46b79e8..508a1c2 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -20,7 +20,7 @@ from deepdiff import DeepDiff from elasticsearch import Elasticsearch -from peat import config, exit_handler +from peat import config, exit_handler, results_crypto # NOTE: we can't have arguments before subcommands anymore PEAT_CMD = [sys.executable, "-m", "peat"] @@ -488,3 +488,20 @@ def _assert_meta_files(run_dir=None) -> None: assert_glob_path(run_dir / "peat_metadata", "peat_state.yaml") return _assert_meta_files + + +@pytest.fixture +def mock_peat_results(tmp_path) -> Path: + mock_peat_results: Path = tmp_path / "mock_peat_results" + mock_peat_results.mkdir() + (mock_peat_results / "file1.txt").write_text("hello") + (mock_peat_results / "file2.txt").write_text("world") + + return mock_peat_results + + +@pytest.fixture +def mock_encrypted_peat_results(mock_peat_results, tmp_path) -> Path: + encrypted_zip_path = tmp_path / "encrypted_mock_peat_results.zip" + results_crypto.zip_encrypt_results(mock_peat_results, tmp_path, "testpass") + return encrypted_zip_path diff --git a/tests/test_cli_commands.py b/tests/test_cli_commands.py index d33ee3b..978cdcf 100644 --- a/tests/test_cli_commands.py +++ b/tests/test_cli_commands.py @@ -319,7 +319,7 @@ def test_cli_scan_localhost_subnet(run_peat, tmp_path, assert_meta_files): @pytest.mark.slow -def test_cli_encryption(run_peat, tmp_path, examples_dir, read_text, datapath): +def test_cli_config_encryption(run_peat, tmp_path, examples_dir, read_text, datapath): testing_path = tmp_path / "config" testing_path.mkdir() path_to_config = f"{testing_path.as_posix()}/example_config.yaml" @@ -341,7 +341,7 @@ def test_cli_encryption(run_peat, tmp_path, examples_dir, read_text, datapath): @pytest.mark.slow -def test_cli_decryption(run_peat, tmp_path, examples_dir, read_text, datapath): +def test_cli_config_decryption(run_peat, tmp_path, examples_dir, read_text, datapath): testing_path = tmp_path / "config" testing_path.mkdir() # create a path to the encrypted file in tmp_path we'll attempt to decrypt @@ -364,3 +364,35 @@ def test_cli_decryption(run_peat, tmp_path, examples_dir, read_text, datapath): run_peat(args)[0] assert read_text(datapath(f"{testing_path.as_posix()}/decrypted_config.yaml")) + + +@pytest.mark.slow +def test_cli_results_encryption(run_peat, tmp_path, mock_peat_results): + args = [ + "encrypt-results", + "-f", + mock_peat_results, + "-p", + "secret", + "-w", + tmp_path, + ] + run_peat(args)[0] + + assert tmp_path / "encrypted_mock_peat_results.zip" + + +@pytest.mark.slow +def test_cli_results_decryption(run_peat, tmp_path, mock_encrypted_peat_results): + args = [ + "decrypt-results", + "-f", + mock_encrypted_peat_results, + "-p", + "testpass", + "-w", + tmp_path / "unencrypted", + ] + run_peat(args)[0] + + assert tmp_path / "unencrypted" / "mock_peat_results.zip" diff --git a/tests/test_results_crypto.py b/tests/test_results_crypto.py new file mode 100644 index 0000000..6a77b71 --- /dev/null +++ b/tests/test_results_crypto.py @@ -0,0 +1,127 @@ +import zipfile +from unittest.mock import MagicMock + +import pytest + +from peat import results_crypto + + +def test_mock_peat_results_exists(mock_peat_results): + assert mock_peat_results.is_dir() + + +def test_zip_encrypt_missing_dir(tmp_path): + missing_dir = tmp_path / "does_not_exist" + + result = results_crypto.zip_encrypt_results(missing_dir) + assert result is False + + +def test_zip_encrypt_wrong_target(tmp_path): + wrong_file = tmp_path / "wrong.txt" + wrong_file.write_text("hello there") + + result = results_crypto.zip_encrypt_results(wrong_file) + assert result is False + + +def test_zip_encrypt_success(monkeypatch, mock_peat_results, tmp_path): + mock_zip_class = MagicMock() + # need to return the mock zip in context manager to check for password + mock_zip = mock_zip_class.return_value.__enter__.return_value + + # patch the pyzipper AESZipFile function and return a mock zip class + monkeypatch.setattr(results_crypto.pyzipper, "AESZipFile", mock_zip_class) + + # patch the os walk to return our mock files + monkeypatch.setattr( + results_crypto.os, + "walk", + lambda _path: [(mock_peat_results, [], ["file1.txt", "file2.txt"])], + ) + + result = results_crypto.zip_encrypt_results(mock_peat_results, tmp_path, "secret") + assert result is True + mock_zip.setpassword.assert_called_once_with(b"secret") + assert mock_zip.write.call_count == 2 + + +def test_zip_encrypt_pwd_prompt(monkeypatch, mock_peat_results, tmp_path): + mock_zip_class = MagicMock() + # need to return the mock zip in context manager to check for password + mock_zip = mock_zip_class.return_value.__enter__.return_value + + # patch the pyzipper AESZipFile function and return a mock zip class + monkeypatch.setattr(results_crypto.pyzipper, "AESZipFile", mock_zip_class) + + # patch the os walk to return our mock files + monkeypatch.setattr(results_crypto.os, "walk", lambda _path: []) + # patch getpass to simulate a prompt + monkeypatch.setattr(results_crypto.getpass, "getpass", lambda _prompt: "typed_pw") + + result = results_crypto.zip_encrypt_results(mock_peat_results, tmp_path, None) + assert result is True + mock_zip.setpassword.assert_called_once_with(b"typed_pw") + + +def test_unzip_decrypt_success(monkeypatch, tmp_path): + encrypted_file = tmp_path / "encrypted_peat_results.zip" + encrypted_file.write_text("fake zip content") + + mock_zip_class = MagicMock() + # need to return the mock zip in context manager to check for password + mock_zip = mock_zip_class.return_value.__enter__.return_value + + # patch check for zipfile + monkeypatch.setattr(results_crypto.pyzipper, "is_zipfile", lambda _is_dir: True) + # patch the pyzipper AESZipFile function and return a mock zip class + monkeypatch.setattr(results_crypto.pyzipper, "AESZipFile", mock_zip_class) + + result = results_crypto.unzip_decrypt_results( + encrypted_dir_path=encrypted_file, + write_path=tmp_path, + user_password="secret", + ) + + assert result is True + mock_zip.setpassword.assert_called_once_with(b"secret") + mock_zip.extractall.assert_called_once() + + +def test_unzip_decrypt_requires_pwd(monkeypatch, tmp_path): + encrypted_file = tmp_path / "encrypted_peat_results.zip" + encrypted_file.write_text("fake zip content") + + mock_zip_class = MagicMock() + # need to return the mock zip in context manager to check for password + mock_zip = mock_zip_class.return_value.__enter__.return_value + + monkeypatch.setattr(results_crypto.pyzipper, "is_zipfile", lambda _is_dir: True) + # patch the pyzipper AESZipFile function and return a mock zip class + monkeypatch.setattr(results_crypto.pyzipper, "AESZipFile", mock_zip_class) + + # patch getpass to simulate a prompt + monkeypatch.setattr(results_crypto.getpass, "getpass", lambda _prompt: "typed_pw") + + result = results_crypto.unzip_decrypt_results(encrypted_file, tmp_path, None) + assert result is True + mock_zip.setpassword.assert_called_once_with(b"typed_pw") + + +def test_encrypted_zip_cannot_be_opened_wrong_pwd(mock_encrypted_peat_results, tmp_path): + result = results_crypto.unzip_decrypt_results( + encrypted_dir_path=mock_encrypted_peat_results, + write_path=tmp_path, + user_password="wrongpass", + ) + + assert result is False + + +def test_encrypted_zip_cannot_be_opened_with_zip(mock_encrypted_peat_results): + with zipfile.ZipFile(mock_encrypted_peat_results, "r") as zf: + # archive is readable, but file contents are encrypted + assert set(zf.namelist()) == {"file1.txt", "file2.txt"} + + with pytest.raises(RuntimeError, match="encrypted, password required for extraction"): + zf.read("file1.txt")