Pulling job instead of job_events to get job status
We no longer need to log job events, so pulling the job itself for its status
is much faster than parsing the status out of job events.

Fixes AAP-11677: EDA should not hit job_events endpoint on controller jobs
bzwei committed May 3, 2023
1 parent 554bf7a commit 34f7546
Showing 3 changed files with 118 additions and 5,363 deletions.
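
The substance of the change, in brief: rather than paging through {job_url}job_events/ and digging each event's summary_fields.job.status out with dpath, the runner now polls the job resource itself, reads its top-level status field, and returns the job object once it reaches a terminal state. A minimal standalone sketch of the new loop; the completion statuses, function name, and header handling here are assumptions, not the module's actual API:

import asyncio
import json

import aiohttp

# Assumed terminal statuses; the real set lives on the runner class as
# JOB_COMPLETION_STATUSES and may differ.
JOB_COMPLETION_STATUSES = ["successful", "failed", "error", "canceled"]


async def wait_for_job(job_url: str, token: str, refresh_delay: float = 10.0) -> dict:
    # Poll the job resource until it finishes, then return the whole job object.
    headers = {"Authorization": f"Bearer {token}"}
    async with aiohttp.ClientSession(headers=headers) as session:
        while True:
            async with session.get(job_url) as response:
                job = json.loads(await response.text())
            if job["status"] in JOB_COMPLETION_STATUSES:
                return job  # already contains artifacts; no second fetch needed
            await asyncio.sleep(refresh_delay)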
22 changes: 0 additions & 22 deletions ansible_rulebook/builtin.py
@@ -771,28 +771,6 @@ async def run_job_template(
             )
         )
 
-    async def event_callback(event: dict) -> None:
-        await event_log.put(
-            {
-                "type": "AnsibleEvent",
-                "event": {
-                    "uuid": event["uuid"],
-                    "counter": event["counter"],
-                    "stdout": event["stdout"],
-                    "start_line": event["start_line"],
-                    "end_line": event["end_line"],
-                    "event": event["event"],
-                    "created": event["created"],
-                    "parent_uuid": event["parent_uuid"],
-                    "event_data": event["event_data"],
-                    "job_id": job_id,
-                    "controller_job_id": event["job"],
-                    "ansible_rulebook_id": settings.identifier,
-                },
-                "run_at": event["created"],
-            }
-        )
-
     if retry:
         retries = max(retries, 1)
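For anyone consuming the internal event log: with the callback gone, entries of the shape below are no longer queued for controller jobs. A standalone reconstruction of the payload, taken directly from the removed lines; the function wrapper and parameter names are illustrative:

def ansible_event_entry(event: dict, job_id: str, identifier: str) -> dict:
    # Payload the removed event_callback used to put on the event log,
    # one entry per controller job event.
    return {
        "type": "AnsibleEvent",
        "event": {
            "uuid": event["uuid"],
            "counter": event["counter"],
            "stdout": event["stdout"],
            "start_line": event["start_line"],
            "end_line": event["end_line"],
            "event": event["event"],
            "created": event["created"],
            "parent_uuid": event["parent_uuid"],
            "event_data": event["event_data"],
            "job_id": job_id,
            "controller_job_id": event["job"],
            "ansible_rulebook_id": identifier,
        },
        "run_at": event["created"],
    }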
34 changes: 7 additions & 27 deletions ansible_rulebook/job_template_runner.py
@@ -18,8 +18,8 @@
 import os
 import ssl
 from functools import cached_property
-from typing import Any, Callable, Union
-from urllib.parse import parse_qsl, urljoin, urlparse
+from typing import Union
+from urllib.parse import urljoin
 
 import aiohttp
 import dpath
@@ -114,42 +114,22 @@ async def run_job_template(
         name: str,
         organization: str,
         job_params: dict,
-        event_handler: Union[Callable[[dict], Any], None] = None,
     ) -> dict:
         job = await self.launch(name, organization, job_params)
 
-        url_info = urlparse(job["url"])
-        url = f"{url_info.path}job_events/"
-        counters = []
-        params = dict(parse_qsl(url_info.query))
+        url = job["url"]
+        params = {}
 
         async with aiohttp.ClientSession(
             headers=self._auth_headers()
         ) as session:
             while True:
-                # fetch and process job events
+                # fetch and process job status
                 response = await self._get_page(session, url, params)
                 json_body = json.loads(response["body"])
-                job_status = None
-                for event in json_body["results"]:
-                    job_status = dpath.get(
-                        event, "summary_fields.job.status", "."
-                    )
-                    counter = event["counter"]
-                    if counter not in counters:
-                        counters.append(counter)
-                        logger.debug(event["stdout"])
-                        if event_handler:
-                            await event_handler(event)
-
-                if json_body.get("next", None):
-                    params["page"] = params.get("page", 1) + 1
-                    continue
-
+                job_status = json_body["status"]
                 if job_status in self.JOB_COMPLETION_STATUSES:
-                    # fetch and return job object containing artifacts
-                    response = await self._get_page(session, url_info.path, {})
-                    return json.loads(response["body"])
+                    return json_body
 
                 await asyncio.sleep(self.refresh_delay)
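After the change, a caller sees only the returned job object. A hypothetical usage sketch, assuming the module's singleton runner instance is already configured with host and token; the template name, organization, and params are made up, and the artifacts key comes from the comment in the removed code:

import asyncio

from ansible_rulebook.job_template_runner import job_template_runner


async def main() -> None:
    # Launch a job template and wait for the completed job object.
    job = await job_template_runner.run_job_template(
        name="Demo Job Template",
        organization="Default",
        job_params={"extra_vars": {"greeting": "hello"}},
    )
    if job["status"] == "successful":
        # The completed job object already carries its artifacts,
        # so no follow-up request is needed.
        print(job.get("artifacts", {}))


asyncio.run(main())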
