Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Hotfix/srun cancellation #3293

Open
wants to merge 5 commits into
base: devel
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/radical/pilot/agent/agent_0.py
Original file line number Diff line number Diff line change
Expand Up @@ -541,10 +541,12 @@ def _start_sub_agents(self):
}

# find a launcher to use
launcher = self._rm.find_launcher(agent_task)
launcher, lname = self._rm.find_launcher(agent_task)
if not launcher:
raise RuntimeError('no launch method found for sub agent')

self._log.debug('found sa launcher %s', lname)

# FIXME: set RP environment (as in Popen Executor)

tmp = '#!%s\n\n' % self._shell
Expand Down
46 changes: 30 additions & 16 deletions src/radical/pilot/agent/executing/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,12 +137,14 @@ def control_cb(self, topic, msg):

self._log.info('cancel_tasks command (%s)', arg)
for tid in arg['uids']:
self.cancel_task(tid)
task = self.get_task(tid)
if task:
self.cancel_task(task)


# --------------------------------------------------------------------------
#
def cancel_task(self, uid):
def cancel_task(self, task):

raise NotImplementedError('cancel_task is not implemented')

Expand All @@ -156,24 +158,35 @@ def _to_watcher(self):
`self._cancel_task(task)`. That has to be implemented by all executors.
'''

while not self._term.is_set():
# tasks to watch for timeout, sorted by absolute timeout timestamp
to_list = list()

# check once per second at most
time.sleep(1)
while not self._term.is_set():

now = time.time()
# collect new tasks to watch
with self._to_lock:

# running tasks for next check
to_list = list()
for to, start, task in self._to_tasks:
if now - start > to:
self._prof.prof('task_timeout', uid=task['uid'])
self.cancel_task(uid=task['uid'])
else:
to_list.append([to, start, task])
for task, cancel_time in self._to_tasks:
to_list.append([task, cancel_time])

self._to_tasks = list()

# avoid busy wait
if not to_list:
time.sleep(1)
continue

# sort by timeout, smallest first
to_list.sort(key=lambda x: x[1])

self._to_tasks = to_list
# cancel all tasks which have timed out
for task, cancel_time in to_list:
now = time.time()
if now > cancel_time:
self._prof.prof('task_timeout', uid=task['uid'])
self.cancel_task(task=task)
else:
break


# --------------------------------------------------------------------------
Expand All @@ -184,7 +197,8 @@ def handle_timeout(self, task):

if to > 0.0:
with self._to_lock:
self._to_tasks.append([to, time.time(), task])
cancel_time = time.time() + to
self._to_tasks.append([task, cancel_time])


# --------------------------------------------------------------------------
Expand Down
2 changes: 1 addition & 1 deletion src/radical/pilot/agent/executing/dragon.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ def _launch_task(self, task):

# --------------------------------------------------------------------------
#
def cancel_task(self, uid):
def cancel_task(self, task):
'''
This method is called by the base class to actually
cancel the task.
Expand Down
193 changes: 96 additions & 97 deletions src/radical/pilot/agent/executing/popen.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,17 +47,14 @@ def _kill(*args, **kwargs):
#
class Popen(AgentExecutingComponent):

# flags for the watcher queue
TO_WATCH = 0
TO_CANCEL = 1

# --------------------------------------------------------------------------
#
def initialize(self):

# self._log.debug('popen initialize start')
super().initialize()

self._tasks = dict()
self._watch_queue = queue.Queue()

# run watcher thread
Expand All @@ -68,10 +65,59 @@ def initialize(self):

# --------------------------------------------------------------------------
#
def cancel_task(self, uid):
def get_task(self, tid):

return self._tasks.get(tid)


# --------------------------------------------------------------------------
#
def cancel_task(self, task):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

something wrong with states update - tmgr has the following states for cancelled tasks

state: "FAILED"
target_state: "CANCELED"

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

$ cat agent_staging_output.0000.log | grep "m0.i0.s9.hello.2.0000"
1734567093.938 : agent_staging_output.0000 : 47102 : 140624563013184 : DEBUG    : task m0.i0.s9.hello.2.0000 skips staging: FAILED
1734567094.000 : agent_staging_output.0000 : 47102 : 140624563013184 : DEBUG    : task m0.i0.s9.hello.2.0000 skips staging: CANCELED

appears two times in staging output

Copy link
Contributor

@mtitov mtitov Dec 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the following fix 6449329 (and this 04a48bb) helped to overcome the issue (it is within the project/impeccable branch)


# was the task even started?
tid = task['uid']
proc = task.get('proc')
if not proc:
# task was not started, nothing to do
self._log.debug('task %s was not started', tid)
return

# check if the task is, maybe, already done
exit_code = proc.poll()
if exit_code is not None:
# task is done, nothing to do
self._log.debug('task %s is already done', tid)
return

# task is still running -- cancel it
self._log.debug('cancel %s', tid)
self._prof.prof('task_run_cancel_start', uid=tid)

launcher = self._rm.get_launcher(task['launcher_name'])
launcher.cancel_task(task, proc.pid)

proc.wait() # make sure proc is collected

try:
# might race with task collection
del task['proc'] # proc is not json serializable
except KeyError:
pass

task['exit_code'] = None
task['target_state'] = rps.CANCELED

self._log.debug('request cancel task %s', uid)
self._watch_queue.put([self.TO_CANCEL, uid])
self._prof.prof('task_run_cancel_stop', uid=tid)
self._prof.prof('unschedule_start', uid=tid)
self.publish(rpc.AGENT_UNSCHEDULE_PUBSUB, task)

self.advance([task], rps.AGENT_STAGING_OUTPUT_PENDING,
publish=True, push=True)

try:
del self._tasks[tid]
except KeyError:
pass


# --------------------------------------------------------------------------
Expand All @@ -84,6 +130,7 @@ def work(self, tasks):

try:
self._prof.prof('task_start', uid=task['uid'])
self._tasks.update({task['uid']: task})
self._handle_task(task)

except Exception as e:
Expand Down Expand Up @@ -195,7 +242,12 @@ def _handle_task(self, task):
# # now do the very same stuff for the `post_exec` directive
# ...

launcher = self._rm.find_launcher(task)
launcher, lname = self._rm.find_launcher(task)

if not launcher:
raise RuntimeError('no launcher found for %s' % task)

task['launcher_name'] = lname

exec_path , _ = self._create_exec_script(launcher, task)
_, launch_path = self._create_launch_script(launcher, task, exec_path)
Expand Down Expand Up @@ -232,15 +284,14 @@ def _handle_task(self, task):
self.handle_timeout(task)

# watch task for completion
self._watch_queue.put([self.TO_WATCH, task])
self._watch_queue.put(task)


# --------------------------------------------------------------------------
#
def _watch(self):

to_watch = list() # contains task dicts
to_cancel = set() # contains task IDs
to_watch = list() # contains task dicts

try:
while not self._term.is_set():
Expand All @@ -250,35 +301,24 @@ def _watch(self):
# also don't want to learn about tasks until all
# slots are filled, because then we may not be able
# to catch finishing tasks in time -- so there is
# a fine balance here. Balance means 100.
# a fine balance here. Fine balance means 100.
MAX_QUEUE_BULKSIZE = 100
count = 0

try:
while count < MAX_QUEUE_BULKSIZE:

flag, thing = self._watch_queue.get_nowait()
to_watch.append(self._watch_queue.get_nowait())
count += 1

# NOTE: `thing` can be task id or task dict, depending
# on the flag value
if flag == self.TO_WATCH : to_watch.append(thing)
elif flag == self.TO_CANCEL: to_cancel.add(thing)
else: raise RuntimeError('unknown flag %s' % flag)

except queue.Empty:
# nothing found -- no problem, see if any tasks finished
pass

# check on the known tasks.
action = self._check_running(to_watch, to_cancel)

# FIXME: remove uids from lists after completion
self._check_running(to_watch)

if not action and not count:
# nothing happened at all! Zzz for a bit.
# FIXME: make configurable
time.sleep(0.1)
if not count:
# no new tasks, no new state -- sleep a bit
time.sleep(0.05)

except Exception as e:
self._log.exception('Error in ExecWorker watch loop (%s)' % e)
Expand All @@ -288,9 +328,9 @@ def _watch(self):
# --------------------------------------------------------------------------
# Iterate over all running tasks, check their status, and decide on the
# next step. Also check for a requested cancellation for the tasks.
def _check_running(self, to_watch, to_cancel):
def _check_running(self, to_watch):

action = False
tasks_to_advance = list()

# `to_watch.remove()` in the loop requires copy to iterate over the list
for task in list(to_watch):
Expand All @@ -299,52 +339,7 @@ def _check_running(self, to_watch, to_cancel):

# poll subprocess object
exit_code = task['proc'].poll()

tasks_to_advance = list()
tasks_to_cancel = list()

if exit_code is None:

# process is still running - cancel if needed
if tid in to_cancel:

self._log.debug('cancel %s', tid)

action = True
self._prof.prof('task_run_cancel_start', uid=tid)

# got a request to cancel this task - send SIGTERM to the
# process group (which should include the actual launch
# method)
try:
# kill the whole process group.
# Try SIGINT first to allow signal handlers, then
# SIGTERM to allow clean termination, then SIGKILL to
# enforce termination.
pgrp = os.getpgid(task['proc'].pid)
os.killpg(pgrp, signal.SIGINT)
time.sleep(0.1)
os.killpg(pgrp, signal.SIGTERM)
time.sleep(0.1)
os.killpg(pgrp, signal.SIGKILL)

except OSError:
# lost race: task is already gone, we ignore this
# FIXME: collect and move to DONE/FAILED
pass

task['proc'].wait() # make sure proc is collected

to_cancel.remove(tid)
to_watch.remove(task)
del task['proc'] # proc is not json serializable

self._prof.prof('task_run_cancel_stop', uid=tid)

self._prof.prof('unschedule_start', uid=tid)
tasks_to_cancel.append(task)

else:
if exit_code is not None:

action = True
self._prof.prof('task_run_stop', uid=tid)
Expand All @@ -355,41 +350,45 @@ def _check_running(self, to_watch, to_cancel):
# we have a valid return code -- task is final
self._log.info("Task %s has return code %s.", tid, exit_code)

task['exit_code'] = exit_code

# Free the Slots, Flee the Flots, Ree the Frots!
to_watch.remove(task)
if tid in to_cancel:
to_cancel.remove(tid)
del task['proc'] # proc is not json serializable

try:
# might race with task cancellation
del task['proc'] # proc is not json serializable
except KeyError:
pass

tasks_to_advance.append(task)

self._prof.prof('unschedule_start', uid=tid)

if exit_code != 0:
# task failed - fail after staging output
task['exception'] = 'RuntimeError("task failed")'
task['exception_detail'] = 'exit code: %s' % exit_code
task['target_state' ] = rps.FAILED

else:
if exit_code == 0:
# The task finished cleanly, see if we need to deal with
# output data. We always move to stageout, even if there
# are no directives -- at the very least, we'll upload
# stdout/stderr
task['exit_code'] = exit_code
task['target_state'] = rps.DONE

self.publish(rpc.AGENT_UNSCHEDULE_PUBSUB,
tasks_to_cancel + tasks_to_advance)
else:
# task failed (we still run staging output)
task['exit_code'] = exit_code
task['exception'] = 'RuntimeError("task failed")'
task['exception_detail'] = 'exit code: %s' % exit_code
task['target_state'] = rps.FAILED

if tasks_to_cancel:
self.advance(tasks_to_cancel, rps.CANCELED,
publish=True, push=False)
if tasks_to_advance:
self.advance(tasks_to_advance, rps.AGENT_STAGING_OUTPUT_PENDING,
publish=True, push=True)
self.publish(rpc.AGENT_UNSCHEDULE_PUBSUB, tasks_to_advance)

return action
if tasks_to_advance:
self.advance(tasks_to_advance, rps.AGENT_STAGING_OUTPUT_PENDING,
publish=True, push=True)

for task in tasks_to_advance:
try:
del self._tasks[task['uid']]
except KeyError:
pass


# ------------------------------------------------------------------------------
Expand Down
Loading
Loading