diff --git a/python/res/job_queue/queue.py b/python/res/job_queue/queue.py index 93e4d0a43..9e90203ad 100644 --- a/python/res/job_queue/queue.py +++ b/python/res/job_queue/queue.py @@ -413,11 +413,11 @@ def execute_queue(self, pool_sema, evaluators): self._transition() @staticmethod - def _translate_change_to_cloudevent(real_id, status): + def _translate_change_to_cloudevent(ee_id, real_id, status): return CloudEvent( { "type": _queue_state_event_type(status), - "source": f"/ert/ee/{0}/real/{real_id}/step/{0}", + "source": f"/ert/ee/{ee_id}/real/{real_id}/step/{0}", "datacontenttype": "application/json", }, { @@ -426,16 +426,16 @@ def _translate_change_to_cloudevent(real_id, status): ) @staticmethod - async def _publish_changes(changes, websocket): + async def _publish_changes(ee_id, changes, websocket): events = [ - JobQueue._translate_change_to_cloudevent(real_id, status) + JobQueue._translate_change_to_cloudevent(ee_id, real_id, status) for real_id, status in changes.items() ] for event in events: await websocket.send(to_json(event)) async def execute_queue_async( - self, ws_uri, pool_sema, evaluators, cert=None, token=None + self, ws_uri, ee_id, pool_sema, evaluators, cert=None, token=None ): if cert is not None: ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) @@ -448,7 +448,7 @@ async def execute_queue_async( async with websockets.connect( ws_uri, ssl=ssl_context, extra_headers=headers ) as websocket: - await JobQueue._publish_changes(self.snapshot(), websocket) + await JobQueue._publish_changes(ee_id, self.snapshot(), websocket) try: while self.is_active() and not self.stopped: @@ -461,7 +461,7 @@ async def execute_queue_async( func() await JobQueue._publish_changes( - self._changes_after_transition(), websocket + ee_id, self._changes_after_transition(), websocket ) except asyncio.CancelledError: if self.stopped: @@ -478,7 +478,7 @@ async def execute_queue_async( logger.debug("jobs now stopped") self.assert_complete() self._transition() - await JobQueue._publish_changes(self.snapshot(), websocket) + await JobQueue._publish_changes(ee_id, self.snapshot(), websocket) def add_job_from_run_arg(self, run_arg, res_config, max_runtime, ok_cb, exit_cb): job_name = run_arg.job_name