Skip to content

Commit

Permalink
[AAP-11788] Pulling job instead of job_events to get job status (#501)
Browse files Browse the repository at this point in the history
We no longer need to log job events. Therefore pulling job for status is
much faster than parsing status from job events.

Fixes AAP-11788:EDA should not hit job_events endpoint on controller
jobs
  • Loading branch information
bzwei authored May 15, 2023
2 parents dde9003 + bd8dd75 commit 3a150fc
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 5,363 deletions.
22 changes: 0 additions & 22 deletions ansible_rulebook/builtin.py
Original file line number Diff line number Diff line change
Expand Up @@ -772,28 +772,6 @@ async def run_job_template(
)
)

async def event_callback(event: dict) -> None:
await event_log.put(
{
"type": "AnsibleEvent",
"event": {
"uuid": event["uuid"],
"counter": event["counter"],
"stdout": event["stdout"],
"start_line": event["start_line"],
"end_line": event["end_line"],
"event": event["event"],
"created": event["created"],
"parent_uuid": event["parent_uuid"],
"event_data": event["event_data"],
"job_id": job_id,
"controller_job_id": event["job"],
"ansible_rulebook_id": settings.identifier,
},
"run_at": event["created"],
}
)

if retry:
retries = max(retries, 1)

Expand Down
34 changes: 7 additions & 27 deletions ansible_rulebook/job_template_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
import os
import ssl
from functools import cached_property
from typing import Any, Callable, Union
from urllib.parse import parse_qsl, urljoin, urlparse
from typing import Union
from urllib.parse import urljoin

import aiohttp
import dpath
Expand Down Expand Up @@ -117,42 +117,22 @@ async def run_job_template(
name: str,
organization: str,
job_params: dict,
event_handler: Union[Callable[[dict], Any], None] = None,
) -> dict:
job = await self.launch(name, organization, job_params)

url_info = urlparse(job["url"])
url = f"{url_info.path}job_events/"
counters = []
params = dict(parse_qsl(url_info.query))
url = job["url"]
params = {}

async with aiohttp.ClientSession(
headers=self._auth_headers()
) as session:
while True:
# fetch and process job events
# fetch and process job status
response = await self._get_page(session, url, params)
json_body = json.loads(response["body"])
job_status = None
for event in json_body["results"]:
job_status = dpath.get(
event, "summary_fields.job.status", "."
)
counter = event["counter"]
if counter not in counters:
counters.append(counter)
logger.debug(event["stdout"])
if event_handler:
await event_handler(event)

if json_body.get("next", None):
params["page"] = params.get("page", 1) + 1
continue

job_status = json_body["status"]
if job_status in self.JOB_COMPLETION_STATUSES:
# fetch and return job object containing artifacts
response = await self._get_page(session, url_info.path, {})
return json.loads(response["body"])
return json_body

await asyncio.sleep(self.refresh_delay)

Expand Down
Loading

0 comments on commit 3a150fc

Please sign in to comment.