diff --git a/src/ploigos_step_runner/step_implementers/shared/__init__.py b/src/ploigos_step_runner/step_implementers/shared/__init__.py index 97ccfc45..d1b1552a 100644 --- a/src/ploigos_step_runner/step_implementers/shared/__init__.py +++ b/src/ploigos_step_runner/step_implementers/shared/__init__.py @@ -1,6 +1,7 @@ """StepImplementer parent classes that are shared accross multiple steps. """ +from ploigos_step_runner.step_implementers.shared.ad_hoc import AdHoc from ploigos_step_runner.step_implementers.shared.argocd_generic import \ ArgoCDGeneric from ploigos_step_runner.step_implementers.shared.container_deploy_mixin import \ diff --git a/src/ploigos_step_runner/step_implementers/shared/ad_hoc.py b/src/ploigos_step_runner/step_implementers/shared/ad_hoc.py new file mode 100644 index 00000000..c7c8d861 --- /dev/null +++ b/src/ploigos_step_runner/step_implementers/shared/ad_hoc.py @@ -0,0 +1,125 @@ +"""`StepImplementer` to run an `ad-hoc` bash script/command. + +Step Configuration +------------------ +Step configuration expected as input to this step. +Could come from: + + * static configuration + * runtime configuration + * previous step results + +Configuration Key | Required? | Default | Description +------------------------------|-----------|--------------------------|----------- +`command` | Yes | | Command to execute + +Result Artifacts +---------------- +Results artifacts output by this step. + +Result Artifact Key | Description +-----------------------|------------ +`command-output` | stdout and stderr from the command run +"""# pylint: disable=line-too-long + +from ploigos_step_runner.exceptions import StepRunnerException +from ploigos_step_runner.results import StepResult +from ploigos_step_runner.step_implementer import StepImplementer +from ploigos_step_runner.utils.shell import Shell + +DEFAULT_CONFIG = {} + +REQUIRED_CONFIG_OR_PREVIOUS_STEP_RESULT_ARTIFACT_KEYS = [ + 'command' +] + +class AdHoc(StepImplementer): # pylint: disable=too-few-public-methods + """ + StepImplementer for the ad-hoc step for AdHoc. + """ + + def __init__( # pylint: disable=too-many-arguments + self, + workflow_result, + parent_work_dir_path, + config, + environment=None + ): + super().__init__( + workflow_result=workflow_result, + parent_work_dir_path=parent_work_dir_path, + config=config, + environment=environment + ) + + @staticmethod + def step_implementer_config_defaults(): + """Getter for the StepImplementer's configuration defaults. + + Returns + ------- + dict + Default values to use for step configuration values. + + Notes + ----- + These are the lowest precedence configuration values. + + """ + return {**DEFAULT_CONFIG} + + @staticmethod + def _required_config_or_result_keys(): + """Getter for step configuration or previous step result artifacts that are required before + running this step. + + See Also + -------- + _validate_required_config_or_previous_step_result_artifact_keys + + Returns + ------- + array_list + Array of configuration keys or previous step result artifacts + that are required before running the step. + """ + return REQUIRED_CONFIG_OR_PREVIOUS_STEP_RESULT_ARTIFACT_KEYS + + def _run_step(self): + """Runs the step implemented by this StepImplementer. + + Returns + ------- + StepResult + Object containing the dictionary results of this step. + """ + step_result = StepResult.from_step_implementer(self) + output_file_path = self.write_working_file('ad_hoc_output.txt') + + step_result.add_artifact( + description="Standard out and standard error from ad-hoc command run.", + name='command-output', + value=output_file_path + ) + + command = self.get_value('command') + if command is None: + step_result.success = False + step_result.message = str('"command" is not set!') + return step_result + + try: + # The 'sh' module does not handle command evaluation the same way + # bash does. By explicitly passing the given command to bash with + # the '-c' flag, the step runner should provide an equvilent + # experience to someone running the command in a bash prompt. + Shell().run( + 'bash', + args=['-c', command], + output_file_path=output_file_path + ) + except StepRunnerException as error: + step_result.success = False + step_result.message = str(error) + + return step_result diff --git a/tests/step_implementers/shared/test_ad_hoc.py b/tests/step_implementers/shared/test_ad_hoc.py new file mode 100644 index 00000000..37d2f7e1 --- /dev/null +++ b/tests/step_implementers/shared/test_ad_hoc.py @@ -0,0 +1,76 @@ +import os + +from ploigos_step_runner.results import WorkflowResult +from ploigos_step_runner.step_implementers.shared import AdHoc +from testfixtures import TempDirectory +from tests.helpers.base_step_implementer_test_case import \ + BaseStepImplementerTestCase + + +class TestAdHoc(BaseStepImplementerTestCase): + + def test_run_step_with_command(self): + with TempDirectory() as test_dir: + + # GIVEN a step implementer configured like: + config = { + 'command': 'echo "Hello World!"' + } + step_implementer = self.create_step_implementer(test_dir, config) + + # WHEN I run the step + step_result = step_implementer._run_step() + + # THEN it should return a StepResult + self.assertIsNotNone(step_result) + + # AND the StepResult should have an artifact with the command-output + self.assertIsNotNone(step_result.get_artifact('command-output').value) + + def test_run_step_fails_if_command_fails(self): + with TempDirectory() as test_dir: + + # GIVEN a step implementer configured with a command that will fail + config = { + 'command': 'echooo "Hello World!"' + } + step_implementer = self.create_step_implementer(test_dir, config) + + # WHEN I run the step + step_result = step_implementer._run_step() + + # THEN it should return a StepResult + self.assertIsNotNone(step_result) + + # AND the StepResult should be marked success False + self.assertEqual(step_result.success, False) + + def test_run_step_fails_if_command_not_provided(self): + with TempDirectory() as test_dir: + + # GIVEN a step implementer configured like: + step_implementer = self.create_step_implementer(test_dir, {}) + + # WHEN I run the step + step_result = step_implementer._run_step() + + # THEN it should return a StepResult + self.assertIsNotNone(step_result) + + # AND the StepResult should be marked success False + self.assertEqual(step_result.success, False) + + def test__required_config_or_result_keys(self): + required_keys = AdHoc._required_config_or_result_keys() + self.assertEqual(required_keys, ['command']) + + def create_step_implementer(self, test_dir, step_config): + parent_work_dir_path = os.path.join(test_dir.path, 'working') + return self.create_given_step_implementer( + step_implementer=AdHoc, + step_config=step_config, + step_name='adhoc', + implementer='AdHoc', + workflow_result=WorkflowResult(), + parent_work_dir_path=parent_work_dir_path + )