diff --git a/src/radical/entk/api.v2/__init__.py b/src/radical/entk/api.v2/__init__.py new file mode 100644 index 00000000..154c8438 --- /dev/null +++ b/src/radical/entk/api.v2/__init__.py @@ -0,0 +1,8 @@ + +from .app_manager import AppManager +from .workflow import Workflow +from .pipeline import Pipeline +from .stage import Stage +from .task import Task + + diff --git a/src/radical/entk/api.v2/pipeline.py b/src/radical/entk/api.v2/pipeline.py new file mode 100644 index 00000000..e5ea2978 --- /dev/null +++ b/src/radical/entk/api.v2/pipeline.py @@ -0,0 +1,59 @@ + +import radical.utils as ru + +from .stage import Stage + + +# ------------------------------------------------------------------------------ +# +class Pipeline(object): + + def __init__(self, descr): + + self._descr = descr + self._state = 'NEW' + self._stages = dict() + self._cbs = list() + self._uid = ru.generate_uid('pipe') + self._wf = None + + + @property + def state(self): + + return self._state + + + def cancel(self): + + self._wf._wfmgr.cancel(self) + pass + + + def add_callback(self, cb): + + self._cbs.append(cb) + + + # -------------------------------------------------------------------------- + # + def add_stage(self, stage): + + self._stages[stage.uid] = stage + stage._pipe = weakref(self) + + + def list_stages(self): + + return self._stages + + + def cancel_stages(self, uids): + + for uid in uids: + self._stages[uid].cancel() + + +# ------------------------------------------------------------------------------ + + diff --git a/src/radical/entk/api.v2/stage.py b/src/radical/entk/api.v2/stage.py new file mode 100644 index 00000000..1edd3649 --- /dev/null +++ b/src/radical/entk/api.v2/stage.py @@ -0,0 +1,57 @@ + +import radical.utils as ru + +from .task import Task + + +# ------------------------------------------------------------------------------ +# +class Stage(object): + + def __init__(self, descr): + + self._descr = descr + self._state = 'NEW' + self._tasks = dict() + self._cbs = list() + self._uid = ru.generate_uid('stage') + self._pipe = None + + + @property + def state(self): + + return self._state + + + def cancel(self): + + self._pipe._wf._wfmgr.cancel(self) + + + def add_callback(self, cb): + + self._cbs.append(cb) + + + # -------------------------------------------------------------------------- + # + def add_task(self, task): + + self._tasks[task.uid] = task + task._stage = weakref(self) + + + def list_tasks(self): + + return self._tasks + + + def cancel_tasks(self, uids): + + for uid in uids: + self._tasks[uid].cancel() + + +# ------------------------------------------------------------------------------ + diff --git a/src/radical/entk/api.v2/task.py b/src/radical/entk/api.v2/task.py new file mode 100644 index 00000000..a06e3d87 --- /dev/null +++ b/src/radical/entk/api.v2/task.py @@ -0,0 +1,34 @@ + +import radical.utils as ru + + +# ------------------------------------------------------------------------------ +# +class Task(object): + + def __init__(self, descr): + + self._descr = descr + self._state = 'NEW' + self._cbs = list() + self._uid = ru.generate_uid('task') + self._stage = None + + + @property + def state(self): + + return self._state + + def cancel(self): + + self._stage._pipe._wf._wfmgr.cancel(self) + + + def add_callback(self, cb): + + self._cbs.append(cb) + + +# ------------------------------------------------------------------------------ + diff --git a/src/radical/entk/api.v2/workflow.py b/src/radical/entk/api.v2/workflow.py new file mode 100644 index 00000000..564e18d3 --- /dev/null +++ b/src/radical/entk/api.v2/workflow.py @@ -0,0 +1,56 @@ + +import radical.utils as ru + +from .pipeline import Pipeline + + +# ------------------------------------------------------------------------------ +# +class Workflow(object): + + def __init__(self, descr): + + self._descr = descr + self._state = 'NEW' + self._pipes = dict() + self._cbs = list() + self._uid = ru.generate_uid('wf') + self._wfmgr = None + + + @property + def state(self): + + return self._state + + + def cancel(self): + + self._wfmgr._cancel(self) + + + def add_callback(self, cb): + self._cbs.append(cb) + + + # -------------------------------------------------------------------------- + # + def add_pipeline(self, pipe): + + self._pipes[pipe.uid] = pipe + pipe._wf = weakref(self) + + + def list_pipelines(self): + + return self._pipes + + + def cancel_pipelines(self, uids): + + for uid in uids: + self._pipes[uid].cancel() + + +# ------------------------------------------------------------------------------ + diff --git a/src/radical/entk/api.v2/workflow_manager.py b/src/radical/entk/api.v2/workflow_manager.py new file mode 100644 index 00000000..2859de97 --- /dev/null +++ b/src/radical/entk/api.v2/workflow_manager.py @@ -0,0 +1,85 @@ + +import radical.utils as ru + + +# ------------------------------------------------------------------------------ +# +class WorkflowManager(object): + + # -------------------------------------------------------------------------- + # + def __init__(self, rmq_url=None, uid=None, rtype=None): + + self._rtype = rtype or 'radical.pilot' + self._backend = rtype.create_backend # ... + self._workflows = dict() # uid - Workflow + self._cb = None + self._uid = ru.generate_id('wfmgr') + + + # -------------------------------------------------------------------------- + # + @property + def uid(self): + + return self._uid + + + # -------------------------------------------------------------------------- + # + def acquire_resource(self, descr): + # returns resource id (rid) + + self._backend.acquire_resource(descr) + + + def list_resources(self): + # return map: {rid: descr} + + return self._backend.list_resources() + + + def release_resource(self, rid): + + return self._backend.release_resource(rid) + + + def stage_to_resource(self, rid, data): + + return self._backend.stage_to_resource(rid, data) + + + def stage_from_resource(self, rid, data): + + return self._backend.stage_from_resource(rid, data) + + + # -------------------------------------------------------------------------- + # + def submit_workflow(self, workflow): + # returns none + + workflow._wfmgr = weakref(self) + self._workflows[workflow.uid] = workflow + self._backend.submit(workflow) + + + def list_workflows(self): + # return map {uid: workflow} + + return self._backend.list() + + + def get_workflow(self, uid): + + return self._backend.get(uid) + + + def cancel_workflow(self, uid): + + return self._backend.cancel(uid) + + +# ------------------------------------------------------------------------------ +# pylint: disable=protected-access +