diff --git a/pscheduler-server/pscheduler-server/daemons/runner b/pscheduler-server/pscheduler-server/daemons/runner index ed5a433b1..e7146aa9e 100755 --- a/pscheduler-server/pscheduler-server/daemons/runner +++ b/pscheduler-server/pscheduler-server/daemons/runner @@ -112,17 +112,17 @@ __run_start_margin = None def run_start_margin_set(db): global __run_start_margin - with db.cursor() as cursor: - try: - cursor.execute("SELECT run_start_margin FROM configurables") - except Exception as ex: - raise RuntimeError("Unable to get run start margin: %s" % (str(ex))) - if cursor.rowcount != 1: - raise RuntimeError("Failed to get run start margin") + try: + rows = list(db.query("SELECT run_start_margin FROM configurables")) + except Exception as ex: + raise RuntimeError("Unable to get run start margin: %s" % (str(ex))) + + if (len(rows) != 1) or (len(rows[0]) != 1): + raise RuntimeError("Unexpected return from start margin query: %s" % (str(rows))) - __run_start_margin = cursor.fetchone()[0] - log.debug("Run start margin set to %s", __run_start_margin) + __run_start_margin = rows[0][0] + log.debug("Run start margin set to %s", __run_start_margin) def run_start_margin(): @@ -605,11 +605,6 @@ class RunWorker(pscheduler.GenericWorker): if returncode == 0: try: local_result = pscheduler.json_load(stdout) - (is_valid, reason) = pscheduler.json_validate(local_result, - { "$ref": "#/pScheduler/ParticipantResult" }, - max_schema=1) - if not is_valid: - raise ValueError(); except ValueError: error = "Tool {} returned invalid JSON".format(tool) self.log.error("%d: %s '%s'", self.id, error, stdout) @@ -793,18 +788,15 @@ def main_program(): # This is for local use. - db = pscheduler.pg_connection(dsn) + db = pscheduler.PgConnection(dsn, name="runner") log.debug("Connected to DB") - with db.cursor() as cursor: - cursor.execute("SELECT heartbeat_boot('runner')") - # Listen for notifications. - for listen in ["run_new", "run_change", "configurables_changed" ]: log.debug("Listening for notification %s" % (listen)) - with db.cursor() as cursor: - cursor.execute("LISTEN %s" % (listen)) + db.listen(listen) + + db.query("SELECT heartbeat_boot('runner')") # Prime this for the first run wait_time = datetime.timedelta() @@ -822,141 +814,122 @@ def main_program(): if not pscheduler.timedelta_is_zero(wait_time): - # Wait for a notification or the wait time to elapse. Eat all - # notifications as a group; we only care that we were notified. - - # TODO: This try needs to be brought to the other programs. - # Better, make it a function in db.py. + # Wait for a notification or the wait time to elapse. - with db.cursor() as cursor: - cursor.execute("SELECT heartbeat('runner', %s)", [wait_time]) + db.query("SELECT heartbeat('runner', %s)", [wait_time]) worker_pool.groom() log.debug("Next run or check in %s", wait_time) - try: - if db.notifies or pscheduler.polled_select( - [db],[],[], - pscheduler.timedelta_as_seconds(wait_time)) \ - != ([],[],[]): - # Notified - db.poll() - - if len(db.notifies): - notifies = [ notify.channel for notify in db.notifies] - log.debug("Notifications: %s", notifies) - if 'configurables_changed' in notifies: - log.debug("Configurables changed.") - run_start_margin_set(db) - else: - log.debug("Schedule change.") - - # Remove only the first notification so we act on - # all of them. When scheduled start times are - # tight, removing everything causes runs to be missed. - db.notifies.pop(0) - - except select.error as ex: - - err_no, message = ex.args - if err_no != errno.EINTR: - log.exception() - raise ex - - - with db.cursor() as cursor: - cursor.execute("SELECT heartbeat('runner')") - - with db.cursor() as cursor: - - # Operate only on runs that are scheduled to start before the next - # forced refresh. - # TODO: Error check this. - cursor.execute(""" - SELECT * FROM ( - - -- Tasks that haven't started - SELECT - id AS run, - lower(times) - normalized_wall_clock() AS start_in, - lower(times) AS start_at, - FALSE as background_multi - FROM - run - WHERE - lower(run.times) < (normalized_wall_clock() + %s) - AND state = run_state_pending() - AND part_data_full IS NOT NULL - - UNION - - -- Background tasks that should be running. - SELECT - run.id AS run, - 'PT1S'::INTERVAL AS start_in, - normalized_wall_clock() + 'PT1S'::INTERVAL AS start_at, - TRUE as background_multi - FROM - run - JOIN task ON task.id = run.task - JOIN test ON test.id = task.test - WHERE - times @> normalized_now() - AND task.enabled - AND test.scheduling_class = scheduling_class_background_multi() - AND run.state IN (run_state_pending(), run_state_running()) - ) t - WHERE start_in > '0'::INTERVAL - ORDER BY start_in - """, [refresh]); - - - wait_time = refresh - - run_ids = [] - runs_started = False - - for row in cursor: - - run_id, start_in, start_at, background_multi = row - - if run_id in ids_running: - log.debug("%d is already running" % (run_id)) - continue - - log.debug("Run %d, starts at %s", run_id, start_at) - - run_ids.append(run_id) - try: + if db.wait(pscheduler.timedelta_as_seconds(refresh)): + notifies = [ notify[0] for notify in db.notifications() ] + if len(notifies) > 0: + log.debug("Notified: %s", notifies) + if 'configurables_changed' in notifies: + log.debug("Configurables changed.") + run_start_margin_set(db) + else: + log.debug("Schedule change.") - # Make a worker and throw it into the pool. - worker_pool(run_id, RunWorker(run_id, start_at, log.is_forced_debugging()), run_completion) - log.debug("%d: Created worker", run_id) - ids_running.add(run_id) + else: + + log.debug("Timed out.") - log.debug("%d processors in pool: %s", len(worker_pool), worker_pool.status()) + else: - except Exception as ex: + log.debug("Not waiting.") + + db.query("SELECT heartbeat('runner')") + + # Operate only on runs that are scheduled to start before the next + # forced refresh. + # TODO: This should be moved into a stored procedure + + RUNS_QUERY = """ + SELECT * FROM ( + + -- Tasks that haven't started + SELECT + id AS run, + lower(times) - normalized_wall_clock() AS start_in, + lower(times) AS start_at, + FALSE as background_multi + FROM + run + WHERE + lower(run.times) < (normalized_wall_clock() + %s) + AND state = run_state_pending() + AND part_data_full IS NOT NULL + + UNION + + -- Background tasks that should be running. + SELECT + run.id AS run, + 'PT1S'::INTERVAL AS start_in, + normalized_wall_clock() + 'PT1S'::INTERVAL AS start_at, + TRUE as background_multi + FROM + run + JOIN task ON task.id = run.task + JOIN test ON test.id = task.test + WHERE + times @> normalized_now() + AND task.enabled + AND test.scheduling_class = scheduling_class_background_multi() + AND run.state IN (run_state_pending(), run_state_running()) + ) t + WHERE start_in > '0'::INTERVAL + ORDER BY start_in + """ + + wait_time = refresh + + run_ids = [] + runs_started = False + + for row in db.query(RUNS_QUERY, [refresh]): + + run_id, start_in, start_at, background_multi = row + + if run_id in ids_running: + log.debug("%d is already running" % (run_id)) + continue + + log.debug("Run %d, starts at %s", run_id, start_at) + + run_ids.append(run_id) + + try: + + # Make a worker and throw it into the pool. + worker_pool(run_id, RunWorker(run_id, start_at, log.is_forced_debugging()), run_completion) + log.debug("%d: Created worker", run_id) + ids_running.add(run_id) + + log.debug("%d processors in pool: %s", len(worker_pool), worker_pool.status()) + + except Exception as ex: - # Any failure here means failure of the run. + # Any failure here means failure of the run. - diags = "\n".join(traceback.format_exception(type(ex), ex, ex.__traceback__)) - log.error("%d: Unable to start worker: %s" % (run_id, diags)) + diags = "\n".join(traceback.format_exception(type(ex), ex, ex.__traceback__)) + log.error("%d: Unable to start worker: %s" % (run_id, diags)) - with db.cursor() as failed: - failed.execute(""" - UPDATE RUN - SET - state = run_state_failed(), - status = 1, - errors = %s - WHERE id = %s - """, ["Failed to start worker: %s" % (diags), - run_id]) + with db.cursor() as failed: + failed.execute(""" + UPDATE RUN + SET + state = run_state_failed(), + status = 1, + errors = %s + WHERE id = %s + """, ["Failed to start worker: %s" % (diags), + run_id]) - log.error("Unable to set on-deck status: %s" % (str(ex))) + log.error("Unable to set on-deck status: %s" % (str(ex))) # Not that this will ever be reached...