From 1ce76abc26013f3c7b9ebd78ed893118d5319061 Mon Sep 17 00:00:00 2001 From: Joe Shimkus Date: Tue, 24 Oct 2023 14:40:48 -0400 Subject: [PATCH 1/6] refactor: reduce duplicate run template action code Signed-off-by: Joe Shimkus --- ansible_rulebook/action/run_job_template.py | 147 +++----------- ansible_rulebook/action/run_template.py | 186 ++++++++++++++++++ .../action/run_workflow_template.py | 153 +++----------- tests/unit/action/test_debug.py | 2 +- tests/unit/action/test_post_event.py | 2 +- tests/unit/action/test_retract_fact.py | 3 +- tests/unit/action/test_run_job_template.py | 6 +- tests/unit/action/test_run_playbook.py | 4 +- .../unit/action/test_run_workflow_template.py | 6 +- tests/unit/action/test_set_fact.py | 2 +- 10 files changed, 242 insertions(+), 269 deletions(-) create mode 100644 ansible_rulebook/action/run_template.py diff --git a/ansible_rulebook/action/run_job_template.py b/ansible_rulebook/action/run_job_template.py index 72a95d06..94d8e285 100644 --- a/ansible_rulebook/action/run_job_template.py +++ b/ansible_rulebook/action/run_job_template.py @@ -12,146 +12,43 @@ # See the License for the specific language governing permissions and # limitations under the License. -import asyncio import logging -import uuid -from urllib.parse import urljoin -from drools import ruleset as lang +from ansible_rulebook.exception import JobTemplateNotFoundException -from ansible_rulebook.conf import settings -from ansible_rulebook.exception import ( - ControllerApiException, - JobTemplateNotFoundException, -) from ansible_rulebook.job_template_runner import job_template_runner -from ansible_rulebook.util import run_at -from .control import Control -from .helper import Helper -from .metadata import Metadata +from .run_template import RunTemplate logger = logging.getLogger(__name__) -class RunJobTemplate: +class RunJobTemplate(RunTemplate): """run_job_template action launches a specified job template on the controller. It waits for the job to be complete. """ - def __init__(self, metadata: Metadata, control: Control, **action_args): - self.helper = Helper(metadata, control, "run_job_template") - self.action_args = action_args - self.name = self.action_args["name"] - self.organization = self.action_args["organization"] - self.job_id = str(uuid.uuid4()) - self.job_args = self.action_args.get("job_args", {}) - self.job_args["limit"] = ",".join(self.helper.control.hosts) - self.controller_job = {} + @property + def _exceptions(self): + return super()._exceptions + (JobTemplateNotFoundException,) - async def __call__(self): - logger.info( - "running job template: %s, organization: %s", - self.name, - self.organization, - ) - logger.info( - "ruleset: %s, rule %s", - self.helper.metadata.rule_set, - self.helper.metadata.rule, - ) + @property + def _template(self): + return job_template_runner.run_job_template - self.job_args["extra_vars"] = self.helper.collect_extra_vars( - self.job_args.get("extra_vars", {}) - ) - await self._job_start_event() - await self._run() + @property + def _template_id(self): + return "run_job_template" - async def _run(self): - retries = self.action_args.get("retries", 0) - if self.action_args.get("retry", False): - retries = max(self.action_args.get("retries", 0), 1) - delay = self.action_args.get("delay", 0) + @property + def _template_name(self): + return "job template" - try: - for i in range(retries + 1): - if i > 0: - if delay > 0: - await asyncio.sleep(delay) - logger.info( - "Previous run_job_template failed. Retry %d of %d", - i, - retries, - ) - controller_job = await job_template_runner.run_job_template( - self.name, - self.organization, - self.job_args, - ) - if controller_job["status"] != "failed": - break - except (ControllerApiException, JobTemplateNotFoundException) as ex: - logger.error(ex) - controller_job = {} - controller_job["status"] = "failed" - controller_job["created"] = run_at() - controller_job["error"] = str(ex) + @property + def _url_prefix(self): + return super()._url_prefix + "jobs/" - self.controller_job = controller_job - await self._post_process() - - async def _post_process(self) -> None: - a_log = { - "job_template_name": self.name, - "organization": self.organization, - "job_id": self.job_id, - "status": self.controller_job["status"], - "run_at": self.controller_job["created"], - "url": self._controller_job_url(), - "matching_events": self.helper.get_events(), - } - if "error" in self.controller_job: - a_log["message"] = self.controller_job["error"] - a_log["reason"] = {"error": self.controller_job["error"]} - - await self.helper.send_status(a_log) - set_facts = self.action_args.get("set_facts", False) - post_events = self.action_args.get("post_events", False) - - if set_facts or post_events: - ruleset = self.action_args.get( - "ruleset", self.helper.metadata.rule_set - ) - logger.debug("set_facts") - facts = self.controller_job.get("artifacts", {}) - if facts: - facts = self.helper.embellish_internal_event(facts) - logger.debug("facts %s", facts) - if set_facts: - lang.assert_fact(ruleset, facts) - if post_events: - lang.post(ruleset, facts) - else: - logger.debug("Empty facts are not set") - - async def _job_start_event(self): - await self.helper.send_status( - { - "run_at": run_at(), - "matching_events": self.helper.get_events(), - "action": self.helper.action, - "hosts": ",".join(self.helper.control.hosts), - "name": self.name, - "job_id": self.job_id, - "ansible_rulebook_id": settings.identifier, - }, - obj_type="Job", - ) - - def _controller_job_url(self) -> str: - if "id" in self.controller_job: - return urljoin( - job_template_runner.host, - "/#/jobs/" f"{self.controller_job['id']}/" "details", - ) - return "" + def _make_log(self): + log = super()._make_log() + log["job_template_name"] = self.name + return log diff --git a/ansible_rulebook/action/run_template.py b/ansible_rulebook/action/run_template.py new file mode 100644 index 00000000..9f8b35ba --- /dev/null +++ b/ansible_rulebook/action/run_template.py @@ -0,0 +1,186 @@ +# Copyright 2023 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import asyncio +import logging +import uuid +from urllib.parse import urljoin + +import drools + +from ansible_rulebook.conf import settings +from ansible_rulebook.exception import ControllerApiException +from ansible_rulebook.job_template_runner import job_template_runner +from ansible_rulebook.util import run_at + +from .control import Control +from .helper import Helper +from .metadata import Metadata + +logger = logging.getLogger(__name__) + + +class RunTemplate: + """Common superclass for template-based actions. Launches the appropriate + specified template on the controller. It waits for the job to be complete. + """ + + @property + def _exceptions(self): + return (ControllerApiException,) + + @property + def _template(self): + raise NotImplementedError + + @property + def _template_id(self): + raise NotImplementedError + + @property + def _template_name(self): + raise NotImplementedError + + @property + def _url_path(self): + return self._url_prefix + f"{self.controller_job['id']}/" + "details" + + @property + def _url_prefix(self): + return "/#/" + + def _make_log(self): + log = { + "organization": self.organization, + "job_id": self.job_id, + "status": self.controller_job["status"], + "run_at": self.controller_job["created"], + "url": self._controller_job_url(), + "matching_events": self.helper.get_events(), + } + if "error" in self.controller_job: + log["message"] = self.controller_job["error"] + log["reason"] = {"error": self.controller_job["error"]} + return log + + def __init__(self, metadata: Metadata, control: Control, **action_args): + self.helper = Helper(metadata, control, self._template_id) + self.action_args = action_args + self.name = self.action_args["name"] + self.organization = self.action_args["organization"] + self.job_id = str(uuid.uuid4()) + self.job_args = self.action_args.get("job_args", {}) + self.job_args["limit"] = ",".join(self.helper.control.hosts) + self.controller_job = {} + + async def __call__(self): + logger.info( + "running %s: %s, organization: %s", + self._template_name, + self.name, + self.organization, + ) + logger.info( + "ruleset: %s, rule %s", + self.helper.metadata.rule_set, + self.helper.metadata.rule, + ) + + self.job_args["extra_vars"] = self.helper.collect_extra_vars( + self.job_args.get("extra_vars", {}) + ) + await self._job_start_event() + await self._run() + + async def _run(self): + retries = self.action_args.get("retries", 0) + if self.action_args.get("retry", False): + retries = max(self.action_args.get("retries", 0), 1) + delay = self.action_args.get("delay", 0) + + try: + for i in range(retries + 1): + if i > 0: + if delay > 0: + await asyncio.sleep(delay) + logger.info( + "Previous %s failed. " + "Retry %d of %d", + self._template_id, + i, + retries, + ) + controller_job = ( + await self._template( + self.name, + self.organization, + self.job_args, + ) + ) + if controller_job["status"] != "failed": + break + except self._exceptions as ex: + logger.error(ex) + controller_job = {} + controller_job["status"] = "failed" + controller_job["created"] = run_at() + controller_job["error"] = str(ex) + + self.controller_job = controller_job + await self._post_process() + + async def _post_process(self) -> None: + a_log = self._make_log() + + await self.helper.send_status(a_log) + set_facts = self.action_args.get("set_facts", False) + post_events = self.action_args.get("post_events", False) + + if set_facts or post_events: + ruleset = self.action_args.get( + "ruleset", self.helper.metadata.rule_set + ) + logger.debug("set_facts") + facts = self.controller_job.get("artifacts", {}) + if facts: + facts = self.helper.embellish_internal_event(facts) + logger.debug("facts %s", facts) + if set_facts: + drools.ruleset.assert_fact(ruleset, facts) + if post_events: + drools.ruleset.post(ruleset, facts) + else: + logger.debug("Empty facts are not set") + + async def _job_start_event(self): + await self.helper.send_status( + { + "run_at": run_at(), + "matching_events": self.helper.get_events(), + "action": self.helper.action, + "hosts": ",".join(self.helper.control.hosts), + "name": self.name, + "job_id": self.job_id, + "ansible_rulebook_id": settings.identifier, + }, + obj_type="Job", + ) + + def _controller_job_url(self) -> str: + if "id" in self.controller_job: + return urljoin( + job_template_runner.host, + self._url_path, + ) + return "" diff --git a/ansible_rulebook/action/run_workflow_template.py b/ansible_rulebook/action/run_workflow_template.py index 072628da..5dd7dff7 100644 --- a/ansible_rulebook/action/run_workflow_template.py +++ b/ansible_rulebook/action/run_workflow_template.py @@ -12,152 +12,43 @@ # See the License for the specific language governing permissions and # limitations under the License. -import asyncio import logging -import uuid -from urllib.parse import urljoin -from drools import ruleset as lang +from ansible_rulebook.exception import WorkflowJobTemplateNotFoundException -from ansible_rulebook.conf import settings -from ansible_rulebook.exception import ( - ControllerApiException, - WorkflowJobTemplateNotFoundException, -) from ansible_rulebook.job_template_runner import job_template_runner -from ansible_rulebook.util import run_at -from .control import Control -from .helper import Helper -from .metadata import Metadata +from .run_template import RunTemplate logger = logging.getLogger(__name__) -class RunWorkflowTemplate: +class RunWorkflowTemplate(RunTemplate): """run_workflow_template action launches a specified workflow template on the controller. It waits for the job to be complete. """ - def __init__(self, metadata: Metadata, control: Control, **action_args): - self.helper = Helper(metadata, control, "run_workflow_template") - self.action_args = action_args - self.name = self.action_args["name"] - self.organization = self.action_args["organization"] - self.job_id = str(uuid.uuid4()) - self.job_args = self.action_args.get("job_args", {}) - self.job_args["limit"] = ",".join(self.helper.control.hosts) - self.controller_job = {} + @property + def _exceptions(self): + return super()._exceptions + (WorkflowJobTemplateNotFoundException,) - async def __call__(self): - logger.info( - "running workflow template: %s, organization: %s", - self.name, - self.organization, - ) - logger.info( - "ruleset: %s, rule %s", - self.helper.metadata.rule_set, - self.helper.metadata.rule, - ) + @property + def _template(self): + return job_template_runner.run_workflow_job_template - self.job_args["extra_vars"] = self.helper.collect_extra_vars( - self.job_args.get("extra_vars", {}) - ) - await self._job_start_event() - await self._run() + @property + def _template_id(self): + return "run_workflow_template" - async def _run(self): - retries = self.action_args.get("retries", 0) - if self.action_args.get("retry", False): - retries = max(self.action_args.get("retries", 0), 1) - delay = self.action_args.get("delay", 0) + @property + def _template_name(self): + return "workflow template" - try: - for i in range(retries + 1): - if i > 0: - if delay > 0: - await asyncio.sleep(delay) - logger.info( - "Previous run_workflow_template failed. " - "Retry %d of %d", - i, - retries, - ) - controller_job = ( - await job_template_runner.run_workflow_job_template( - self.name, - self.organization, - self.job_args, - ) - ) - if controller_job["status"] != "failed": - break - except ( - ControllerApiException, - WorkflowJobTemplateNotFoundException, - ) as ex: - logger.error(ex) - controller_job = {} - controller_job["status"] = "failed" - controller_job["created"] = run_at() - controller_job["error"] = str(ex) + @property + def _url_prefix(self): + return super()._url_prefix + "jobs/workflow/" - self.controller_job = controller_job - await self._post_process() - - async def _post_process(self) -> None: - a_log = { - "name": self.name, - "organization": self.organization, - "job_id": self.job_id, - "status": self.controller_job["status"], - "run_at": self.controller_job["created"], - "url": self._controller_job_url(), - "matching_events": self.helper.get_events(), - } - if "error" in self.controller_job: - a_log["message"] = self.controller_job["error"] - a_log["reason"] = {"error": self.controller_job["error"]} - - await self.helper.send_status(a_log) - set_facts = self.action_args.get("set_facts", False) - post_events = self.action_args.get("post_events", False) - - if set_facts or post_events: - ruleset = self.action_args.get( - "ruleset", self.helper.metadata.rule_set - ) - logger.debug("set_facts") - facts = self.controller_job.get("artifacts", {}) - if facts: - facts = self.helper.embellish_internal_event(facts) - logger.debug("facts %s", facts) - if set_facts: - lang.assert_fact(ruleset, facts) - if post_events: - lang.post(ruleset, facts) - else: - logger.debug("Empty facts are not set") - - async def _job_start_event(self): - await self.helper.send_status( - { - "run_at": run_at(), - "matching_events": self.helper.get_events(), - "action": self.helper.action, - "hosts": ",".join(self.helper.control.hosts), - "name": self.name, - "job_id": self.job_id, - "ansible_rulebook_id": settings.identifier, - }, - obj_type="Job", - ) - - def _controller_job_url(self) -> str: - if "id" in self.controller_job: - return urljoin( - job_template_runner.host, - "/#/jobs/workflow/" f"{self.controller_job['id']}/" "details", - ) - return "" + def _make_log(self): + log = super()._make_log() + log["name"] = self.name + return log diff --git a/tests/unit/action/test_debug.py b/tests/unit/action/test_debug.py index 91dbee97..0c1f5009 100644 --- a/tests/unit/action/test_debug.py +++ b/tests/unit/action/test_debug.py @@ -89,7 +89,7 @@ async def test_debug(): with patch("uuid.uuid4", return_value=DUMMY_UUID): with patch( - "ansible_rulebook.action.run_job_template.lang.get_facts", + "drools.ruleset.get_facts", return_value={"a": 1}, ) as drools_mock: await Debug(metadata, control, **action_args)() diff --git a/tests/unit/action/test_post_event.py b/tests/unit/action/test_post_event.py index f2d0b3be..86bb4b03 100644 --- a/tests/unit/action/test_post_event.py +++ b/tests/unit/action/test_post_event.py @@ -52,7 +52,7 @@ async def test_noop(): with patch("uuid.uuid4", return_value=DUMMY_UUID): with patch( - "ansible_rulebook.action.run_job_template.lang.post" + "drools.ruleset.post", ) as drools_mock: await PostEvent(metadata, control, **action_args)() drools_mock.assert_called_once_with( diff --git a/tests/unit/action/test_retract_fact.py b/tests/unit/action/test_retract_fact.py index 8fc69422..8966b67d 100644 --- a/tests/unit/action/test_retract_fact.py +++ b/tests/unit/action/test_retract_fact.py @@ -59,8 +59,7 @@ async def test_retract_fact(partial, keys_excluded): with patch("uuid.uuid4", return_value=DUMMY_UUID): with patch( - "ansible_rulebook.action.run_job_template." - "lang.retract_matching_facts" + "drools.ruleset.retract_matching_facts" ) as drools_mock: await RetractFact(metadata, control, **action_args)() drools_mock.assert_called_once_with( diff --git a/tests/unit/action/test_run_job_template.py b/tests/unit/action/test_run_job_template.py index 8646729e..557eb695 100644 --- a/tests/unit/action/test_run_job_template.py +++ b/tests/unit/action/test_run_job_template.py @@ -106,11 +106,11 @@ async def test_run_job_template_exception(err_msg, err): DROOLS_CALLS = [ ( - "ansible_rulebook.action.run_job_template.lang.assert_fact", + "drools.ruleset.assert_fact", dict(set_facts=True), ), ( - "ansible_rulebook.action.run_job_template.lang.post", + "drools.ruleset.post", dict(post_events=True), ), ] @@ -209,7 +209,7 @@ async def test_run_job_template_retries(): side_effect=controller_job, ): with patch( - "ansible_rulebook.action.run_job_template.lang.assert_fact" + "drools.ruleset.assert_fact", ) as drools_mock: await RunJobTemplate(metadata, control, **action_args)() drools_mock.assert_called_once() diff --git a/tests/unit/action/test_run_playbook.py b/tests/unit/action/test_run_playbook.py index d4d888ec..82adf576 100644 --- a/tests/unit/action/test_run_playbook.py +++ b/tests/unit/action/test_run_playbook.py @@ -80,11 +80,11 @@ def _validate(queue, metadata, status, rc): DROOLS_CALLS = [ ( - "ansible_rulebook.action.run_job_template.lang.assert_fact", + "drools.ruleset.assert_fact", dict(set_facts=True), ), ( - "ansible_rulebook.action.run_job_template.lang.post", + "drools.ruleset.post", dict(post_events=True), ), ] diff --git a/tests/unit/action/test_run_workflow_template.py b/tests/unit/action/test_run_workflow_template.py index c780fe17..cc40ca98 100644 --- a/tests/unit/action/test_run_workflow_template.py +++ b/tests/unit/action/test_run_workflow_template.py @@ -109,11 +109,11 @@ async def test_run_workflow_template_exception(err_msg, err): DROOLS_CALLS = [ ( - "ansible_rulebook.action.run_workflow_template.lang.assert_fact", + "drools.ruleset.assert_fact", dict(set_facts=True), ), ( - "ansible_rulebook.action.run_workflow_template.lang.post", + "drools.ruleset.post", dict(post_events=True), ), ] @@ -212,7 +212,7 @@ async def test_run_workflow_template_retries(): side_effect=controller_job, ): with patch( - "ansible_rulebook.action.run_workflow_template.lang.assert_fact" + "drools.ruleset.assert_fact", ) as drools_mock: await RunWorkflowTemplate(metadata, control, **action_args)() drools_mock.assert_called_once() diff --git a/tests/unit/action/test_set_fact.py b/tests/unit/action/test_set_fact.py index f06c7dfb..7c4dd309 100644 --- a/tests/unit/action/test_set_fact.py +++ b/tests/unit/action/test_set_fact.py @@ -52,7 +52,7 @@ async def test_noop(): with patch("uuid.uuid4", return_value=DUMMY_UUID): with patch( - "ansible_rulebook.action.run_job_template.lang.assert_fact" + "drools.ruleset.assert_fact", ) as drools_mock: await SetFact(metadata, control, **action_args)() drools_mock.assert_called_once_with( From 30bb0c7cb2a90813d85f6532f759cfde4d021967 Mon Sep 17 00:00:00 2001 From: Joe Shimkus Date: Tue, 24 Oct 2023 18:00:38 -0400 Subject: [PATCH 2/6] refactor: reduce duplicate run action code Signed-off-by: Joe Shimkus --- .../action/{run_template.py => run_base.py} | 146 ++++++++++------- ansible_rulebook/action/run_job_template.py | 5 +- ansible_rulebook/action/run_playbook.py | 147 ++++++++---------- .../action/run_workflow_template.py | 5 +- tests/unit/action/test_retract_fact.py | 4 +- 5 files changed, 161 insertions(+), 146 deletions(-) rename ansible_rulebook/action/{run_template.py => run_base.py} (69%) diff --git a/ansible_rulebook/action/run_template.py b/ansible_rulebook/action/run_base.py similarity index 69% rename from ansible_rulebook/action/run_template.py rename to ansible_rulebook/action/run_base.py index 9f8b35ba..61e357bc 100644 --- a/ansible_rulebook/action/run_template.py +++ b/ansible_rulebook/action/run_base.py @@ -15,6 +15,7 @@ import asyncio import logging import uuid +from typing import Callable from urllib.parse import urljoin import drools @@ -31,36 +32,105 @@ logger = logging.getLogger(__name__) -class RunTemplate: - """Common superclass for template-based actions. Launches the appropriate +class RunBase: + """Common superclass for "run" actions.""" + + @property + def _job_data(self) -> dict: + data = { + "run_at": run_at(), + "matching_events": self.helper.get_events(), + "action": self.helper.action, + "name": self.name, + "job_id": self.job_id, + "ansible_rulebook_id": settings.identifier, + } + return data + + @property + def _template_id(self) -> str: + raise NotImplementedError + + def __init__(self, metadata: Metadata, control: Control, **action_args): + self.helper = Helper(metadata, control, self._template_id) + self.action_args = action_args + self.job_id = str(uuid.uuid4()) + self.name = self.action_args["name"] + + async def __call__(self): + raise NotImplementedError + + async def _do_run(self) -> bool: + raise NotImplementedError + + async def _run(self): + retries = self.action_args.get("retries", 0) + if self.action_args.get("retry", False): + retries = max(self.action_args.get("retries", 0), 1) + delay = self.action_args.get("delay", 0) + + for i in range(retries + 1): + if i > 0: + if delay > 0: + await asyncio.sleep(delay) + logger.info( + "Previous %s failed. Retry %d of %d", + self._template_id, + i, + retries, + ) + + retry = await self._do_run() + if not retry: + break + + await self._post_process() + + async def _pre_process(self) -> None: + pass + + async def _post_process(self) -> None: + pass + + async def _job_start_event(self): + await self.helper.send_status( + self._job_data, + obj_type="Job", + ) + + +class RunTemplate(RunBase): + """Superclass for template-based run actions. Launches the appropriate specified template on the controller. It waits for the job to be complete. """ @property - def _exceptions(self): + def _exceptions(self) -> tuple: return (ControllerApiException,) @property - def _template(self): - raise NotImplementedError + def _job_data(self) -> dict: + data = super()._job_data + data["hosts"] = ",".join(self.helper.control.hosts) + return data @property - def _template_id(self): + def _run_job(self) -> Callable: raise NotImplementedError @property - def _template_name(self): + def _template_name(self) -> str: raise NotImplementedError @property - def _url_path(self): + def _url_path(self) -> str: return self._url_prefix + f"{self.controller_job['id']}/" + "details" @property - def _url_prefix(self): + def _url_prefix(self) -> str: return "/#/" - def _make_log(self): + def _make_log(self) -> dict: log = { "organization": self.organization, "job_id": self.job_id, @@ -75,11 +145,8 @@ def _make_log(self): return log def __init__(self, metadata: Metadata, control: Control, **action_args): - self.helper = Helper(metadata, control, self._template_id) - self.action_args = action_args - self.name = self.action_args["name"] + super().__init__(metadata, control, **action_args) self.organization = self.action_args["organization"] - self.job_id = str(uuid.uuid4()) self.job_args = self.action_args.get("job_args", {}) self.job_args["limit"] = ",".join(self.helper.control.hosts) self.controller_job = {} @@ -103,34 +170,16 @@ async def __call__(self): await self._job_start_event() await self._run() - async def _run(self): - retries = self.action_args.get("retries", 0) - if self.action_args.get("retry", False): - retries = max(self.action_args.get("retries", 0), 1) - delay = self.action_args.get("delay", 0) - + async def _do_run(self) -> bool: + exception = False try: - for i in range(retries + 1): - if i > 0: - if delay > 0: - await asyncio.sleep(delay) - logger.info( - "Previous %s failed. " - "Retry %d of %d", - self._template_id, - i, - retries, - ) - controller_job = ( - await self._template( - self.name, - self.organization, - self.job_args, - ) - ) - if controller_job["status"] != "failed": - break + controller_job = await self._run_job( + self.name, + self.organization, + self.job_args, + ) except self._exceptions as ex: + exception = True logger.error(ex) controller_job = {} controller_job["status"] = "failed" @@ -138,7 +187,8 @@ async def _run(self): controller_job["error"] = str(ex) self.controller_job = controller_job - await self._post_process() + + return (not exception) and (self.controller_job["status"] == "failed") async def _post_process(self) -> None: a_log = self._make_log() @@ -163,19 +213,7 @@ async def _post_process(self) -> None: else: logger.debug("Empty facts are not set") - async def _job_start_event(self): - await self.helper.send_status( - { - "run_at": run_at(), - "matching_events": self.helper.get_events(), - "action": self.helper.action, - "hosts": ",".join(self.helper.control.hosts), - "name": self.name, - "job_id": self.job_id, - "ansible_rulebook_id": settings.identifier, - }, - obj_type="Job", - ) + await super()._post_process() def _controller_job_url(self) -> str: if "id" in self.controller_job: diff --git a/ansible_rulebook/action/run_job_template.py b/ansible_rulebook/action/run_job_template.py index 94d8e285..114d904e 100644 --- a/ansible_rulebook/action/run_job_template.py +++ b/ansible_rulebook/action/run_job_template.py @@ -15,10 +15,9 @@ import logging from ansible_rulebook.exception import JobTemplateNotFoundException - from ansible_rulebook.job_template_runner import job_template_runner -from .run_template import RunTemplate +from .run_base import RunTemplate logger = logging.getLogger(__name__) @@ -33,7 +32,7 @@ def _exceptions(self): return super()._exceptions + (JobTemplateNotFoundException,) @property - def _template(self): + def _run_job(self): return job_template_runner.run_job_template @property diff --git a/ansible_rulebook/action/run_playbook.py b/ansible_rulebook/action/run_playbook.py index 2325daab..31f57e3d 100644 --- a/ansible_rulebook/action/run_playbook.py +++ b/ansible_rulebook/action/run_playbook.py @@ -19,17 +19,16 @@ import os import shutil import tempfile -import uuid +from typing import Any +import drools import yaml -from drools import ruleset as lang from ansible_rulebook.collection import ( find_playbook, has_playbook, split_collection_name, ) -from ansible_rulebook.conf import settings from ansible_rulebook.exception import ( MissingArtifactKeyException, PlaybookNotFoundException, @@ -38,8 +37,8 @@ from ansible_rulebook.util import create_inventory, run_at from .control import Control -from .helper import Helper from .metadata import Metadata +from .run_base import RunBase from .runner import Runner logger = logging.getLogger(__name__) @@ -47,18 +46,25 @@ tar = shutil.which("tar") -class RunPlaybook: +class RunPlaybook(RunBase): """run_playbook action runs an ansible playbook using the ansible-runner """ + @property + def _job_data(self) -> dict: + data = super()._job_data + data["hosts"] = self.host_limit + return data + + @property + def _template_id(self) -> str: + return "run_playbook" + def __init__(self, metadata: Metadata, control: Control, **action_args): - self.helper = Helper(metadata, control, "run_playbook") - self.action_args = action_args - self.job_id = str(uuid.uuid4()) + super().__init__(metadata, control, **action_args) self.default_copy_files = True self.default_check_files = True - self.name = self.action_args["name"] self.verbosity = self.action_args.get("verbosity", 0) self.json_mode = self.action_args.get("json_mode", False) self.host_limit = ",".join(self.helper.control.hosts) @@ -81,53 +87,24 @@ async def __call__(self): if os.path.exists(self.private_data_dir): shutil.rmtree(self.private_data_dir) - async def _job_start_event(self): - await self.helper.send_status( - { - "run_at": run_at(), - "matching_events": self.helper.get_events(), - "action": self.helper.action, - "hosts": self.host_limit, - "name": self.name, - "job_id": self.job_id, - "ansible_rulebook_id": settings.identifier, - }, - obj_type="Job", - ) - - async def _run(self): - retries = self.action_args.get("retries", 0) - if self.action_args.get("retry", False): - retries = max(self.action_args.get("retries", 0), 1) - - delay = self.action_args.get("delay", 0) - - for i in range(retries + 1): - if i > 0: - if delay > 0: - await asyncio.sleep(delay) - logger.info( - "Previous run_playbook failed. Retry %d of %d", i, retries - ) - - await Runner( - self.private_data_dir, - self.host_limit, - self.verbosity, - self.job_id, - self.json_mode, - self.helper, - self._runner_args(), - )() - if self._get_latest_artifact("status") != "failed": - break - - await self._post_process() - - def _runner_args(self): + async def _do_run(self) -> bool: + await Runner( + self.private_data_dir, + self.host_limit, + self.verbosity, + self.job_id, + self.json_mode, + self.helper, + self._runner_args(), + )() + return self._get_latest_artifact("status") == "failed" + + def _runner_args(self) -> dict: return {"playbook": self.name, "inventory": self.inventory} async def _pre_process(self) -> None: + await super()._pre_process() + playbook_extra_vars = self.helper.collect_extra_vars( self.action_args.get("extra_vars", {}) ) @@ -159,32 +136,7 @@ async def _pre_process(self) -> None: return self._copy_playbook_files(project_dir) - def _copy_playbook_files(self, project_dir): - if self.action_args.get("check_files", self.default_check_files): - if os.path.exists(self.name): - tail_name = os.path.basename(self.name) - shutil.copy(self.name, os.path.join(project_dir, tail_name)) - if self.action_args.get("copy_files", self.default_copy_files): - shutil.copytree( - os.path.dirname(os.path.abspath(self.name)), - project_dir, - dirs_exist_ok=True, - ) - self.name = tail_name - elif has_playbook(*split_collection_name(self.name)): - shutil.copy( - find_playbook(*split_collection_name(self.name)), - os.path.join(project_dir, self.name), - ) - else: - msg = ( - f"Could not find a playbook for {self.name} " - f"from {os.getcwd()}" - ) - logger.error(msg) - raise PlaybookNotFoundException(msg) - - async def _post_process(self): + async def _post_process(self) -> None: rc = int(self._get_latest_artifact("rc")) status = self._get_latest_artifact("status") logger.info("Ansible runner rc: %d, status: %s", rc, status) @@ -230,11 +182,40 @@ async def _post_process(self): fact = self.helper.embellish_internal_event(fact) logger.debug("fact %s", fact) if set_facts: - lang.assert_fact(ruleset, fact) + drools.ruleset.assert_fact(ruleset, fact) if post_events: - lang.post(ruleset, fact) + drools.ruleset.post(ruleset, fact) + + await super()._post_process() + + def _copy_playbook_files(self, project_dir) -> None: + if self.action_args.get("check_files", self.default_check_files): + if os.path.exists(self.name): + tail_name = os.path.basename(self.name) + shutil.copy(self.name, os.path.join(project_dir, tail_name)) + if self.action_args.get("copy_files", self.default_copy_files): + shutil.copytree( + os.path.dirname(os.path.abspath(self.name)), + project_dir, + dirs_exist_ok=True, + ) + self.name = tail_name + elif has_playbook(*split_collection_name(self.name)): + shutil.copy( + find_playbook(*split_collection_name(self.name)), + os.path.join(project_dir, self.name), + ) + else: + msg = ( + f"Could not find a playbook for {self.name} " + f"from {os.getcwd()}" + ) + logger.error(msg) + raise PlaybookNotFoundException(msg) - def _get_latest_artifact(self, component: str, content: bool = True): + def _get_latest_artifact( + self, component: str, content: bool = True + ) -> Any: files = glob.glob( os.path.join(self.private_data_dir, "artifacts", "*", component) ) @@ -247,7 +228,7 @@ def _get_latest_artifact(self, component: str, content: bool = True): return content return files[0] - async def _untar_project(self, output_dir, project_data_file): + async def _untar_project(self, output_dir, project_data_file) -> None: cmd = [tar, "zxvf", project_data_file] proc = await asyncio.create_subprocess_exec( diff --git a/ansible_rulebook/action/run_workflow_template.py b/ansible_rulebook/action/run_workflow_template.py index 5dd7dff7..bbd60b16 100644 --- a/ansible_rulebook/action/run_workflow_template.py +++ b/ansible_rulebook/action/run_workflow_template.py @@ -15,10 +15,9 @@ import logging from ansible_rulebook.exception import WorkflowJobTemplateNotFoundException - from ansible_rulebook.job_template_runner import job_template_runner -from .run_template import RunTemplate +from .run_base import RunTemplate logger = logging.getLogger(__name__) @@ -33,7 +32,7 @@ def _exceptions(self): return super()._exceptions + (WorkflowJobTemplateNotFoundException,) @property - def _template(self): + def _run_job(self): return job_template_runner.run_workflow_job_template @property diff --git a/tests/unit/action/test_retract_fact.py b/tests/unit/action/test_retract_fact.py index 8966b67d..84d07f92 100644 --- a/tests/unit/action/test_retract_fact.py +++ b/tests/unit/action/test_retract_fact.py @@ -58,9 +58,7 @@ async def test_retract_fact(partial, keys_excluded): } with patch("uuid.uuid4", return_value=DUMMY_UUID): - with patch( - "drools.ruleset.retract_matching_facts" - ) as drools_mock: + with patch("drools.ruleset.retract_matching_facts") as drools_mock: await RetractFact(metadata, control, **action_args)() drools_mock.assert_called_once_with( action_args["ruleset"], From 7d35428e88469796d8c573d8c46890c37e1d5293 Mon Sep 17 00:00:00 2001 From: Joe Shimkus Date: Wed, 25 Oct 2023 09:07:43 -0400 Subject: [PATCH 3/6] refactor: rename _template_id as _action_name Signed-off-by: Joe Shimkus --- ansible_rulebook/action/run_base.py | 6 +++--- ansible_rulebook/action/run_job_template.py | 8 ++++---- ansible_rulebook/action/run_playbook.py | 8 ++++---- ansible_rulebook/action/run_workflow_template.py | 8 ++++---- 4 files changed, 15 insertions(+), 15 deletions(-) diff --git a/ansible_rulebook/action/run_base.py b/ansible_rulebook/action/run_base.py index 61e357bc..ebb704b5 100644 --- a/ansible_rulebook/action/run_base.py +++ b/ansible_rulebook/action/run_base.py @@ -48,11 +48,11 @@ def _job_data(self) -> dict: return data @property - def _template_id(self) -> str: + def _action_name(self) -> str: raise NotImplementedError def __init__(self, metadata: Metadata, control: Control, **action_args): - self.helper = Helper(metadata, control, self._template_id) + self.helper = Helper(metadata, control, self._action_name) self.action_args = action_args self.job_id = str(uuid.uuid4()) self.name = self.action_args["name"] @@ -75,7 +75,7 @@ async def _run(self): await asyncio.sleep(delay) logger.info( "Previous %s failed. Retry %d of %d", - self._template_id, + self._action_name, i, retries, ) diff --git a/ansible_rulebook/action/run_job_template.py b/ansible_rulebook/action/run_job_template.py index 114d904e..4c5e61d6 100644 --- a/ansible_rulebook/action/run_job_template.py +++ b/ansible_rulebook/action/run_job_template.py @@ -27,6 +27,10 @@ class RunJobTemplate(RunTemplate): the controller. It waits for the job to be complete. """ + @property + def _action_name(self): + return "run_job_template" + @property def _exceptions(self): return super()._exceptions + (JobTemplateNotFoundException,) @@ -35,10 +39,6 @@ def _exceptions(self): def _run_job(self): return job_template_runner.run_job_template - @property - def _template_id(self): - return "run_job_template" - @property def _template_name(self): return "job template" diff --git a/ansible_rulebook/action/run_playbook.py b/ansible_rulebook/action/run_playbook.py index 31f57e3d..f1869011 100644 --- a/ansible_rulebook/action/run_playbook.py +++ b/ansible_rulebook/action/run_playbook.py @@ -51,16 +51,16 @@ class RunPlaybook(RunBase): ansible-runner """ + @property + def _action_name(self) -> str: + return "run_playbook" + @property def _job_data(self) -> dict: data = super()._job_data data["hosts"] = self.host_limit return data - @property - def _template_id(self) -> str: - return "run_playbook" - def __init__(self, metadata: Metadata, control: Control, **action_args): super().__init__(metadata, control, **action_args) self.default_copy_files = True diff --git a/ansible_rulebook/action/run_workflow_template.py b/ansible_rulebook/action/run_workflow_template.py index bbd60b16..5a9666f1 100644 --- a/ansible_rulebook/action/run_workflow_template.py +++ b/ansible_rulebook/action/run_workflow_template.py @@ -27,6 +27,10 @@ class RunWorkflowTemplate(RunTemplate): the controller. It waits for the job to be complete. """ + @property + def _action_name(self): + return "run_workflow_template" + @property def _exceptions(self): return super()._exceptions + (WorkflowJobTemplateNotFoundException,) @@ -35,10 +39,6 @@ def _exceptions(self): def _run_job(self): return job_template_runner.run_workflow_job_template - @property - def _template_id(self): - return "run_workflow_template" - @property def _template_name(self): return "workflow template" From f21c2242a36f54a2d835584240beb5c411f99cdc Mon Sep 17 00:00:00 2001 From: Joe Shimkus Date: Wed, 25 Oct 2023 12:01:32 -0400 Subject: [PATCH 4/6] refactor: abstract core __call__ processing Makes the core of all run actions' __call__ a sequence of pre-process, job start, run and post-process. Signed-off-by: Joe Shimkus --- ansible_rulebook/action/run_base.py | 10 +++++----- ansible_rulebook/action/run_playbook.py | 6 ++---- 2 files changed, 7 insertions(+), 9 deletions(-) diff --git a/ansible_rulebook/action/run_base.py b/ansible_rulebook/action/run_base.py index ebb704b5..f2e4fb64 100644 --- a/ansible_rulebook/action/run_base.py +++ b/ansible_rulebook/action/run_base.py @@ -58,7 +58,10 @@ def __init__(self, metadata: Metadata, control: Control, **action_args): self.name = self.action_args["name"] async def __call__(self): - raise NotImplementedError + await self._pre_process() + await self._job_start_event() + await self._run() + await self._post_process() async def _do_run(self) -> bool: raise NotImplementedError @@ -84,8 +87,6 @@ async def _run(self): if not retry: break - await self._post_process() - async def _pre_process(self) -> None: pass @@ -167,8 +168,7 @@ async def __call__(self): self.job_args["extra_vars"] = self.helper.collect_extra_vars( self.job_args.get("extra_vars", {}) ) - await self._job_start_event() - await self._run() + await super().__call__() async def _do_run(self) -> bool: exception = False diff --git a/ansible_rulebook/action/run_playbook.py b/ansible_rulebook/action/run_playbook.py index f1869011..8d9e65c9 100644 --- a/ansible_rulebook/action/run_playbook.py +++ b/ansible_rulebook/action/run_playbook.py @@ -79,15 +79,13 @@ async def __call__(self): f"rule: {self.helper.metadata.rule}" ) logger.debug("private data dir %s", self.private_data_dir) - await self._pre_process() - await self._job_start_event() - logger.info("Calling Ansible runner") - await self._run() + await super().__call__() finally: if os.path.exists(self.private_data_dir): shutil.rmtree(self.private_data_dir) async def _do_run(self) -> bool: + logger.info("Calling Ansible runner") await Runner( self.private_data_dir, self.host_limit, From acb653483194e5aacb34b0407f1db689a9345207 Mon Sep 17 00:00:00 2001 From: Joe Shimkus Date: Fri, 27 Oct 2023 08:36:08 -0400 Subject: [PATCH 5/6] refactor: override _run in RunPlaybook Re-establishes a single log message of "Calling Ansible runner" rather than one per iteration. Signed-off-by: Joe Shimkus --- ansible_rulebook/action/run_playbook.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/ansible_rulebook/action/run_playbook.py b/ansible_rulebook/action/run_playbook.py index 8d9e65c9..e23ba48f 100644 --- a/ansible_rulebook/action/run_playbook.py +++ b/ansible_rulebook/action/run_playbook.py @@ -84,8 +84,11 @@ async def __call__(self): if os.path.exists(self.private_data_dir): shutil.rmtree(self.private_data_dir) - async def _do_run(self) -> bool: + async def _run(self): logger.info("Calling Ansible runner") + await super()._run() + + async def _do_run(self) -> bool: await Runner( self.private_data_dir, self.host_limit, From 42a473556cba065c6cbf94717451be120e8c4136 Mon Sep 17 00:00:00 2001 From: Joe Shimkus Date: Mon, 30 Oct 2023 13:50:50 -0400 Subject: [PATCH 6/6] refactor: use StopAsyncIteration to stop retries Signed-off-by: Joe Shimkus --- ansible_rulebook/action/run_base.py | 16 +++++++++++----- ansible_rulebook/action/run_playbook.py | 5 +++-- 2 files changed, 14 insertions(+), 7 deletions(-) diff --git a/ansible_rulebook/action/run_base.py b/ansible_rulebook/action/run_base.py index f2e4fb64..42f5e244 100644 --- a/ansible_rulebook/action/run_base.py +++ b/ansible_rulebook/action/run_base.py @@ -63,7 +63,11 @@ async def __call__(self): await self._run() await self._post_process() - async def _do_run(self) -> bool: + async def _do_run(self): + """Performs a standalone single attempt to effect the requested + operation. In the event of success or non-retriable error raises + StopAsyncIteration exception. + """ raise NotImplementedError async def _run(self): @@ -83,8 +87,9 @@ async def _run(self): retries, ) - retry = await self._do_run() - if not retry: + try: + await self._do_run() + except StopAsyncIteration: break async def _pre_process(self) -> None: @@ -170,7 +175,7 @@ async def __call__(self): ) await super().__call__() - async def _do_run(self) -> bool: + async def _do_run(self): exception = False try: controller_job = await self._run_job( @@ -188,7 +193,8 @@ async def _do_run(self) -> bool: self.controller_job = controller_job - return (not exception) and (self.controller_job["status"] == "failed") + if exception or (self.controller_job["status"] != "failed"): + raise StopAsyncIteration async def _post_process(self) -> None: a_log = self._make_log() diff --git a/ansible_rulebook/action/run_playbook.py b/ansible_rulebook/action/run_playbook.py index e23ba48f..c98fdfc1 100644 --- a/ansible_rulebook/action/run_playbook.py +++ b/ansible_rulebook/action/run_playbook.py @@ -88,7 +88,7 @@ async def _run(self): logger.info("Calling Ansible runner") await super()._run() - async def _do_run(self) -> bool: + async def _do_run(self): await Runner( self.private_data_dir, self.host_limit, @@ -98,7 +98,8 @@ async def _do_run(self) -> bool: self.helper, self._runner_args(), )() - return self._get_latest_artifact("status") == "failed" + if self._get_latest_artifact("status") != "failed": + raise StopAsyncIteration def _runner_args(self) -> dict: return {"playbook": self.name, "inventory": self.inventory}