Skip to content

Commit

Permalink
Merge pull request #387 from radical-cybertools/devel_nodb_2
Browse files Browse the repository at this point in the history
Devel nodb 2
  • Loading branch information
mtitov authored Sep 27, 2023
2 parents 5335906 + 6bd3801 commit f52190e
Show file tree
Hide file tree
Showing 23 changed files with 508 additions and 145 deletions.
3 changes: 1 addition & 2 deletions .readthedocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@ version: 2
formats: all

python:
system_packages: true
version: 3.6
version: 3.7
install:
- requirements: requirements-docs.txt
- method: pip
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.35.0
1.40.0
1 change: 1 addition & 0 deletions src/radical/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
from .typeddict import TypedDict, TypedDictMeta, as_dict
from .config import Config, DefaultConfig

from .zmq import Message
from .zmq import Bridge
from .zmq import Queue, Putter, Getter
from .zmq import PubSub, Publisher, Subscriber
Expand Down
45 changes: 32 additions & 13 deletions src/radical/utils/heartbeat.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,14 @@ def __init__(self, uid, timeout, interval=1, beat_cb=None, term_cb=None,
if not self._log:
self._log = Logger('radical.utils.heartbeat')

self._log.debug('hb %s create', self._uid)


# --------------------------------------------------------------------------
#
def start(self):

self._log.debug('start heartbeat')
self._log.debug('hb %s start', self._uid)
self._watcher = mt.Thread(target=self._watch)
self._watcher.daemon = True
self._watcher.start()
Expand Down Expand Up @@ -101,41 +103,56 @@ def dump(self, log):
log.debug('hb dump %s: \n%s', self._uid, pprint.pformat(self._tstamps))


# --------------------------------------------------------------------------
#
def watch(self, uid):

with self._lock:
if uid not in self._tstamps:
# self._log.debug('=== hb %s watch %s', self._uid, uid)
self._tstamps[uid] = None


# --------------------------------------------------------------------------
#
def _watch(self):

# initial heartbeat without delay
if self._beat_cb:
# self._log.debug('=== hb %s beat cb init', self._uid)
self._beat_cb()

while not self._term.is_set():

# self._log.debug('=== hb %s loop %s', self._uid, self._interval)

time.sleep(self._interval)
now = time.time()

if self._beat_cb:
# self._log.debug('=== hb %s beat cb', self._uid)
self._beat_cb()

# avoid iteration over changing dict
with self._lock:
uids = list(self._tstamps.keys())

# self._log.debug('=== hb %s uids %s', self._uid, uids)
for uid in uids:

# self._log.debug('hb %s check %s', self._uid, uid)
# self._log.debug('=== hb %s check %s', self._uid, uid)

with self._lock:
last = self._tstamps.get(uid)

if last is None:
self._log.warn('hb %s[%s]: never seen', self._uid, uid)
self._log.warn('=== hb %s inval %s', self._uid, uid)
continue

if now - last > self._timeout:

if self._log:
self._log.warn('hb %s[%s]: %.1f - %.1f > %1.f: timeout',
self._log.warn('=== hb %s tout %s: %.1f - %.1f > %1.f',
self._uid, uid, now, last, self._timeout)

ret = None
Expand All @@ -148,9 +165,10 @@ def _watch(self):
# avoiding termination
ret = True

if ret is None:
if ret in [None, False]:
# could not recover: abandon mothership
self._log.warn('hb fail %s: fatal (%d)', uid, self._pid)
self._log.warn('=== hb %s fail %s: fatal (%d)',
self._uid, uid, self._pid)
os.kill(self._pid, signal.SIGTERM)
time.sleep(1)
os.kill(self._pid, signal.SIGKILL)
Expand All @@ -161,8 +179,9 @@ def _watch(self):
# information for the old uid and register a new
# heartbeat for the new one, so that we can immediately
# begin to watch it.
self._log.info('hb recover %s -> %s (%s)',
uid, ret, self._term_cb)
assert isinstance(ret, str)
self._log.info('=== hb %s recov %s -> %s (%s)',
self._uid, uid, ret, self._term_cb)
with self._lock:
del self._tstamps[uid]
self._tstamps[ret] = time.time()
Expand All @@ -178,8 +197,8 @@ def beat(self, uid=None, timestamp=None):
if not uid:
uid = 'default'

# self._log.debug('hb %s beat [%s]', self._uid, uid)
with self._lock:
# self._log.debug('=== hb %s beat [%s]', self._uid, uid)
self._tstamps[uid] = timestamp


Expand Down Expand Up @@ -223,21 +242,21 @@ def wait_startup(self, uids=None, timeout=None):
ok = [uid for uid in uids if self._tstamps.get(uid)]
nok = [uid for uid in uids if uid not in ok]

self._log.debug('wait for : %s', nok)
# self._log.debug('wait for : %s', nok)

if len(ok) == len(uids):
break

if timeout:
if time.time() - start > timeout:
self._log.debug('wait time: %s', nok)
# self._log.debug('wait time: %s', nok)
break

time.sleep(0.05)
time.sleep(0.25)

if len(ok) != len(uids):
nok = [uid for uid in uids if uid not in ok]
self._log.debug('wait fail: %s', nok)
self._log.error('wait fail: %s', nok)
return nok

else:
Expand Down
6 changes: 3 additions & 3 deletions src/radical/utils/ids.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ def reset_counter(self, prefix, reset_all_others=False):

# ------------------------------------------------------------------------------
#
def generate_id(prefix, mode=ID_SIMPLE, ns=None):
def generate_id(prefix: str, mode=ID_SIMPLE, ns=None):
"""
Generate a human readable, sequential ID for the given prefix.
Expand Down Expand Up @@ -183,8 +183,8 @@ def generate_id(prefix, mode=ID_SIMPLE, ns=None):
and will, for `ID_PRIVATE`, revert to `ID_UUID`.
"""

if not prefix or not isinstance(prefix, str):
raise TypeError("ID generation expect prefix in basestring type")
if not isinstance(prefix, str):
raise TypeError('"prefix" must be a string, not %s' % type(prefix))

if _cache['dockerized'] and mode == ID_PRIVATE:
mode = ID_UUID
Expand Down
1 change: 1 addition & 0 deletions src/radical/utils/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,7 @@ def _ensure_handler(self):
p = self._path
n = self._name
for t in self._targets:

if t in ['0', 'null'] : h = logging.NullHandler()
elif t in ['-', '1', 'stdout']: h = ColorStreamHandler(sys.stdout)
elif t in ['=', '2', 'stderr']: h = ColorStreamHandler(sys.stderr)
Expand Down
4 changes: 4 additions & 0 deletions src/radical/utils/profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,10 @@ def __init__(self, name, ns=None, path=None):

if self._enabled.lower() in ['0', 'false', 'off']:
self._enabled = False

# don't open the file on disabled profilers
if not self._enabled:
self._handle = None
return

# profiler is enabled - set properties, sync time, open handle
Expand Down
21 changes: 14 additions & 7 deletions src/radical/utils/shell.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ def sh_quote(data):

# ------------------------------------------------------------------------------
#
def sh_callout(cmd, stdout=True, stderr=True, shell=False, env=None):
def sh_callout(cmd, stdout=True, stderr=True,
shell=False, env=None, cwd=None):
'''
call a shell command, return `[stdout, stderr, retval]`.
'''
Expand All @@ -54,7 +55,8 @@ def sh_callout(cmd, stdout=True, stderr=True, shell=False, env=None):
if stderr : stderr = sp.PIPE
else : stderr = None

p = sp.Popen(cmd, stdout=stdout, stderr=stderr, shell=shell, env=env)
p = sp.Popen(cmd, stdout=stdout, stderr=stderr,
shell=shell, env=env, cwd=cwd)

if not stdout and not stderr:
ret = p.wait()
Expand All @@ -67,7 +69,8 @@ def sh_callout(cmd, stdout=True, stderr=True, shell=False, env=None):

# ------------------------------------------------------------------------------
#
def sh_callout_bg(cmd, stdout=None, stderr=None, shell=False, env=None):
def sh_callout_bg(cmd, stdout=None, stderr=None,
shell=False, env=None, cwd=None):
'''
call a shell command in the background. Do not attempt to pipe STDOUT/ERR,
but only support writing to named files.
Expand All @@ -84,15 +87,15 @@ def sh_callout_bg(cmd, stdout=None, stderr=None, shell=False, env=None):
# convert string into arg list if needed
if not shell and is_string(cmd): cmd = shlex.split(cmd)

sp.Popen(cmd, stdout=stdout, stderr=stderr, shell=shell, env=env)
sp.Popen(cmd, stdout=stdout, stderr=stderr, shell=shell, env=env, cwd=cwd)

return


# ------------------------------------------------------------------------------
#
def sh_callout_async(cmd, stdin=True, stdout=True, stderr=True,
shell=False, env=None):
shell=False, env=None, cwd=None):
'''
Run a command, and capture stdout/stderr if so flagged. The call will
Expand All @@ -110,6 +113,9 @@ def sh_callout_async(cmd, stdin=True, stdout=True, stderr=True,
shell: True, False [default]
- pass to popen
cwd: string
- working directory for command to run in
PROC:
- PROC.stdout : `queue.Queue` instance delivering stdout lines
- PROC.stderr : `queue.Queue` instance delivering stderr lines
Expand All @@ -133,7 +139,7 @@ class _P(object):
'''

# ----------------------------------------------------------------------
def __init__(self, cmd, stdin, stdout, stderr, shell, env):
def __init__(self, cmd, stdin, stdout, stderr, shell, env, cwd):

cmd = cmd.strip()

Expand Down Expand Up @@ -165,6 +171,7 @@ def __init__(self, cmd, stdin, stdout, stderr, shell, env):
stderr=self._err_w,
shell=shell,
env=env,
cwd=cwd,
bufsize=1)

t = mt.Thread(target=self._watch)
Expand Down Expand Up @@ -277,7 +284,7 @@ def _watch(self):
# --------------------------------------------------------------------------

return _P(cmd=cmd, stdin=stdin, stdout=stdout, stderr=stderr,
shell=shell, env=env)
shell=shell, env=env, cwd=cwd)


# ------------------------------------------------------------------------------
Expand Down
14 changes: 11 additions & 3 deletions src/radical/utils/typeddict.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ class TypedDict(dict, metaclass=TypedDictMeta):

# --------------------------------------------------------------------------
#
def __init__(self, from_dict=None):
def __init__(self, from_dict=None, **kwargs):
'''
Create a typed dictionary (tree) from `from_dict`.
Expand All @@ -131,10 +131,19 @@ def __init__(self, from_dict=None):
verify
Names with a leading underscore are not supported.
Supplied `from_dict` and kwargs are used to initialize the object
data -- the `kwargs` take preceedence over the `from_dict` if both
are specified (note that `from_dict` and `self` are invalid
`kwargs`).
'''

self.update(copy.deepcopy(self._defaults))
self.update(from_dict)

if kwargs:
self.update(kwargs)


# --------------------------------------------------------------------------
#
Expand Down Expand Up @@ -297,8 +306,7 @@ def __str__(self):
return str(self._data)

def __repr__(self):
return '<%s object, schema keys: %s>' % \
(type(self).__qualname__, tuple(self._schema.keys()))
return '%s: %s' % (type(self).__qualname__, str(self))


# --------------------------------------------------------------------------
Expand Down
5 changes: 3 additions & 2 deletions src/radical/utils/zmq/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@


from .bridge import Bridge
from .queue import Queue, Putter, Getter
from .pubsub import PubSub, Publisher, Subscriber
from .queue import Queue, Putter, Getter, test_queue
from .pubsub import PubSub, Publisher, Subscriber, test_pubsub
from .pipe import Pipe, MODE_PUSH, MODE_PULL
from .client import Client
from .server import Server
from .registry import Registry, RegistryClient
from .message import Message


# ------------------------------------------------------------------------------
Expand Down
Loading

0 comments on commit f52190e

Please sign in to comment.