Commit 2cb5022
Use PgConnection for main loop. Part of #1373.
mfeit-internet2 committed Dec 8, 2023
1 parent 3cb4bfb commit 2cb5022
Showing 1 changed file with 115 additions and 142 deletions.
pscheduler-server/pscheduler-server/daemons/runner (257 changes: 115 additions & 142 deletions)
@@ -112,17 +112,17 @@ __run_start_margin = None

 def run_start_margin_set(db):
     global __run_start_margin
-    with db.cursor() as cursor:
-        try:
-            cursor.execute("SELECT run_start_margin FROM configurables")
-        except Exception as ex:
-            raise RuntimeError("Unable to get run start margin: %s" % (str(ex)))
-
-        if cursor.rowcount != 1:
-            raise RuntimeError("Failed to get run start margin")
-
-        __run_start_margin = cursor.fetchone()[0]
-        log.debug("Run start margin set to %s", __run_start_margin)
+    try:
+        rows = list(db.query("SELECT run_start_margin FROM configurables"))
+    except Exception as ex:
+        raise RuntimeError("Unable to get run start margin: %s" % (str(ex)))
+
+    if (len(rows) != 1) or (len(rows[0]) != 1):
+        raise RuntimeError("Unexpected return from start margin query: %s" % (str(rows)))
+
+    __run_start_margin = rows[0][0]
+    log.debug("Run start margin set to %s", __run_start_margin)


 def run_start_margin():
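The hunk above replaces psycopg2-style cursor handling with PgConnection's query() interface, which returns rows directly and removes the need for rowcount checks and fetchone(). A minimal sketch of the new pattern, assuming a reachable pScheduler database; the DSN string and the exact shape of query()'s return value are assumptions drawn from what this diff shows:

    import pscheduler

    # Sketch only.  PgConnection and query() are pscheduler internals;
    # the DSN and row shapes here are inferred from this diff.
    db = pscheduler.PgConnection("dbname=pscheduler", name="example")

    # query() is used above as an iterable of row tuples, so a single
    # one-column result arrives as rows[0][0].
    rows = list(db.query("SELECT run_start_margin FROM configurables"))
    if (len(rows) != 1) or (len(rows[0]) != 1):
        raise RuntimeError("Unexpected return: %s" % (str(rows)))
    print("Run start margin:", rows[0][0])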
@@ -605,11 +605,6 @@ class RunWorker(pscheduler.GenericWorker):
                 if returncode == 0:
                     try:
                         local_result = pscheduler.json_load(stdout)
-                        (is_valid, reason) = pscheduler.json_validate(local_result,
-                                                                      { "$ref": "#/pScheduler/ParticipantResult" },
-                                                                      max_schema=1)
-                        if not is_valid:
-                            raise ValueError();
                     except ValueError:
                         error = "Tool {} returned invalid JSON".format(tool)
                         self.log.error("%d: %s '%s'", self.id, error, stdout)
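The five removed lines drop schema validation of each participant's result; after this change the runner requires only that the tool's output parse as JSON. What the removed check did, approximately (the call and schema reference are taken from the removed lines; the sample input is invented):

    import pscheduler

    # Validate parsed JSON against a pScheduler schema reference, as the
    # removed lines did.  The sample result below is a made-up placeholder.
    result = pscheduler.json_load('{"succeeded": true}')
    (is_valid, reason) = pscheduler.json_validate(
        result,
        { "$ref": "#/pScheduler/ParticipantResult" },
        max_schema=1)
    if not is_valid:
        raise ValueError(reason)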
@@ -793,18 +788,15 @@ def main_program():


     # This is for local use.
-    db = pscheduler.pg_connection(dsn)
+    db = pscheduler.PgConnection(dsn, name="runner")
     log.debug("Connected to DB")

-    with db.cursor() as cursor:
-        cursor.execute("SELECT heartbeat_boot('runner')")
-
     # Listen for notifications.

     for listen in ["run_new", "run_change", "configurables_changed" ]:
         log.debug("Listening for notification %s" % (listen))
-        with db.cursor() as cursor:
-            cursor.execute("LISTEN %s" % (listen))
+        db.listen(listen)
+
+    db.query("SELECT heartbeat_boot('runner')")

     # Prime this for the first run
     wait_time = datetime.timedelta()
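The listen() call above replaces LISTEN statements issued through raw cursors. Paired with the wait() and notifications() calls in the next hunk, this amounts to PostgreSQL LISTEN/NOTIFY handling behind a small API. A rough usage sketch (channel names come from the diff; the timeout, return values, and notification tuple layout are assumptions):

    import pscheduler

    # Sketch of the notification pattern this commit adopts.
    db = pscheduler.PgConnection("dbname=pscheduler", name="listener-example")

    for channel in ["run_new", "run_change", "configurables_changed"]:
        db.listen(channel)   # Issues LISTEN <channel> on the connection.

    # wait() appears to block for up to the given number of seconds and
    # return True if a NOTIFY arrived, False on timeout.
    if db.wait(5):
        for notification in db.notifications():
            print("Notified on channel:", notification[0])
    else:
        print("Timed out.")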
@@ -822,141 +814,122 @@

         if not pscheduler.timedelta_is_zero(wait_time):

-            # Wait for a notification or the wait time to elapse.  Eat all
-            # notifications as a group; we only care that we were notified.
-
-            # TODO: This try needs to be brought to the other programs.
-            # Better, make it a function in db.py.
+            # Wait for a notification or the wait time to elapse.

-            with db.cursor() as cursor:
-                cursor.execute("SELECT heartbeat('runner', %s)", [wait_time])
+            db.query("SELECT heartbeat('runner', %s)", [wait_time])

             worker_pool.groom()

             log.debug("Next run or check in %s", wait_time)
-            try:
-                if db.notifies or pscheduler.polled_select(
-                        [db],[],[],
-                        pscheduler.timedelta_as_seconds(wait_time)) \
-                        != ([],[],[]):
-                    # Notified
-                    db.poll()
-
-                    if len(db.notifies):
-                        notifies = [ notify.channel for notify in db.notifies]
-                        log.debug("Notifications: %s", notifies)
-                        if 'configurables_changed' in notifies:
-                            log.debug("Configurables changed.")
-                            run_start_margin_set(db)
-                        else:
-                            log.debug("Schedule change.")
-
-                        # Remove only the first notification so we act on
-                        # all of them.  When scheduled start times are
-                        # tight, removing everything causes runs to be missed.
-                        db.notifies.pop(0)
-
-            except select.error as ex:
-
-                err_no, message = ex.args
-                if err_no != errno.EINTR:
-                    log.exception()
-                    raise ex
-
-
-        with db.cursor() as cursor:
-            cursor.execute("SELECT heartbeat('runner')")
-
-        with db.cursor() as cursor:
-
-            # Operate only on runs that are scheduled to start before the next
-            # forced refresh.
-            # TODO: Error check this.
-            cursor.execute("""
-                SELECT * FROM (
-                    -- Tasks that haven't started
-                    SELECT
-                        id AS run,
-                        lower(times) - normalized_wall_clock() AS start_in,
-                        lower(times) AS start_at,
-                        FALSE as background_multi
-                    FROM
-                        run
-                    WHERE
-                        lower(run.times) < (normalized_wall_clock() + %s)
-                        AND state = run_state_pending()
-                        AND part_data_full IS NOT NULL
-                    UNION
-                    -- Background tasks that should be running.
-                    SELECT
-                        run.id AS run,
-                        'PT1S'::INTERVAL AS start_in,
-                        normalized_wall_clock() + 'PT1S'::INTERVAL AS start_at,
-                        TRUE as background_multi
-                    FROM
-                        run
-                        JOIN task ON task.id = run.task
-                        JOIN test ON test.id = task.test
-                    WHERE
-                        times @> normalized_now()
-                        AND task.enabled
-                        AND test.scheduling_class = scheduling_class_background_multi()
-                        AND run.state IN (run_state_pending(), run_state_running())
-                ) t
-                WHERE start_in > '0'::INTERVAL
-                ORDER BY start_in
-                """, [refresh]);
-
-
-            wait_time = refresh
-
-            run_ids = []
-            runs_started = False
-
-            for row in cursor:
-
-                run_id, start_in, start_at, background_multi = row
-
-                if run_id in ids_running:
-                    log.debug("%d is already running" % (run_id))
-                    continue
-
-                log.debug("Run %d, starts at %s", run_id, start_at)
-
-                run_ids.append(run_id)
-
-                try:
-
-                    # Make a worker and throw it into the pool.
-                    worker_pool(run_id, RunWorker(run_id, start_at, log.is_forced_debugging()), run_completion)
-                    log.debug("%d: Created worker", run_id)
-                    ids_running.add(run_id)
-
-                    log.debug("%d processors in pool: %s", len(worker_pool), worker_pool.status())
-
-                except Exception as ex:
-
-                    # Any failure here means failure of the run.
-
-                    diags = "\n".join(traceback.format_exception(type(ex), ex, ex.__traceback__))
-                    log.error("%d: Unable to start worker: %s" % (run_id, diags))
-
-                    with db.cursor() as failed:
-                        failed.execute("""
-                            UPDATE RUN
-                            SET
-                                state = run_state_failed(),
-                                status = 1,
-                                errors = %s
-                            WHERE id = %s
-                            """, ["Failed to start worker: %s" % (diags),
-                                  run_id])
-
-
-                    log.error("Unable to set on-deck status: %s" % (str(ex)))
+            if db.wait(pscheduler.timedelta_as_seconds(refresh)):
+                notifies = [ notify[0] for notify in db.notifications() ]
+                if len(notifies) > 0:
+                    log.debug("Notified: %s", notifies)
+                    if 'configurables_changed' in notifies:
+                        log.debug("Configurables changed.")
+                        run_start_margin_set(db)
+                    else:
+                        log.debug("Schedule change.")
+            else:
+                log.debug("Timed out.")
+
+        else:
+            log.debug("Not waiting.")
+
+        db.query("SELECT heartbeat('runner')")
+
+        # Operate only on runs that are scheduled to start before the next
+        # forced refresh.
+        # TODO: This should be moved into a stored procedure
+
+        RUNS_QUERY = """
+            SELECT * FROM (
+                -- Tasks that haven't started
+                SELECT
+                    id AS run,
+                    lower(times) - normalized_wall_clock() AS start_in,
+                    lower(times) AS start_at,
+                    FALSE as background_multi
+                FROM
+                    run
+                WHERE
+                    lower(run.times) < (normalized_wall_clock() + %s)
+                    AND state = run_state_pending()
+                    AND part_data_full IS NOT NULL
+                UNION
+                -- Background tasks that should be running.
+                SELECT
+                    run.id AS run,
+                    'PT1S'::INTERVAL AS start_in,
+                    normalized_wall_clock() + 'PT1S'::INTERVAL AS start_at,
+                    TRUE as background_multi
+                FROM
+                    run
+                    JOIN task ON task.id = run.task
+                    JOIN test ON test.id = task.test
+                WHERE
+                    times @> normalized_now()
+                    AND task.enabled
+                    AND test.scheduling_class = scheduling_class_background_multi()
+                    AND run.state IN (run_state_pending(), run_state_running())
+            ) t
+            WHERE start_in > '0'::INTERVAL
+            ORDER BY start_in
+        """
+
+        wait_time = refresh
+
+        run_ids = []
+        runs_started = False
+
+        for row in db.query(RUNS_QUERY, [refresh]):
+
+            run_id, start_in, start_at, background_multi = row
+
+            if run_id in ids_running:
+                log.debug("%d is already running" % (run_id))
+                continue
+
+            log.debug("Run %d, starts at %s", run_id, start_at)
+
+            run_ids.append(run_id)
+
+            try:
+
+                # Make a worker and throw it into the pool.
+                worker_pool(run_id, RunWorker(run_id, start_at, log.is_forced_debugging()), run_completion)
+                log.debug("%d: Created worker", run_id)
+                ids_running.add(run_id)
+
+                log.debug("%d processors in pool: %s", len(worker_pool), worker_pool.status())
+
+            except Exception as ex:
+
+                # Any failure here means failure of the run.
+
+                diags = "\n".join(traceback.format_exception(type(ex), ex, ex.__traceback__))
+                log.error("%d: Unable to start worker: %s" % (run_id, diags))
+
+                with db.cursor() as failed:
+                    failed.execute("""
+                        UPDATE RUN
+                        SET
+                            state = run_state_failed(),
+                            status = 1,
+                            errors = %s
+                        WHERE id = %s
+                        """, ["Failed to start worker: %s" % (diags),
+                              run_id])
+
+
+                log.error("Unable to set on-deck status: %s" % (str(ex)))


 # Not that this will ever be reached...
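Taken together, the rewrite leaves the main loop with a simple heartbeat, wait, and dispatch shape. A heavily abridged sketch of that flow (function and channel names come from the diff; everything else, including the stand-in query argument and the worker handling, is assumed):

    import pscheduler

    def runner_loop(db, refresh, runs_query):
        # Abridged sketch of the post-commit main loop.  db is assumed to
        # be a pscheduler.PgConnection; runs_query stands in for RUNS_QUERY.
        wait_time = refresh
        while True:
            if not pscheduler.timedelta_is_zero(wait_time):
                db.query("SELECT heartbeat('runner', %s)", [wait_time])
                if db.wait(pscheduler.timedelta_as_seconds(refresh)):
                    channels = [n[0] for n in db.notifications()]
                    if "configurables_changed" in channels:
                        pass   # Re-read configurables here.
            db.query("SELECT heartbeat('runner')")
            wait_time = refresh
            for run_id, start_in, start_at, background_multi in db.query(runs_query, [refresh]):
                pass   # Hand each pending run to a worker here.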
