Skip to content

Commit

Permalink
feat: [AAP-9829] added run_workflow_template action (#543)
Browse files Browse the repository at this point in the history
Added support for run_workflow_template action
This is similar to run_job_template.

Fixes #414 

https://issues.redhat.com/browse/AAP-9829
  • Loading branch information
mkanoor authored Aug 8, 2023
1 parent bc4f8b8 commit 050184c
Show file tree
Hide file tree
Showing 14 changed files with 702 additions and 474 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

### Added
- rulebook and Drools bracket notation syntax
- new action called run_workflow_template


### Fixed

Expand Down
138 changes: 134 additions & 4 deletions ansible_rulebook/builtin.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
PlaybookNotFoundException,
PlaybookStatusNotFoundException,
ShutdownException,
WorkflowJobTemplateNotFoundException,
)
from .job_template_runner import job_template_runner
from .messages import Shutdown
Expand Down Expand Up @@ -763,7 +764,6 @@ async def run_job_template(
)

job_id = str(uuid.uuid4())

await event_log.put(
dict(
type="Job",
Expand Down Expand Up @@ -828,11 +828,140 @@ async def run_job_template(
a_log["message"] = controller_job["error"]
await event_log.put(a_log)

if set_facts or post_events:
_post_process_awx(
controller_job, set_facts, post_events, "run_job_template", ruleset
)


async def run_workflow_template(
event_log,
inventory: str,
hosts: List,
variables: Dict,
project_data_file: str,
source_ruleset_name: str,
source_ruleset_uuid: str,
source_rule_name: str,
source_rule_uuid: str,
rule_run_at: str,
ruleset: str,
name: str,
organization: str,
job_args: Optional[dict] = None,
set_facts: Optional[bool] = None,
post_events: Optional[bool] = None,
verbosity: int = 0,
copy_files: Optional[bool] = False,
json_mode: Optional[bool] = False,
retries: Optional[int] = 0,
retry: Optional[bool] = False,
delay: Optional[int] = 0,
**kwargs,
):

logger.info(
"running workflow template: %s organization: %s", name, organization
)
logger.info("ruleset: %s, rule %s", source_ruleset_name, source_rule_name)

hosts_limit = ",".join(hosts)
if not job_args:
job_args = {}
job_args["limit"] = hosts_limit

job_args["extra_vars"] = _collect_extra_vars(
variables,
job_args.get("extra_vars", {}),
source_ruleset_name,
source_rule_name,
)

job_id = str(uuid.uuid4())

await event_log.put(
dict(
type="Job",
job_id=job_id,
ansible_rulebook_id=settings.identifier,
name=name,
ruleset=source_ruleset_name,
ruleset_uuid=source_ruleset_uuid,
rule=source_rule_name,
rule_uuid=source_rule_uuid,
hosts=hosts_limit,
action="run_workflow_template",
)
)

if retry:
retries = max(retries, 1)

try:
for i in range(retries + 1):
if i > 0:
if delay > 0:
await asyncio.sleep(delay)
logger.info(
"Previous run_workflow_template failed. Retry %d of %d",
i,
retries,
)
controller_job = (
await job_template_runner.run_workflow_job_template(
name,
organization,
job_args,
)
)
if controller_job["status"] != "failed":
break
except (
ControllerApiException,
WorkflowJobTemplateNotFoundException,
) as ex:
logger.error(ex)
controller_job = {}
controller_job["status"] = "failed"
controller_job["created"] = run_at()
controller_job["error"] = str(ex)

a_log = dict(
type="Action",
action="run_workflow_template",
action_uuid=str(uuid.uuid4()),
activation_id=settings.identifier,
job_template_name=name,
organization=organization,
job_id=job_id,
ruleset=ruleset,
ruleset_uuid=source_ruleset_uuid,
rule=source_rule_name,
rule_uuid=source_rule_uuid,
status=controller_job["status"],
run_at=controller_job["created"],
url=_controller_job_url(controller_job),
matching_events=_get_events(variables),
rule_run_at=rule_run_at,
)
if "error" in controller_job:
a_log["message"] = controller_job["error"]
await event_log.put(a_log)

_post_process_awx(
controller_job,
set_facts,
post_events,
"run_workflow_template",
ruleset,
)


def _post_process_awx(controller_job, set_facts, post_events, action, ruleset):
if controller_job["status"] == "successful" and (set_facts or post_events):
logger.debug("set_facts")
facts = controller_job["artifacts"]
facts = controller_job.get("artifacts", None)
if facts:
facts = _embellish_internal_event(facts, "run_job_template")
facts = _embellish_internal_event(facts, action)
logger.debug("facts %s", facts)
if set_facts:
lang.assert_fact(ruleset, facts)
Expand Down Expand Up @@ -896,6 +1025,7 @@ async def shutdown(
run_playbook=run_playbook,
run_module=run_module,
run_job_template=run_job_template,
run_workflow_template=run_workflow_template,
shutdown=shutdown,
)

Expand Down
5 changes: 5 additions & 0 deletions ansible_rulebook/exception.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,11 @@ class JobTemplateNotFoundException(Exception):
pass


class WorkflowJobTemplateNotFoundException(Exception):

pass


class WebSocketExchangeException(Exception):

pass
Expand Down
114 changes: 84 additions & 30 deletions ansible_rulebook/job_template_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,14 @@
from ansible_rulebook.exception import (
ControllerApiException,
JobTemplateNotFoundException,
WorkflowJobTemplateNotFoundException,
)

logger = logging.getLogger(__name__)


class JobTemplateRunner:
JOB_TEMPLATE_SLUG = "/api/v2/job_templates"
UNIFIED_TEMPLATE_SLUG = "/api/v2/unified_job_templates/"
CONFIG_SLUG = "/api/v2/config"
JOB_COMPLETION_STATUSES = ["successful", "failed", "error", "canceled"]

Expand All @@ -43,8 +44,8 @@ def __init__(
self.token = token
self.host = host
self.verify_ssl = verify_ssl
self.refresh_delay = int(
os.environ.get("EDA_JOB_TEMPLATE_REFRESH_DELAY", 10)
self.refresh_delay = float(
os.environ.get("EDA_JOB_TEMPLATE_REFRESH_DELAY", 10.0)
)
self._session = None

Expand Down Expand Up @@ -89,65 +90,118 @@ def _sslcontext(self) -> Union[bool, ssl.SSLContext]:
return ssl.create_default_context(cafile=self.verify_ssl)
return False

async def _get_job_template_id(self, name: str, organization: str) -> int:
slug = f"{self.JOB_TEMPLATE_SLUG}/"
async def _get_template_obj(
self, name: str, organization: str, unified_type: str
) -> dict:
params = {"name": name}

while True:
json_body = await self._get_page(slug, params)
json_body = await self._get_page(
self.UNIFIED_TEMPLATE_SLUG, params
)
for jt in json_body["results"]:
if (
jt["name"] == name
and dpath.get(jt, "summary_fields.organization.name", ".")
jt["type"] == unified_type
and jt["name"] == name
and dpath.get(
jt,
"summary_fields.organization.name",
".",
organization,
)
== organization
):
return jt["id"]
return {
"launch": dpath.get(jt, "related.launch", ".", None),
"ask_limit_on_launch": jt["ask_limit_on_launch"],
"ask_inventory_on_launch": jt[
"ask_inventory_on_launch"
],
"ask_variables_on_launch": jt[
"ask_variables_on_launch"
],
}

if json_body.get("next", None):
params["page"] = params.get("page", 1) + 1
else:
break

raise JobTemplateNotFoundException(
(
f"Job template {name} in organization "
f"{organization} does not exist"
)
)

async def run_job_template(
self,
name: str,
organization: str,
job_params: dict,
) -> dict:
job = await self.launch(name, organization, job_params)
obj = await self._get_template_obj(name, organization, "job_template")
if not obj:
raise JobTemplateNotFoundException(
(
f"Job template {name} in organization "
f"{organization} does not exist"
)
)
url = urljoin(self.host, obj["launch"])
job = await self._launch(job_params, url)
return await self._monitor_job(job["url"])

url = job["url"]
params = {}
async def run_workflow_job_template(
self,
name: str,
organization: str,
job_params: dict,
) -> dict:
obj = await self._get_template_obj(
name, organization, "workflow_job_template"
)
if not obj:
raise WorkflowJobTemplateNotFoundException(
(
f"Workflow template {name} in organization "
f"{organization} does not exist"
)
)
url = urljoin(self.host, obj["launch"])
if not obj["ask_limit_on_launch"] and "limit" in job_params:
logger.warning(
"Workflow template %s does not accept limit, removing it", name
)
job_params.pop("limit")
if not obj["ask_variables_on_launch"] and "extra_vars" in job_params:
logger.warning(
"Workflow template %s does not accept extra vars, "
"removing it",
name,
)
job_params.pop("extra_vars")
job = await self._launch(job_params, url)
return await self._monitor_job(job["url"])

async def _monitor_job(self, url) -> dict:
while True:
# fetch and process job status
json_body = await self._get_page(url, params)
job_status = json_body["status"]
if job_status in self.JOB_COMPLETION_STATUSES:
json_body = await self._get_page(url, {})
if json_body["status"] in self.JOB_COMPLETION_STATUSES:
return json_body

await asyncio.sleep(self.refresh_delay)

async def launch(
self, name: str, organization: str, job_params: dict
) -> dict:
jt_id = await self._get_job_template_id(name, organization)
url = urljoin(self.host, f"{self.JOB_TEMPLATE_SLUG}/{jt_id}/launch/")

async def _launch(self, job_params: dict, url: str) -> dict:
body = None
try:
async with self._session.post(
url, json=job_params, ssl=self._sslcontext
url,
json=job_params,
ssl=self._sslcontext,
raise_for_status=False,
) as post_response:
return json.loads(await post_response.text())
body = json.loads(await post_response.text())
post_response.raise_for_status()
return body
except aiohttp.ClientError as e:
logger.error("Error connecting to controller %s", str(e))
if body:
logger.error("Error %s", body)
raise ControllerApiException(str(e))


Expand Down
Loading

0 comments on commit 050184c

Please sign in to comment.