Skip to content

Commit

Permalink
merge from devel
Browse files Browse the repository at this point in the history
  • Loading branch information
andre-merzky committed Jul 1, 2024
2 parents 933ebed + b001af2 commit b37094e
Show file tree
Hide file tree
Showing 8 changed files with 306 additions and 1 deletion.
6 changes: 6 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@
https://github.com/radical-cybertools/radical.entk/ \
issues?q=is%3Aissue+is%3Aopen+

1.60.0 Release 2024-07-01
--------------------------------------------------------------------------------

- api draft


1.60.0 Release 2024-05-10
--------------------------------------------------------------------------------

Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.61.0
1.62.0
8 changes: 8 additions & 0 deletions src/radical/entk/api.v2/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@

from .app_manager import AppManager
from .workflow import Workflow
from .pipeline import Pipeline
from .stage import Stage
from .task import Task


59 changes: 59 additions & 0 deletions src/radical/entk/api.v2/pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@

import radical.utils as ru

from .stage import Stage


# ------------------------------------------------------------------------------
#
class Pipeline(object):

def __init__(self, descr):

self._descr = descr
self._state = 'NEW'
self._stages = dict()
self._cbs = list()
self._uid = ru.generate_uid('pipe')
self._wf = None


@property
def state(self):

return self._state


def cancel(self):

self._wf._wfmgr.cancel(self)
pass


def add_callback(self, cb):

self._cbs.append(cb)


# --------------------------------------------------------------------------
#
def add_stage(self, stage):

self._stages[stage.uid] = stage
stage._pipe = weakref(self)


def list_stages(self):

return self._stages


def cancel_stages(self, uids):

for uid in uids:
self._stages[uid].cancel()


# ------------------------------------------------------------------------------


57 changes: 57 additions & 0 deletions src/radical/entk/api.v2/stage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@

import radical.utils as ru

from .task import Task


# ------------------------------------------------------------------------------
#
class Stage(object):

def __init__(self, descr):

self._descr = descr
self._state = 'NEW'
self._tasks = dict()
self._cbs = list()
self._uid = ru.generate_uid('stage')
self._pipe = None


@property
def state(self):

return self._state


def cancel(self):

self._pipe._wf._wfmgr.cancel(self)


def add_callback(self, cb):

self._cbs.append(cb)


# --------------------------------------------------------------------------
#
def add_task(self, task):

self._tasks[task.uid] = task
task._stage = weakref(self)


def list_tasks(self):

return self._tasks


def cancel_tasks(self, uids):

for uid in uids:
self._tasks[uid].cancel()


# ------------------------------------------------------------------------------

34 changes: 34 additions & 0 deletions src/radical/entk/api.v2/task.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@

import radical.utils as ru


# ------------------------------------------------------------------------------
#
class Task(object):

def __init__(self, descr):

self._descr = descr
self._state = 'NEW'
self._cbs = list()
self._uid = ru.generate_uid('task')
self._stage = None


@property
def state(self):

return self._state

def cancel(self):

self._stage._pipe._wf._wfmgr.cancel(self)


def add_callback(self, cb):

self._cbs.append(cb)


# ------------------------------------------------------------------------------

56 changes: 56 additions & 0 deletions src/radical/entk/api.v2/workflow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@

import radical.utils as ru

from .pipeline import Pipeline


# ------------------------------------------------------------------------------
#
class Workflow(object):

def __init__(self, descr):

self._descr = descr
self._state = 'NEW'
self._pipes = dict()
self._cbs = list()
self._uid = ru.generate_uid('wf')
self._wfmgr = None


@property
def state(self):

return self._state


def cancel(self):

self._wfmgr._cancel(self)


def add_callback(self, cb):
self._cbs.append(cb)


# --------------------------------------------------------------------------
#
def add_pipeline(self, pipe):

self._pipes[pipe.uid] = pipe
pipe._wf = weakref(self)


def list_pipelines(self):

return self._pipes


def cancel_pipelines(self, uids):

for uid in uids:
self._pipes[uid].cancel()


# ------------------------------------------------------------------------------

85 changes: 85 additions & 0 deletions src/radical/entk/api.v2/workflow_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@

import radical.utils as ru


# ------------------------------------------------------------------------------
#
class WorkflowManager(object):

# --------------------------------------------------------------------------
#
def __init__(self, rmq_url=None, uid=None, rtype=None):

self._rtype = rtype or 'radical.pilot'
self._backend = rtype.create_backend # ...
self._workflows = dict() # uid - Workflow
self._cb = None
self._uid = ru.generate_id('wfmgr')


# --------------------------------------------------------------------------
#
@property
def uid(self):

return self._uid


# --------------------------------------------------------------------------
#
def acquire_resource(self, descr):
# returns resource id (rid)

self._backend.acquire_resource(descr)


def list_resources(self):
# return map: {rid: descr}

return self._backend.list_resources()


def release_resource(self, rid):

return self._backend.release_resource(rid)


def stage_to_resource(self, rid, data):

return self._backend.stage_to_resource(rid, data)


def stage_from_resource(self, rid, data):

return self._backend.stage_from_resource(rid, data)


# --------------------------------------------------------------------------
#
def submit_workflow(self, workflow):
# returns none

workflow._wfmgr = weakref(self)
self._workflows[workflow.uid] = workflow
self._backend.submit(workflow)


def list_workflows(self):
# return map {uid: workflow}

return self._backend.list()


def get_workflow(self, uid):

return self._backend.get(uid)


def cancel_workflow(self, uid):

return self._backend.cancel(uid)


# ------------------------------------------------------------------------------
# pylint: disable=protected-access

0 comments on commit b37094e

Please sign in to comment.