From 10d8982622f6baea043f2133d56a0593ad2943a3 Mon Sep 17 00:00:00 2001 From: Andre Merzky Date: Tue, 16 Jun 2020 21:56:04 +0200 Subject: [PATCH 1/2] api draft --- src/radical/entk/api.v2/__init__.py | 8 +++ src/radical/entk/api.v2/app_manager.py | 75 ++++++++++++++++++++++++++ src/radical/entk/api.v2/pipeline.py | 37 +++++++++++++ src/radical/entk/api.v2/stage.py | 37 +++++++++++++ src/radical/entk/api.v2/task.py | 36 +++++++++++++ src/radical/entk/api.v2/workflow.py | 35 ++++++++++++ 6 files changed, 228 insertions(+) create mode 100644 src/radical/entk/api.v2/__init__.py create mode 100644 src/radical/entk/api.v2/app_manager.py create mode 100644 src/radical/entk/api.v2/pipeline.py create mode 100644 src/radical/entk/api.v2/stage.py create mode 100644 src/radical/entk/api.v2/task.py create mode 100644 src/radical/entk/api.v2/workflow.py diff --git a/src/radical/entk/api.v2/__init__.py b/src/radical/entk/api.v2/__init__.py new file mode 100644 index 000000000..154c84389 --- /dev/null +++ b/src/radical/entk/api.v2/__init__.py @@ -0,0 +1,8 @@ + +from .app_manager import AppManager +from .workflow import Workflow +from .pipeline import Pipeline +from .stage import Stage +from .task import Task + + diff --git a/src/radical/entk/api.v2/app_manager.py b/src/radical/entk/api.v2/app_manager.py new file mode 100644 index 000000000..ebc5237b0 --- /dev/null +++ b/src/radical/entk/api.v2/app_manager.py @@ -0,0 +1,75 @@ + + +# ------------------------------------------------------------------------------ +# +class AppManager(object): + + # -------------------------------------------------------------------------- + # + def __init__(self, rmq_url=None, uid=None, rtype=None): + + self._rtype = rtype or 'radical.pilot' + self._backend = rtype.create_backend # ... + self._workflows = dict() # uid - Workflow + self._cb = None + + + # -------------------------------------------------------------------------- + # + @property + def uid(self): + + return self._uid + + + # -------------------------------------------------------------------------- + # + def acquire_resource(self, descr): + + self._backend.acquire_resource(descr) + + + def list_resources(self): + + return self._backend.list_resources() + + + def release_resource(self, rid): + + return self._backend.release_resource(rid) + + + # -------------------------------------------------------------------------- + # + def submit(self, workflow): + + self._workflows[workflow.uid] = workflow + + + def list_workflows(self): + + return self._workflows.keys() + + + def get_workflow(self, uid): + + return self._workflows[uid] + + + def cancel(self, workflow_id): + + pass + + + def add_callback(self, cb): + self._cb = cb + + # goes to workflow + # def shared_data(self): + # def outputs(self): + + + +# ------------------------------------------------------------------------------ +# pylint: disable=protected-access + diff --git a/src/radical/entk/api.v2/pipeline.py b/src/radical/entk/api.v2/pipeline.py new file mode 100644 index 000000000..09db54051 --- /dev/null +++ b/src/radical/entk/api.v2/pipeline.py @@ -0,0 +1,37 @@ + +from .stage import Stage + + +# ------------------------------------------------------------------------------ +# +class Pipeline(object): + + def __init__(self, descr): + + self._stages = [Stage(sd) for sd in descr] + self._state = 'NEW' + self._cb = None + + self._workflow = None # workflow uid + + + @property + def state(self): + return self._state + + def cancel(self): + pass + + def add_callback(self, cb): + self._cb = cb + + def _advance(self, state): + + self._state = state + if self._cb: + self._cb() + + +# ------------------------------------------------------------------------------ + + diff --git a/src/radical/entk/api.v2/stage.py b/src/radical/entk/api.v2/stage.py new file mode 100644 index 000000000..cad34ffec --- /dev/null +++ b/src/radical/entk/api.v2/stage.py @@ -0,0 +1,37 @@ + +from .task import Task + + +# ------------------------------------------------------------------------------ +# +class Stage(object): + + def __init__(self, descr): + + self._tasks = [Task(td) for td in descr] + self._state = 'NEW' + self._cb = None + + self._pipeline = None # pipeline uid + self._workflow = None # workflow uid + + + @property + def state(self): + return self._state + + def cancel(self): + pass + + def add_callback(self, cb): + self._cb = cb + + def _advance(self, state): + + self._state = state + if self._cb: + self._cb() + + +# ------------------------------------------------------------------------------ + diff --git a/src/radical/entk/api.v2/task.py b/src/radical/entk/api.v2/task.py new file mode 100644 index 000000000..299798c94 --- /dev/null +++ b/src/radical/entk/api.v2/task.py @@ -0,0 +1,36 @@ + + +# ------------------------------------------------------------------------------ +# +class Task(object): + + def __init__(self, descr): + + self._state = 'NEW' + self._descr = descr + self._cb = None + + self._stage = None # stage uid + self._pipeline = None # pipeline uid + self._workflow = None # workflow uid + + + @property + def state(self): + return self._state + + def cancel(self): + pass + + def add_callback(self, cb): + self._cb = cb + + def _advance(self, state): + + self._state = state + if self._cb: + self._cb() + + +# ------------------------------------------------------------------------------ + diff --git a/src/radical/entk/api.v2/workflow.py b/src/radical/entk/api.v2/workflow.py new file mode 100644 index 000000000..0d2cce154 --- /dev/null +++ b/src/radical/entk/api.v2/workflow.py @@ -0,0 +1,35 @@ + + +from .pipeline import Pipeline + + +# ------------------------------------------------------------------------------ +# +class Workflow(object): + + def __init__(self, descr): + + self._pipelines = [Pipeline(pd) for pd in descr] + self._state = 'NEW' + self._cb = None + + + @property + def state(self): + return self._state + + def cancel(self): + pass + + def add_callback(self, cb): + self._cb = cb + + def _advance(self, state): + + self._state = state + if self._cb: + self._cb() + + +# ------------------------------------------------------------------------------ + From 28f6d2996d55bbfab97015dabcd27f962135b823 Mon Sep 17 00:00:00 2001 From: Andre Merzky Date: Wed, 10 Feb 2021 11:53:24 +0100 Subject: [PATCH 2/2] api iteration --- src/radical/__init__.py | 2 - src/radical/entk/api.v2/pipeline.py | 42 +++++++++++++----- src/radical/entk/api.v2/stage.py | 44 ++++++++++++++----- src/radical/entk/api.v2/task.py | 26 +++++------ src/radical/entk/api.v2/workflow.py | 39 ++++++++++++---- .../{app_manager.py => workflow_manager.py} | 40 ++++++++++------- 6 files changed, 131 insertions(+), 62 deletions(-) delete mode 100644 src/radical/__init__.py rename src/radical/entk/api.v2/{app_manager.py => workflow_manager.py} (63%) diff --git a/src/radical/__init__.py b/src/radical/__init__.py deleted file mode 100644 index 8d17c21ca..000000000 --- a/src/radical/__init__.py +++ /dev/null @@ -1,2 +0,0 @@ -__import__('pkg_resources').declare_namespace(__name__) - diff --git a/src/radical/entk/api.v2/pipeline.py b/src/radical/entk/api.v2/pipeline.py index 09db54051..e5ea29789 100644 --- a/src/radical/entk/api.v2/pipeline.py +++ b/src/radical/entk/api.v2/pipeline.py @@ -1,4 +1,6 @@ +import radical.utils as ru + from .stage import Stage @@ -8,28 +10,48 @@ class Pipeline(object): def __init__(self, descr): - self._stages = [Stage(sd) for sd in descr] - self._state = 'NEW' - self._cb = None - - self._workflow = None # workflow uid + self._descr = descr + self._state = 'NEW' + self._stages = dict() + self._cbs = list() + self._uid = ru.generate_uid('pipe') + self._wf = None @property def state(self): + return self._state + def cancel(self): + + self._wf._wfmgr.cancel(self) pass + def add_callback(self, cb): - self._cb = cb - def _advance(self, state): + self._cbs.append(cb) + + + # -------------------------------------------------------------------------- + # + def add_stage(self, stage): + + self._stages[stage.uid] = stage + stage._pipe = weakref(self) + + + def list_stages(self): + + return self._stages + + + def cancel_stages(self, uids): - self._state = state - if self._cb: - self._cb() + for uid in uids: + self._stages[uid].cancel() # ------------------------------------------------------------------------------ diff --git a/src/radical/entk/api.v2/stage.py b/src/radical/entk/api.v2/stage.py index cad34ffec..1edd3649a 100644 --- a/src/radical/entk/api.v2/stage.py +++ b/src/radical/entk/api.v2/stage.py @@ -1,4 +1,6 @@ +import radical.utils as ru + from .task import Task @@ -8,29 +10,47 @@ class Stage(object): def __init__(self, descr): - self._tasks = [Task(td) for td in descr] - self._state = 'NEW' - self._cb = None - - self._pipeline = None # pipeline uid - self._workflow = None # workflow uid + self._descr = descr + self._state = 'NEW' + self._tasks = dict() + self._cbs = list() + self._uid = ru.generate_uid('stage') + self._pipe = None @property def state(self): + return self._state + def cancel(self): - pass + + self._pipe._wf._wfmgr.cancel(self) + def add_callback(self, cb): - self._cb = cb - def _advance(self, state): + self._cbs.append(cb) + + + # -------------------------------------------------------------------------- + # + def add_task(self, task): + + self._tasks[task.uid] = task + task._stage = weakref(self) + + + def list_tasks(self): + + return self._tasks + + + def cancel_tasks(self, uids): - self._state = state - if self._cb: - self._cb() + for uid in uids: + self._tasks[uid].cancel() # ------------------------------------------------------------------------------ diff --git a/src/radical/entk/api.v2/task.py b/src/radical/entk/api.v2/task.py index 299798c94..a06e3d873 100644 --- a/src/radical/entk/api.v2/task.py +++ b/src/radical/entk/api.v2/task.py @@ -1,4 +1,6 @@ +import radical.utils as ru + # ------------------------------------------------------------------------------ # @@ -6,30 +8,26 @@ class Task(object): def __init__(self, descr): - self._state = 'NEW' - self._descr = descr - self._cb = None - - self._stage = None # stage uid - self._pipeline = None # pipeline uid - self._workflow = None # workflow uid + self._descr = descr + self._state = 'NEW' + self._cbs = list() + self._uid = ru.generate_uid('task') + self._stage = None @property def state(self): + return self._state def cancel(self): - pass - def add_callback(self, cb): - self._cb = cb + self._stage._pipe._wf._wfmgr.cancel(self) + - def _advance(self, state): + def add_callback(self, cb): - self._state = state - if self._cb: - self._cb() + self._cbs.append(cb) # ------------------------------------------------------------------------------ diff --git a/src/radical/entk/api.v2/workflow.py b/src/radical/entk/api.v2/workflow.py index 0d2cce154..564e18d35 100644 --- a/src/radical/entk/api.v2/workflow.py +++ b/src/radical/entk/api.v2/workflow.py @@ -1,4 +1,5 @@ +import radical.utils as ru from .pipeline import Pipeline @@ -9,26 +10,46 @@ class Workflow(object): def __init__(self, descr): - self._pipelines = [Pipeline(pd) for pd in descr] - self._state = 'NEW' - self._cb = None + self._descr = descr + self._state = 'NEW' + self._pipes = dict() + self._cbs = list() + self._uid = ru.generate_uid('wf') + self._wfmgr = None @property def state(self): + return self._state + def cancel(self): - pass + + self._wfmgr._cancel(self) + def add_callback(self, cb): - self._cb = cb + self._cbs.append(cb) + + + # -------------------------------------------------------------------------- + # + def add_pipeline(self, pipe): + + self._pipes[pipe.uid] = pipe + pipe._wf = weakref(self) + + + def list_pipelines(self): + + return self._pipes + - def _advance(self, state): + def cancel_pipelines(self, uids): - self._state = state - if self._cb: - self._cb() + for uid in uids: + self._pipes[uid].cancel() # ------------------------------------------------------------------------------ diff --git a/src/radical/entk/api.v2/app_manager.py b/src/radical/entk/api.v2/workflow_manager.py similarity index 63% rename from src/radical/entk/api.v2/app_manager.py rename to src/radical/entk/api.v2/workflow_manager.py index ebc5237b0..2859de97f 100644 --- a/src/radical/entk/api.v2/app_manager.py +++ b/src/radical/entk/api.v2/workflow_manager.py @@ -1,8 +1,10 @@ +import radical.utils as ru + # ------------------------------------------------------------------------------ # -class AppManager(object): +class WorkflowManager(object): # -------------------------------------------------------------------------- # @@ -12,6 +14,7 @@ def __init__(self, rmq_url=None, uid=None, rtype=None): self._backend = rtype.create_backend # ... self._workflows = dict() # uid - Workflow self._cb = None + self._uid = ru.generate_id('wfmgr') # -------------------------------------------------------------------------- @@ -25,11 +28,13 @@ def uid(self): # -------------------------------------------------------------------------- # def acquire_resource(self, descr): + # returns resource id (rid) self._backend.acquire_resource(descr) def list_resources(self): + # return map: {rid: descr} return self._backend.list_resources() @@ -39,35 +44,40 @@ def release_resource(self, rid): return self._backend.release_resource(rid) + def stage_to_resource(self, rid, data): + + return self._backend.stage_to_resource(rid, data) + + + def stage_from_resource(self, rid, data): + + return self._backend.stage_from_resource(rid, data) + + # -------------------------------------------------------------------------- # - def submit(self, workflow): + def submit_workflow(self, workflow): + # returns none + workflow._wfmgr = weakref(self) self._workflows[workflow.uid] = workflow + self._backend.submit(workflow) def list_workflows(self): + # return map {uid: workflow} - return self._workflows.keys() + return self._backend.list() def get_workflow(self, uid): - return self._workflows[uid] - - - def cancel(self, workflow_id): - - pass - + return self._backend.get(uid) - def add_callback(self, cb): - self._cb = cb - # goes to workflow - # def shared_data(self): - # def outputs(self): + def cancel_workflow(self, uid): + return self._backend.cancel(uid) # ------------------------------------------------------------------------------