From 3201480e13eac843a7c6cb3dc578a8870472373a Mon Sep 17 00:00:00 2001
From: Andre Merzky
Date: Wed, 14 Jun 2023 21:58:55 +0200
Subject: [PATCH 1/5] snapshot on service startup flag

---
 examples/agent_services.py         |  9 +++--
 setup.py                           |  1 +
 src/radical/pilot/agent/agent_0.py | 56 +++++++++++++++++++++++++++---
 3 files changed, 59 insertions(+), 7 deletions(-)

diff --git a/examples/agent_services.py b/examples/agent_services.py
index f854af6536..1c231e13cb 100755
--- a/examples/agent_services.py
+++ b/examples/agent_services.py
@@ -43,6 +43,11 @@
     # Define an [n]-core local pilot that runs for [x] minutes
     # Here we use a dict to initialize the description object
+
+    sd = rp.TaskDescription({'executable': '/bin/sh',
+                             'arguments' : ['-c', 'radical-pilot-service-signal'],
+                             'named_env' : 'rp'})
+
     pd_init = {'resource'      : resource,
                'runtime'       : 30,  # pilot runtime (min)
                'exit_on_error' : True,
@@ -51,9 +56,7 @@
                'access_schema' : config.get('schema'),
                'cores'         : config.get('cores', 1),
                'gpus'          : config.get('gpus', 0),
-               # TODO create shell script
-               'services'      : [rp.TaskDescription({'executable':'free -h'}),
-                                  rp.TaskDescription({'executable':'free -h'}) ]
+               'services'      : [sd, sd]
               }
     pdesc = rp.PilotDescription(pd_init)

diff --git a/setup.py b/setup.py
index 5354e774ab..05dea0b6a9 100755
--- a/setup.py
+++ b/setup.py
@@ -263,6 +263,7 @@ def run(self):
              'bin/radical-pilot-raptor-worker',
              'bin/radical-pilot-resources',
              'bin/radical-pilot-run-session',
+             'bin/radical-pilot-service-signal',
              'bin/radical-pilot-stats',
              'bin/radical-pilot-stats.plot',
              'bin/radical-pilot-ve',

diff --git a/src/radical/pilot/agent/agent_0.py b/src/radical/pilot/agent/agent_0.py
index f6b2b658d3..6ebc2f4cf2 100644
--- a/src/radical/pilot/agent/agent_0.py
+++ b/src/radical/pilot/agent/agent_0.py
@@ -694,11 +694,24 @@ def _check_control(self, _, msg):
         '''

         cmd = msg['cmd']
-        arg = msg['arg']

-        if cmd != 'rpc_req':
-            # not an rpc request
-            return True
+        if cmd == 'rpc_req':
+            return self._ctrl_rpc(msg)
+
+        elif cmd == 'service_up':
+            self._log.debug('=== got service_up: %s', msg)
+            return self._ctrl_service_up(msg)
+
+        # ignore all other message types
+        return True
+
+
+    # --------------------------------------------------------------------------
+    #
+    def _ctrl_rpc(self, msg):
+
+        cmd = msg['cmd']
+        arg = msg['arg']

         req = arg['rpc']
         if req not in ['hello', 'prepare_env']:
@@ -737,6 +750,41 @@ def _check_control(self, _, msg):
         return True


+    # --------------------------------------------------------------------------
+    #
+    def _ctrl_service_up(self, msg):
+
+        cmd = msg['cmd']
+        uid = msg['arg']['uid']
+
+        # This message signals that an agent service instance is up and running.
+        # We expect to find the service UID in args and can then unblock the
+        # service startup wait for that uid
+
+        if uid not in self._service_uids_launched:
+            # we do not know this service instance
+            self._log.warn('=== ignore service startup signal for %s', uid)
+            return True
+
+        if uid in self._service_uids_running:
+            self._log.warn('=== duplicated service startup signal for %s', uid)
+            return True
+
+        self._log.debug('=== service startup message for %s', uid)
+
+        self._service_uids_running.append(uid)
+        self._log.debug('=== service %s started (%s / %s)', uid,
+                        len(self._service_uids_running),
+                        len(self._service_uids_launched))
+
+        # signal main thread when all services are up
+        if len(self._service_uids_launched) == \
+           len(self._service_uids_running):
+            self._services_setup.set()
+
+        return True
+
+
     # --------------------------------------------------------------------------
     #
     def _check_state(self):
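The startup bookkeeping introduced in this patch reduces to a small barrier
pattern: every `service_up` message moves one uid from the launched list to
the running list, and an event unblocks the waiting agent once the two match.
A minimal standalone sketch of that pattern, with illustrative names rather
than the actual RP classes:

    import threading

    class ServiceBarrier:
        '''count service readiness signals, unblock once all have arrived'''

        def __init__(self, launched):
            self._launched = set(launched)       # uids the agent started
            self._running  = set()               # uids which signaled back
            self._event    = threading.Event()

        def service_up(self, uid):
            if uid not in self._launched:        # unknown service instance
                return
            if uid in self._running:             # duplicate startup signal
                return
            self._running.add(uid)
            if self._running == self._launched:  # all services are up
                self._event.set()

        def wait(self, timeout=None):
            # returns False if not all services came up within the timeout
            return self._event.wait(timeout)

This mirrors the `self._services_setup.wait()` call which gates service
startup in the agent.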
From b6e53197e3db2727a8e689f2042465186562b2e3 Mon Sep 17 00:00:00 2001
From: Andre Merzky
Date: Wed, 14 Jun 2023 22:39:07 +0200
Subject: [PATCH 2/5] add missing file

---
 bin/radical-pilot-service-signal | 46 ++++++++++++++++++++++++++++++++
 1 file changed, 46 insertions(+)
 create mode 100755 bin/radical-pilot-service-signal

diff --git a/bin/radical-pilot-service-signal b/bin/radical-pilot-service-signal
new file mode 100755
index 0000000000..aa931e491f
--- /dev/null
+++ b/bin/radical-pilot-service-signal
@@ -0,0 +1,46 @@
+#!/bin/sh
+
+HELP=$(cat <<EOT
+
+Usage: $0 <uid>
+
+    uid: UID of the service whose startup completed
+
+
+This script is expected to be executed by a service instance which was started
+by the pilot agent.  The agent will block any further activity until all started
+services signal their readiness.  A service specification may define a timeout
+after which the startup is declared as failed and the agent will abort.
+
+Internally the script will activate the agent's virtualenv and then run a small
+embedded Python script which sends a message to the agent's control channel,
+informing it about the service startup.
+EOT
+)
+
+SCRIPT=$(cat <<EOT
+import os
+import time
+
+import radical.utils as ru
+
+reg = ru.zmq.RegistryClient(url=os.environ['RP_REGISTRY_URL'])
+pub = ru.zmq.Publisher('control_pubsub', reg['bridges.control_pubsub.pub_addr'])
+
+pub.put('control_pubsub', msg={'cmd': 'service_up',
+                               'uid': os.environ['RP_TASK_UID']})
+
+# make sure the message goes out
+time.sleep(1)
+EOT
+)
+
+cd $RP_PILOT_SANDBOX
+
+set -x
+echo "$SCRIPT" | python3
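The script above is meant to be called from within a service task once the
service is functional -- the example in patch 1 simply invokes it as the whole
service command.  A hypothetical wrapper which first waits for its real
payload to come up could be described like this (`my_service` and the port
probe are placeholders, not part of this patch series):

    import radical.pilot as rp

    sd = rp.TaskDescription({
        'executable': '/bin/sh',
        'arguments' : ['-c', 'my_service --port 8000 & '
                             'while ! nc -z localhost 8000; do sleep 1; done; '
                             'radical-pilot-service-signal'],
        'named_env' : 'rp'})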
From: Andre Merzky
Date: Wed, 9 Aug 2023 13:06:40 +0200
Subject: [PATCH 3/5] merge from devel

---
 bin/radical-pilot-service-signal | 12 +++++++++---
 1 file changed, 9 insertions(+), 3 deletions(-)

diff --git a/bin/radical-pilot-service-signal b/bin/radical-pilot-service-signal
index aa931e491f..4bce4a1eab 100755
--- a/bin/radical-pilot-service-signal
+++ b/bin/radical-pilot-service-signal
@@ -25,11 +25,16 @@
 import os

 import radical.utils as ru

-reg = ru.zmq.RegistryClient(url=os.environ['RP_REGISTRY_URL'])
-pub = ru.zmq.Publisher('control_pubsub', reg['bridges.control_pubsub.pub_addr'])
+reg_addr = os.environ['RP_REGISTRY_URL']
+reg = ru.zmq.RegistryClient(url=reg_addr)
+
+ctr_addr = reg['bridges.control_pubsub.addr_pub']
+task_uid = os.environ['RP_TASK_ID']
+
+pub = ru.zmq.Publisher('control_pubsub', ctr_addr)

 pub.put('control_pubsub', msg={'cmd': 'service_up',
-                               'uid': os.environ['RP_TASK_UID']})
+                               'uid': task_uid})

 # make sure the message goes out
 time.sleep(1)
@@ -43,4 +48,5 @@
 cd $RP_PILOT_SANDBOX

 set -x
 echo "$SCRIPT" | python3
+echo "Done"
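One wrinkle worth flagging across these patches: the handler from patch 1
unpacks the service uid as `msg['arg']['uid']`, while the script publishes it
at the message top level (`{'cmd': ..., 'uid': ...}`).  A `service_up` message
in the layout the handler expects would look as follows (uid value
illustrative); presumably one of the two sides gets reconciled in a later
revision:

    msg = {'cmd': 'service_up',
           'arg': {'uid': 'service.0000'}}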
From 5fdb9b98f073c05da39257ccd40d40e77ee4e508 Mon Sep 17 00:00:00 2001
From: Andre Merzky
Date: Sat, 16 Mar 2024 21:54:29 +0100
Subject: [PATCH 4/5] merge

---
 src/radical/pilot/agent/agent_0.py | 786 ++++++++++------------------
 1 file changed, 274 insertions(+), 512 deletions(-)

diff --git a/src/radical/pilot/agent/agent_0.py b/src/radical/pilot/agent/agent_0.py
index 6ebc2f4cf2..2fb1005d22 100644
--- a/src/radical/pilot/agent/agent_0.py
+++ b/src/radical/pilot/agent/agent_0.py
@@ -4,9 +4,9 @@

 import copy
 import os
-import pprint
 import stat
 import time
+import pprint

 import threading as mt

@@ -17,161 +17,111 @@
 from ..  import constants as rpc
 from ..  import Session
 from ..  import TaskDescription, AGENT_SERVICE
-from ..db import DBSession
-
-from .resource_manager import ResourceManager


 # ------------------------------------------------------------------------------
 #
-class Agent_0(rpu.Worker):
+class Agent_0(rpu.AgentComponent):

     '''
     This is the main agent.  It starts sub-agents and watches them.  If any of
     the sub-agents die, it will shut down the other sub-agents and itself.

-    This class inherits the rpu.Worker, so that it can use its communication
-    bridges and callback mechanisms.  Specifically, it will pull the DB for
-    new tasks to be executed and forwards them to the agent's component
-    network (see `work()`).  It will also watch the DB for any commands to be
-    forwarded (pilot termination, task cancellation, etc.), and will take care
-    of heartbeat messages to be sent to the client module.  To do all this, it
-    initializes a DB connection in `initialize()`.
+    This class inherits the rpu.AgentComponent, so that it can use its
+    communication bridges and callback mechanisms.
     '''

     # --------------------------------------------------------------------------
     #
-    def __init__(self, cfg: ru.Config, session: Session):
+    def __init__(self):
+
+        cfg = ru.Config(path='./agent_0.cfg')

-        self._uid     = 'agent.0'
-        self._cfg     = cfg
+        self._uid     = cfg.uid
         self._pid     = cfg.pid
         self._sid     = cfg.sid
+        self._owner   = cfg.owner
         self._pmgr    = cfg.pmgr
         self._pwd     = cfg.pilot_sandbox

-        self._session = session
-        self._log     = ru.Logger(self._uid, ns='radical.pilot')
-        self._starttime   = time.time()
-        self._final_cause = None
+        self._session = Session(uid=cfg.sid, cfg=cfg, _role=Session._AGENT_0)

-        # this is the earliest point to sync bootstrap and agent profiles
-        self._prof = ru.Profiler(ns='radical.pilot', name=self._uid)
-        self._prof.prof('hostname', uid=cfg.pid, msg=ru.get_hostname())
+        self._rm = self._session.get_rm()

-        # run an inline registry service to share runtime config with other
-        # agents and components
-        reg_uid = 'radical.pilot.reg.%s' % self._uid
-        self._reg_service = ru.zmq.Registry(uid=reg_uid)
-        self._reg_service.start()
-        self._reg_addr = self._reg_service.addr
+        # init the worker / component base classes, connects registry
+        super().__init__(cfg, self._session)

-        # let all components know where to look for the registry
-        self._cfg['reg_addr'] = self._reg_addr
+        self._starttime   = time.time()
+        self._final_cause = None

-        # connect to MongoDB for state push/pull
-        self._connect_db()
+        # keep some state about service startups
+        self._service_uids_launched = list()
+        self._service_uids_running  = list()
+        self._services_setup        = mt.Event()

-        # configure ResourceManager before component startup, as components need
-        # ResourceManager information for function (scheduler, executor)
-        self._configure_rm()
+        # this is the earliest point to sync bootstrap and agent profiles
+        self._prof.prof('hostname', uid=cfg.pid, msg=ru.get_hostname())

         # ensure that app communication channels are visible to workload
         self._configure_app_comm()

-        # expose heartbeat channel to sub-agents, bridges and components,
-        # and start those
-        self._cmgr = rpu.ComponentManager(self._cfg)
-        self._cfg.heartbeat = self._cmgr.cfg.heartbeat
-
-        self._cmgr.start_bridges()
-        self._cmgr.start_components()
+        # start the sub agents
+        self._start_sub_agents()

-        # service tasks uids, which were launched
-        self._service_uids_launched = list()
-        # service tasks uids, which were confirmed to be started
-        self._service_uids_running  = list()
-        # set flag when all services are running
-        self._services_setup = mt.Event()
-
-        # create the sub-agent configs and start the sub agents
-        self._write_sa_configs()
-        self._start_sub_agents()   # TODO: move to cmgr?
-
-        # at this point the session is up and connected, and it should have
-        # brought up all communication bridges and components.  We are
-        # ready to rumble!
-        rpu.Worker.__init__(self, self._cfg, session)
-
-        self.register_subscriber(rpc.CONTROL_PUBSUB, self._check_control)
-        self.register_subscriber(rpc.STATE_PUBSUB,   self._service_state_cb)
-
-        # run our own slow-paced heartbeat monitor to watch pmgr heartbeats
-        # FIXME: we need to get pmgr freq
-        freq = 60
-        tint = freq / 3
-        tout = freq * 10
-        self._hb = ru.Heartbeat(uid=self._uid,
-                                timeout=tout,
-                                interval=tint,
-                                beat_cb=self._hb_check,  # no own heartbeat(pmgr pulls)
-                                term_cb=self._hb_term_cb,
-                                log=self._log)
-        self._hb.start()
-
-        # register pmgr heartbeat
-        self._log.info('hb init for %s', self._pmgr)
-        self._hb.beat(uid=self._pmgr)
+        # regularly check for lifetime limit
+        self.register_timed_cb(self._check_lifetime, timer=10)


     # --------------------------------------------------------------------------
     #
-    def _hb_check(self):
+    def _proxy_input_cb(self, msg):

-        self._log.debug('hb check')
+        self._log.debug_8('proxy input cb: %s', len(msg))

+        to_advance = list()

-    # --------------------------------------------------------------------------
-    #
-    def _hb_term_cb(self, msg=None):
+        for task in msg:

-        self._cmgr.close()
-        self._log.warn('hb termination: %s', msg)
+            # make sure the tasks obtain env settings (if needed)
+            if 'task_environment' in self.session.rcfg:

-        return None
+                if not task['description'].get('environment'):
+                    task['description']['environment'] = dict()
+                for k,v in self.session.rcfg.task_environment.items():
+                    # FIXME: this might overwrite user specified env
+                    task['description']['environment'][k] = v
+
+            # FIXME: raise or fail task!
+            if task['state'] != rps.AGENT_STAGING_INPUT_PENDING:
+                self._log.error('invalid state: %s:%s:%s', task['uid'],
+                                task['state'], task.get('states'))
+                continue
+
+            to_advance.append(task)
+
+        # now we really own the tasks and can start working on them (ie. push
+        # them into the pipeline).  We don't publish nor profile as advance,
+        # since the state transition happened already on the client side when
+        # the state was set.
+        self.advance(to_advance, publish=False, push=True)


-    # --------------------------------------------------------------------
-    #
-    def _connect_db(self):
-
-        # Check for the RADICAL_PILOT_DB_HOSTPORT env var, which will hold
-        # the address of the tunnelized DB endpoint.  If it exists, we
-        # overrule the agent config with it.
-        hostport = os.environ.get('RADICAL_PILOT_DB_HOSTPORT')
-        if hostport:
-            host, port = hostport.split(':', 1)
-            dburl = ru.Url(self._cfg.dburl)
-            dburl.host = host
-            dburl.port = port
-            self._cfg.dburl = str(dburl)
-
-        self._dbs = DBSession(sid=self._cfg.sid, dburl=self._cfg.dburl,
-                              cfg=self._cfg, log=self._log)

     # --------------------------------------------------------------------------
     #
-    def _configure_rm(self):
+    def _proxy_output_cb(self, msg):
+
+        # we just forward the tasks to the task proxy queue
+        self._log.debug('proxy output cb: %s', len(msg))
+        self.advance(msg, publish=False, push=True, qname=self._sid)

-        # Create ResourceManager which will give us the set of agent_nodes to
-        # use for sub-agent startup.  Add the remaining ResourceManager
-        # information to the config, for the benefit of the scheduler.
-        self._rm = ResourceManager.create(name=self._cfg.resource_manager,
-                                          cfg=self._cfg, log=self._log,
-                                          prof=self._prof)

-        self._log.debug(pprint.pformat(self._rm.info))
+    # --------------------------------------------------------------------------
+    #
+    def _client_ctrl_cb(self, topic, msg):
+
+        self._log.debug('ctl sub cb: %s %s', topic, msg)
+        ## FIXME?
     # --------------------------------------------------------------------------
@@ -182,40 +132,56 @@ def _configure_app_comm(self):

         # channels, merge those into the agent config
         #
         # FIXME: this needs to start the app_comm bridges
-        app_comm = self._cfg.get('app_comm')
+        app_comm = self.session.rcfg.get('app_comm')
         if app_comm:
+
+            # bridge addresses also need to be exposed to the workload
+            if 'task_environment' not in self.session.rcfg:
+                self.session.rcfg['task_environment'] = dict()
+
             if isinstance(app_comm, list):
                 app_comm = {ac: {'bulk_size': 0,
                                  'stall_hwm': 1,
                                  'log_level': 'error'} for ac in app_comm}
             for ac in app_comm:
+
-                if ac in self._cfg['bridges']:
+                if ac in self._reg['bridges']:
                     raise ValueError('reserved app_comm name %s' % ac)
-                self._cfg['bridges'][ac] = app_comm[ac]
+                self._reg['bridges.%s' % ac] = app_comm[ac]

-        # some of the bridge addresses also need to be exposed to the workload
-        if app_comm:
-            if 'task_environment' not in self._cfg:
-                self._cfg['task_environment'] = dict()
-            for ac in app_comm:
-                if ac not in self._cfg['bridges']:
-                    raise RuntimeError('missing app_comm %s' % ac)
-                self._cfg['task_environment']['RP_%s_IN' % ac.upper()] = \
-                        self._cfg['bridges'][ac]['addr_in']
-                self._cfg['task_environment']['RP_%s_OUT' % ac.upper()] = \
-                        self._cfg['bridges'][ac]['addr_out']
+                AC = ac.upper()
+
+                self.session.rcfg.task_environment['RP_%s_IN'  % AC] = ac['addr_in']
+                self.session.rcfg.task_environment['RP_%s_OUT' % AC] = ac['addr_out']


     # --------------------------------------------------------------------------
     #
     def initialize(self):

-        # registers the staging_input_queue as this is what we want to push
-        # tasks to
+        # listen for new tasks from the client
+        self.register_input(rps.AGENT_STAGING_INPUT_PENDING,
+                            rpc.PROXY_TASK_QUEUE,
+                            qname=self._pid,
+                            cb=self._proxy_input_cb)
+
+        # and forward to agent input staging
         self.register_output(rps.AGENT_STAGING_INPUT_PENDING,
                              rpc.AGENT_STAGING_INPUT_QUEUE)

+        # listen for completed tasks to forward to client
+        self.register_input(rps.TMGR_STAGING_OUTPUT_PENDING,
+                            rpc.AGENT_COLLECTING_QUEUE,
+                            cb=self._proxy_output_cb)
+
+        # and register output
+        self.register_output(rps.TMGR_STAGING_OUTPUT_PENDING,
+                             rpc.PROXY_TASK_QUEUE)
+
+        self.register_rpc_handler('prepare_env', self._prepare_env,
+                                  rpc_addr=self._pid)
+
         # before we run any tasks, prepare a named_env `rp` for tasks which use
         # the pilot's own environment, such as raptors
         env_spec = {'type'    : os.environ['RP_VENV_TYPE'],
@@ -225,29 +191,28 @@ def initialize(self):
                                  'export PATH=%s' % os.environ.get('PATH', '')]
                     }
-        self._prepare_env('rp', env_spec)
-
-        # register the command callback which pulls the DB for commands
-        self.register_timed_cb(self._agent_control_cb,
-                               timer=self._cfg['db_poll_sleeptime'])
-
-        # register idle callback to pull for tasks
-        self.register_timed_cb(self._check_tasks_cb,
-                               timer=self._cfg['db_poll_sleeptime'])
+        self.rpc('prepare_env', env_name='rp', env_spec=env_spec,
+                 rpc_addr=self._pid)

+        # start any services if they are requested
         self._start_services()

         # sub-agents are started, components are started, bridges are up: we are
-        # ready to roll!  Update pilot state.
-        pilot = {'type'             : 'pilot',
-                 'uid'              : self._pid,
-                 'state'            : rps.PMGR_ACTIVE,
-                 'resource_details' : {
-                     # 'lm_info'    : self._rm.lm_info.get('version_info'),
-                     # 'lm_detail'  : self._rm.lm_info.get('lm_detail'),
-                     'rm_info'      : self._rm.info},
-                 '$set'             : ['resource_details']}
-        self.advance(pilot, publish=True, push=False)
+        # ready to roll!  Send state update
+        rm_info = self._rm.info
+        n_nodes = len(rm_info['node_list'])
+
+        self._log.debug('advance to PMGR_ACTIVE')
+
+        pilot = {'$all'     : True,   # pass full info to client side
+                 'type'     : 'pilot',
+                 'uid'      : self._pid,
+                 'state'    : rps.PMGR_ACTIVE,
+                 'resources': {'rm_info': rm_info,
+                               'cpu'    : rm_info['cores_per_node'] * n_nodes,
+                               'gpu'    : rm_info['gpus_per_node']  * n_nodes}}
+
+        self.advance(pilot, publish=True, push=False, fwd=True)


     # --------------------------------------------------------------------------
@@ -283,14 +248,16 @@ def finalize(self):
         self._log.debug('stage output parent')
         self.stage_output()

-        # tear things down in reverse order
-        self._hb.stop()
-        self._cmgr.close()
+        self._log.info('rusage: %s', rpu.get_rusage())

-        if self._rm:
-            self._rm.stop()
+        out, err, log = '', '', ''

-        self._reg_service.stop()
+        try   : out = open('./agent_0.out', 'r').read(1024)
+        except: pass
+        try   : err = open('./agent_0.err', 'r').read(1024)
+        except: pass
+        try   : log = open('./agent_0.log', 'r').read(1024)
+        except: pass

         if   self._final_cause == 'timeout'  : state = rps.DONE
         elif self._final_cause == 'cancel'   : state = rps.CANCELED
@@ -302,140 +269,136 @@ def finalize(self):
         with ru.ru_open('./killme.signal', 'w') as fout:
             fout.write('%s\n' % state)

-        # we don't rely on the existence / viability of the update worker at
-        # that point.
-        self._log.debug('update db state: %s: %s', state, self._final_cause)
-        self._log.info('rusage: %s', rpu.get_rusage())
-
-        out, err, log = '', '', ''
+        pilot = {'type'   : 'pilot',
+                 'uid'    : self._pid,
+                 'stdout' : out,
+                 'stderr' : err,
+                 'logfile': log,
+                 'state'  : state}

-        try   : out = ru.ru_open('./agent.0.out', 'r').read(1024)
-        except: pass
-        try   : err = ru.ru_open('./agent.0.err', 'r').read(1024)
-        except: pass
-        try   : log = ru.ru_open('./agent.0.log', 'r').read(1024)
-        except: pass
-
-        ret = self._dbs._c.update({'type'  : 'pilot',
-                                   'uid'   : self._pid},
-                                  {'$set'  : {'stdout' : rpu.tail(out),
-                                              'stderr' : rpu.tail(err),
-                                              'logfile': rpu.tail(log),
-                                              'state'  : state},
-                                   '$push' : {'states' : state}
-                                  })
-        self._log.debug('update ret: %s', ret)
-
-
-    # --------------------------------------------------------------------
-    #
-    def _write_sa_configs(self):
-
-        # we have all information needed by the subagents -- write the
-        # sub-agent config files.
-
-        # write deep-copies of the config for each sub-agent (sans from agent.0)
-        for sa in self._cfg.get('agents', {}):
-
-            assert (sa != 'agent.0'), 'expect subagent, not agent.0'
-
-            # use our own config sans agents/components/bridges as a basis for
-            # the sub-agent config.
-            tmp_cfg = copy.deepcopy(self._cfg)
-            tmp_cfg['agents']     = dict()
-            tmp_cfg['components'] = dict()
-            tmp_cfg['bridges']    = dict()
-
-            # merge sub_agent layout into the config
-            ru.dict_merge(tmp_cfg, self._cfg['agents'][sa], ru.OVERWRITE)
-
-            tmp_cfg['uid']   = sa
-            tmp_cfg['aid']   = sa
-            tmp_cfg['owner'] = 'agent.0'
+        self._log.debug('push final state update')
+        self._log.debug('update state: %s: %s', state, self._final_cause)
+        self.advance(pilot, publish=True, push=False)

-            ru.write_json(tmp_cfg, './%s.cfg' % sa)
+        # tear things down in reverse order
+        self._rm.stop()
+        self._session.close()


     # --------------------------------------------------------------------------
     #
     def _start_services(self):

-        service_descriptions = self._cfg.services
-        if not service_descriptions:
+        if not self.session.cfg.services:
             return
+
         self._log.info('starting agent services')

-        services = list()
-        for service_desc in service_descriptions:
+        services      = []
+        services_data = {}
+
+        for sd in self.session.cfg.services:

-            td      = TaskDescription(service_desc)
+            td      = TaskDescription(sd)
             td.mode = AGENT_SERVICE

             # ensure that the description is viable
             td.verify()

-            cfg = self._cfg
             tid = ru.generate_id('service.%(item_counter)04d',
-                                 ru.ID_CUSTOM, ns=self._cfg.sid)
-
+                                 ru.ID_CUSTOM, ns=self.session.uid)
             task = dict()
+            task['uid']               = tid
+            task['type']              = 'service_task'
             task['origin']            = 'agent'
+            task['pilot']             = self.session.cfg.pid
             task['description']       = td.as_dict()
             task['state']             = rps.AGENT_STAGING_INPUT_PENDING
-            task['status']            = 'NEW'
-            task['type']              = 'service_task'
-            task['uid']               = tid
-            task['pilot_sandbox']     = cfg.pilot_sandbox
-            task['task_sandbox']      = cfg.pilot_sandbox + task['uid'] + '/'
-            task['task_sandbox_path'] = cfg.pilot_sandbox + task['uid'] + '/'
-            task['session_sandbox']   = cfg.session_sandbox
-            task['resource_sandbox']  = cfg.resource_sandbox
-            task['pilot']             = cfg.pid
+            task['pilot_sandbox']     = self.session.cfg.pilot_sandbox
+            task['session_sandbox']   = self.session.cfg.session_sandbox
+            task['resource_sandbox']  = self.session.cfg.resource_sandbox
             task['resources']         = {'cpu': td.ranks * td.cores_per_rank,
                                          'gpu': td.ranks * td.gpus_per_rank}

-            self._service_uids_launched.append(tid)
-            services.append(task)
-
-        self.advance(services, publish=False, push=True)
-
-        # Waiting 2mins for all services to launch
-        if not self._services_setup.wait(timeout=60 * 2):
-            raise RuntimeError('Unable to start services')
-
-        self._log.info('all agent services started')
+            task_sandbox = self.session.cfg.pilot_sandbox + tid + '/'
+            task['task_sandbox']      = task_sandbox
+            task['task_sandbox_path'] = task_sandbox

+            # TODO: use `type='service_task'` in RADICAL-Analytics

-    # --------------------------------------------------------------------------
-    #
-    def _service_state_cb(self, topic, msg):  # pylint: disable=unused-argument
+            # TaskDescription.metadata will contain service related data:
+            # "name" (unique), "startup_file"

-        cmd   = msg['cmd']
-        tasks = msg['arg']
+            self._service_uids_launched.append(tid)
+            services.append(task)

-        if cmd != 'update':
-            return
+            services_data[tid] = {}
+            if td.metadata.get('startup_file'):
+                n = td.metadata.get('name')
+                services_data[tid]['name'] = 'service.%s' % n if n else tid
+                services_data[tid]['startup_file'] = td.metadata['startup_file']

-        for service in ru.as_list(tasks):
+        self.advance(services, publish=False, push=True)

-            if service['uid'] not in self._service_uids_launched or \
-               service['uid'] in self._service_uids_running:
-                continue
+        self.register_timed_cb(cb=self._services_startup_cb,
+                               cb_data=services_data,
+                               timer=2)

-            self._log.debug('service state update %s: %s',
-                            service['uid'], service['state'])
+        # waiting for all services to start (max waiting time 2 mins)
+        if not self._services_setup.wait(timeout=120):
+            raise RuntimeError('Unable to start services')

-            if service['state'] != rps.AGENT_EXECUTING:
-                continue
+        self.unregister_timed_cb(self._services_startup_cb)

-            self._service_uids_running.append(service['uid'])
-            self._log.debug('service %s started (%s / %s)', service['uid'],
-                            len(self._service_uids_running),
-                            len(self._service_uids_launched))
+        self._log.info('all agent services started')

-            if len(self._service_uids_launched) == \
-               len(self._service_uids_running):
-                self._services_setup.set()

+    def _services_startup_cb(self, cb_data):
+
+        for tid in list(cb_data):
+
+            service_up   = False
+            startup_file = cb_data[tid].get('startup_file')
+
+            if not startup_file:
+                service_up = True
+                # FIXME: at this point we assume that since "startup_file" is
+                #        not provided, then we don't wait - this will be
+                #        replaced with another callback (BaseComponent.advance
+                #        will publish control command "service_up" for service
+                #        tasks)
+
+            elif os.path.isfile(startup_file):
+                # if file exists then service is up (general approach)
+                service_up = True
+
+                # collect data from the startup file: at this point we look
+                # for URLs only
+                service_urls = {}
+                with ru.ru_open(startup_file, 'r') as fin:
+                    for line in fin.readlines():
+                        if '://' not in line:
+                            continue
+                        parts = line.split()
+                        if len(parts) == 1:
+                            idx, url = '', parts[0]
+                        elif '://' in parts[1]:
+                            idx, url = parts[0], parts[1]
+                        else:
+                            continue
+                        service_urls[idx] = url
+
+                if service_urls:
+                    for idx, url in service_urls.items():
+                        key = cb_data[tid]['name']
+                        if idx:
+                            key += '.%s' % idx
+                        key += '.url'
+                        self.session._reg[key] = url
+
+            if service_up:
+                self.publish(rpc.CONTROL_PUBSUB, {'cmd': 'service_up',
+                                                  'arg': {'uid': tid}})
+                del cb_data[tid]
+
+        return True


     # --------------------------------------------------------------------------
     #
@@ -448,25 +411,31 @@ def _start_sub_agents(self):

         # FIXME: reroute to agent daemonizer

-        if not self._cfg.get('agents'):
+        if not self.session.cfg.get('agents'):
             return

-        assert (len(self._rm.info.agent_node_list) >= len(self._cfg['agents']))
+        n_agents      = len(self.session.cfg['agents'])
+        n_agent_nodes = len(self._rm.info.agent_node_list)
+
+        assert n_agent_nodes >= n_agents
+
         self._log.debug('start_sub_agents')

         # store the current environment as the sub-agents will use the same
+        # (it will be called within "bootstrap_2.sh")
         ru.env_prep(os.environ, script_path='./env/agent.env')

         # the configs are written, and the sub-agents can be started.  To know
         # how to do that we create the agent launch method, have it creating
-        # the respective command lines per agent instance, and run via
-        # popen.
-        #
+        # the respective command lines per agent instance, and run via popen.
+
+        bs_name = '%s/bootstrap_2.sh'

-        for idx, sa in enumerate(self._cfg['agents']):
+        for idx, sa in enumerate(self.session.cfg['agents']):

-            target = self._cfg['agents'][sa]['target']
+            target  = self.session.cfg['agents'][sa]['target']
+            bs_args = [self._sid, self.session.cfg.reg_addr, sa]

             if target not in ['local', 'node']:

@@ -475,8 +444,8 @@ def _start_sub_agents(self):
             if target == 'local':

                 # start agent locally
-                cmdline = '/bin/sh -l %s/bootstrap_2.sh %s' % (self._pwd, sa)
-
+                bs_path = bs_name % self._pwd
+                cmdline = '/bin/sh -l %s' % ' '.join([bs_path] + bs_args)

             else:  # target == 'node':

@@ -491,10 +460,13 @@ def _start_sub_agents(self):
                 #        out for the moment, which will make this unable to
                 #        work with a number of launch methods.  Can the
                 #        offset computation be moved to the ResourceManager?
-                bs_name       = '%s/bootstrap_2.sh' % (self._pwd)
+
                 launch_script = '%s/%s.launch.sh' % (self._pwd, sa)
                 exec_script   = '%s/%s.exec.sh'   % (self._pwd, sa)

+                node_cores = [cid for cid, cstate in enumerate(node['cores'])
+                              if cstate == rpc.FREE]
+
                 agent_task = {
                     'uid'               : sa,
                     'task_sandbox_path' : self._pwd,
@@ -503,11 +475,11 @@ def _start_sub_agents(self):
                         'ranks'         : 1,
                         'cores_per_rank': self._rm.info.cores_per_node,
                         'executable'    : '/bin/sh',
-                        'arguments'     : [bs_name, sa]
+                        'arguments'     : [bs_name % self._pwd] + bs_args
                     }).as_dict(),
                     'slots': {'ranks'   : [{'node_name': node['node_name'],
                                             'node_id'  : node['node_id'],
-                                            'core_map' : [[0]],
+                                            'core_map' : [node_cores],
                                             'gpu_map'  : [],
                                             'lfs'      : 0,
                                             'mem'      : 0}]}
@@ -528,15 +500,11 @@ def _start_sub_agents(self):
                 cmds = launcher.get_launch_cmds(agent_task, exec_script)
                 tmp += '%s\nexit $?\n\n' % cmds

-
                 with ru.ru_open(launch_script, 'w') as fout:
                     fout.write(tmp)

-
                 tmp  = '#!/bin/sh\n\n'
-                tmp += '. ./env/agent.env\n'
-                tmp += '/bin/sh -l ./bootstrap_2.sh %s\n\n' % sa
-
+                tmp += '/bin/sh -l %s\n\n' % ' '.join([bs_name % '.'] + bs_args)
                 with ru.ru_open(exec_script, 'w') as fout:
                     fout.write(tmp)

@@ -549,212 +517,83 @@ def _start_sub_agents(self):

             # spawn the sub-agent
             cmdline = launch_script

-            self._log.info ('create sub-agent %s: %s', sa, cmdline)
+            self._log.info('create sub-agent %s: %s', sa, cmdline)

             ru.sh_callout_bg(cmdline, stdout='%s.out' % sa,
-                                      stderr='%s.err' % sa)
-
-            # FIXME: register heartbeats?
+                                      stderr='%s.err' % sa,
+                                      cwd=self._pwd)

         self._log.debug('start_sub_agents done')


     # --------------------------------------------------------------------------
     #
-    def _agent_control_cb(self):
-
-        if not self._check_commands(): return False
-        if not self._check_rpc     (): return False
-        if not self._check_state   (): return False
-
-        return True
-
-
-    # --------------------------------------------------------------------------
-    #
-    def _check_commands(self):
-
-        # Check if there's a command waiting
-        # FIXME: this pull should be done by the update worker, and commands
-        #        should then be communicated over the command pubsub
-        # FIXME: commands go to pmgr, tmgr, session docs
-        # FIXME: check if pull/wipe are atomic
-        # FIXME: long runnign commands can time out on hb
-        retdoc = self._dbs._c.find_and_modify(
-                    query ={'uid' : self._pid},
-                    fields=['cmds'],                    # get new commands
-                    update={'$set': {'cmds': list()}})  # wipe old commands
-
-        if not retdoc:
-            return True
+    def _check_lifetime(self):

-        for spec in retdoc.get('cmds', []):
-
-            cmd = spec['cmd']
-            arg = spec['arg']
-
-            self._log.debug('pilot command: %s: %s', cmd, arg)
-            self._prof.prof('cmd', msg="%s : %s" % (cmd, arg), uid=self._pid)
+        # Make sure that we haven't exceeded the runtime - otherwise terminate.
+        if self.session.cfg.runtime:

-            if cmd == 'heartbeat' and arg['pmgr'] == self._pmgr:
-                self._hb.beat(uid=self._pmgr)
+            if time.time() >= self._starttime + \
+                              (int(self.session.cfg.runtime) * 60):

-            elif cmd == 'cancel_pilot':
-                self._log.info('cancel_pilot cmd')
-                self.publish(rpc.CONTROL_PUBSUB, {'cmd' : 'terminate',
-                                                  'arg' : None})
-                self._final_cause = 'cancel'
+                self._log.info('runtime limit (%ss).',
+                               self.session.cfg.runtime * 60)
+                self._final_cause = 'timeout'
+                self.stop()
                 return False  # we are done

-            elif cmd == 'cancel_tasks':
-                self._log.info('cancel_tasks cmd')
-                self.publish(rpc.CONTROL_PUBSUB, {'cmd' : 'cancel_tasks',
-                                                  'arg' : arg})
-            else:
-                self._log.warn('could not interpret cmd "%s" - ignore', cmd)
-
         return True


     # --------------------------------------------------------------------------
     #
-    def _check_rpc(self):
+    def control_cb(self, topic, msg):
         '''
-        check if the DB has any RPC request for this pilot.  If so, then forward
-        that request as `rpc_req` command on the CONTROL channel, and listen for
-        an `rpc_res` command on the same channel, for the same rpc id.  Once
-        that response is received (from whatever component handled that
-        command), send the response back to the databse for the callee to pick
-        up.
+        Check for commands on the control pubsub, mainly waiting for RPC
+        requests to handle.
         '''

-        # FIXME: implement a timeout, and/or a registry of rpc clients
+        self._log.debug_1('control msg %s: %s', topic, msg)

-        retdoc = self._dbs._c.find_and_modify(
-                    query ={'uid' : self._pid},
-                    fields=['rpc_req'],
-                    update={'$set': {'rpc_req': None}})
+        cmd = msg['cmd']
+        arg = msg.get('arg')

-        if not retdoc:
-            # no rpc request found
-            return True
+        self._log.debug('pilot command: %s: %s', cmd, arg)
+        self._prof.prof('cmd', msg="%s : %s" % (cmd, arg), uid=self._pid)

-        rpc_req = retdoc.get('rpc_req')
-        if rpc_req is None:
-            # document has no rpc request
+        if cmd == 'pmgr_heartbeat' and arg['pmgr'] == self._pmgr:
+            self._session._hb.beat(uid=self._pmgr)
             return True

-        self._log.debug('rpc req: %s', rpc_req)
-
-        # RPCs are synchronous right now - we send the RPC on the command
-        # channel, hope that some component picks it up and replies, and then
-        # return that reply.  The reply is received via a temporary callback
-        # defined here, which will receive all CONTROL messages until the right
-        # rpc response comes along.
-        def rpc_cb(topic, msg):
-
-            rpc_id  = rpc_req['uid']
-
-            cmd     = msg['cmd']
-            rpc_res = msg['arg']
-
-            if cmd != 'rpc_res':
-                # not an rpc responese, keep cb registered
-                return True
-
-            if rpc_res['uid'] != rpc_id:
-                # not the right rpc response, keep cb registered
-                return True
-
-            # send the response to the DB
-            self._dbs._c.update({'type' : 'pilot',
-                                 'uid'  : self._pid},
-                                {'$set' : {'rpc_res': rpc_res}})
-
-            # work is done - unregister this temporary cb
-            return False
-
-
-        self.register_subscriber(rpc.CONTROL_PUBSUB, rpc_cb)
-
-        # ready to receive and proxy rpc response -- forward rpc request on
-        # control channel
-        self.publish(rpc.CONTROL_PUBSUB, {'cmd' : 'rpc_req',
-                                          'arg' : rpc_req})
-
-        return True  # keeb cb registered (self._check_rpc)
-
-
-    # --------------------------------------------------------------------------
-    #
-    def _check_control(self, _, msg):
-        '''
-        Check for commands on the control pubsub, mainly waiting for RPC
-        requests to handle.  We handle two types of RPC requests: `hello` for
-        testing, and `prepare_env` for environment preparation requests.
-        '''
-
-        cmd = msg['cmd']
-
-        if cmd == 'rpc_req':
-            return self._ctrl_rpc(msg)
+        elif cmd == 'cancel_pilots':
+            return self._ctrl_cancel_pilots(msg)

         elif cmd == 'service_up':
-            self._log.debug('=== got service_up: %s', msg)
             return self._ctrl_service_up(msg)

-        # ignore all other message types
-        return True
-

     # --------------------------------------------------------------------------
     #
-    def _ctrl_rpc(self, msg):
+    def _ctrl_cancel_pilots(self, msg):

-        cmd = msg['cmd']
         arg = msg['arg']

-        req = arg['rpc']
-        if req not in ['hello', 'prepare_env']:
-            # we don't handle that request
-            return True
+        if self._pid not in arg.get('uids'):
+            self._log.debug('ignore cancel %s', msg)

-        rpc_res = {'uid': arg['uid']}
-        try:
-            if req == 'hello'   :
-                out = 'hello %s' % ' '.join(arg['arg'])
-
-            elif req == 'prepare_env':
-                env_name = arg['arg']['env_name']
-                env_spec = arg['arg']['env_spec']
-                out      = self._prepare_env(env_name, env_spec)
-
-            else:
-                # unknown command
-                return True
-
-            # request succeeded - respond with return value
-            rpc_res['err'] = None
-            rpc_res['out'] = out
-            rpc_res['ret'] = 0
-
-        except Exception as e:
-            # request failed for some reason - indicate error
-            rpc_res['err'] = repr(e)
-            rpc_res['out'] = None
-            rpc_res['ret'] = 1
-            self._log.exception('control cmd failed')
-
-        # publish the response (success or failure)
-        self.publish(rpc.CONTROL_PUBSUB, {'cmd': 'rpc_res',
-                                          'arg': rpc_res})
-        return True
+        self._log.info('cancel pilot cmd')
+        self.publish(rpc.CONTROL_PUBSUB, {'cmd' : 'terminate',
+                                          'arg' : None})
+        self._final_cause = 'cancel'
+        self.stop()
+
+        # work is done - unregister this cb
+        return False


     # --------------------------------------------------------------------------
     #
     def _ctrl_service_up(self, msg):

-        cmd = msg['cmd']
         uid = msg['arg']['uid']

         # This message signals that an agent service instance is up and running.
@@ -763,17 +602,15 @@ def _ctrl_service_up(self, msg):

         if uid not in self._service_uids_launched:
             # we do not know this service instance
-            self._log.warn('=== ignore service startup signal for %s', uid)
+            self._log.warn('ignore service startup signal for %s', uid)
             return True

         if uid in self._service_uids_running:
-            self._log.warn('=== duplicated service startup signal for %s', uid)
+            self._log.warn('duplicated service startup signal for %s', uid)
            return True

-        self._log.debug('=== service startup message for %s', uid)
-
         self._service_uids_running.append(uid)
-        self._log.debug('=== service %s started (%s / %s)', uid,
+        self._log.debug('service %s started (%s / %s)', uid,
                         len(self._service_uids_running),
                         len(self._service_uids_launched))

@@ -785,86 +622,11 @@ def _ctrl_service_up(self, msg):

         return True


-    # --------------------------------------------------------------------------
-    #
-    def _check_state(self):
-
-        # Make sure that we haven't exceeded the runtime - otherwise terminate.
-        if self._cfg.runtime:
-
-            if time.time() >= self._starttime + (int(self._cfg.runtime) * 60):
-
-                self._log.info('runtime limit (%ss).', self._cfg.runtime * 60)
-                self._final_cause = 'timeout'
-                self.stop()
-                return False  # we are done
-
-        return True
-
-
-    # --------------------------------------------------------------------------
-    #
-    def _check_tasks_cb(self):
-
-        # Check for tasks waiting for input staging and log pull.
-        #
-        # FIXME: Unfortunately, 'find_and_modify' is not bulkable, so we have
-        #        to use 'find'.  To avoid finding the same tasks over and over
-        #        again, we update the 'control' field *before* running the next
-        #        find -- so we do it right here.
-        #        This also blocks us from using multiple ingest threads, or from
-        #        doing late binding by task pull :/
-        task_cursor = self._dbs._c.find({'type'    : 'task',
-                                         'pilot'   : self._pid,
-                                         'control' : 'agent_pending'})
-        if not task_cursor.count():
-            self._log.info('tasks pulled: 0')
-            return True
-
-        # update the tasks to avoid pulling them again next time.
-        task_list = list(task_cursor)
-        task_uids = [task['uid'] for task in task_list]
-
-        self._dbs._c.update({'type'  : 'task',
-                             'uid'   : {'$in'     : task_uids}},
-                            {'$set'  : {'control' : 'agent'}},
-                            multi=True)
-
-        self._log.info("tasks pulled: %4d", len(task_list))
-        self._prof.prof('get', msg='bulk: %d' % len(task_list), uid=self._pid)
-
-        for task in task_list:
-
-            # make sure the tasks obtain env settings (if needed)
-            if 'task_environment' in self._cfg:
-                if not task['description'].get('environment'):
-                    task['description']['environment'] = dict()
-                for k,v in self._cfg['task_environment'].items():
-                    task['description']['environment'][k] = v
-
-            # we need to make sure to have the correct state:
-            task['state'] = rps._task_state_collapse(task['states'])
-            self._prof.prof('get', uid=task['uid'])
-
-            # FIXME: raise or fail task!
-            if task['state'] != rps.AGENT_STAGING_INPUT_PENDING:
-                self._log.error('invalid state: %s', (pprint.pformat(task)))
-
-            task['control'] = 'agent'
-
-        # now we really own the CUs, and can start working on them (ie. push
-        # them into the pipeline).  We don't publish nor profile as advance,
-        # since that happened already on the module side when the state was set.
-        self.advance(task_list, publish=False, push=True)
-
-        return True
-
-
     # --------------------------------------------------------------------------
     #
     def _prepare_env(self, env_name, env_spec):

-        self._log.debug('env_spec: %s', env_spec)
+        self._log.debug('env_spec %s: %s', env_name, env_spec)

         etype = env_spec.get('type', 'venv')
         evers = env_spec.get('version')
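The `_services_startup_cb` added above also establishes a small file-based
protocol: a service may announce its endpoints by writing URL lines -- either
bare or prefixed with an index -- into its startup file, and the agent
publishes them into the session registry.  A standalone sketch of that parsing
convention (function name and file content illustrative, line format as in the
callback above):

    def parse_startup_file(path):
        '''return {index: url} for every URL line in a service startup file'''
        urls = dict()
        with open(path) as fin:
            for line in fin.readlines():
                if '://' not in line:
                    continue                       # not a URL line
                parts = line.split()
                if len(parts) == 1:
                    idx, url = '', parts[0]        # bare URL, empty index
                elif '://' in parts[1]:
                    idx, url = parts[0], parts[1]  # '<idx> <url>' pair
                else:
                    continue
                urls[idx] = url
        return urls

    # a startup file containing
    #
    #     http://node01:8000
    #     ctrl http://node01:8001
    #
    # yields {'': 'http://node01:8000', 'ctrl': 'http://node01:8001'}; for a
    # service named 'db' the agent stores these under the registry keys
    # 'service.db.url' and 'service.db.ctrl.url'.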
From 7653afacf2f8470aad7bc69041559a3fbcf19535 Mon Sep 17 00:00:00 2001
From: Andre Merzky
Date: Sat, 16 Mar 2024 21:57:05 +0100
Subject: [PATCH 5/5] fix var name

---
 bin/radical-pilot-service-signal | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/bin/radical-pilot-service-signal b/bin/radical-pilot-service-signal
index a8aed56ee6..6a200d591f 100755
--- a/bin/radical-pilot-service-signal
+++ b/bin/radical-pilot-service-signal
@@ -25,7 +25,7 @@
 import os

 import radical.utils as ru

-reg_addr = os.environ['RP_REGISTRY_URL']
+reg_addr = os.environ['RP_REGISTRY_ADDRESS']
 reg = ru.zmq.RegistryClient(url=reg_addr)

 ctr_addr = reg['bridges.control_pubsub.addr_pub']
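Taken together, the series lets a pilot description declare services whose
readiness is detected either via `radical-pilot-service-signal` or via a
startup file.  A sketch of the startup-file variant under the metadata
convention from patch 4 -- `my_db`, the port, and the file path are
illustrative, and a real setup would likely want an absolute path for the
startup file:

    import radical.pilot as rp

    sd = rp.TaskDescription()
    sd.executable = '/bin/sh'
    sd.arguments  = ['-c', 'echo "http://$(hostname):8000" > db.up && my_db']
    sd.metadata   = {'name'        : 'db',     # registry keys under 'service.db'
                     'startup_file': 'db.up'}  # polled by the agent (timer=2)

    pd_init = {'resource': 'local.localhost',
               'runtime' : 30,
               'cores'   : 4,
               'services': [sd]}
    pdesc = rp.PilotDescription(pd_init)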