-
Notifications
You must be signed in to change notification settings - Fork 51
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add Orion Extension concept [OC-343] #673
base: develop
Are you sure you want to change the base?
Changes from all commits
64543c2
c6191f3
7e8bd78
60b630d
cb8f1e3
0d9411a
0665515
4c3fa27
58e5bce
2339ac3
db1dcd4
f1f1351
dc5067e
eb58529
778e356
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -80,3 +80,5 @@ target/ | |
|
||
# Notebooks | ||
tests/**.ipynb | ||
dask-worker-space/ | ||
tests/functional/commands/*.json |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
Extensions | ||
========== | ||
|
||
.. automodule:: orion.ext.extensions | ||
:members: | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,222 @@ | ||
"""Defines extension mechanism for third party to hook into Orion""" | ||
|
||
|
||
class EventDelegate: | ||
"""Allow extensions to listen to incoming events from Orion. | ||
Orion broadcasts events which trigger extensions callbacks. | ||
|
||
Parameters | ||
---------- | ||
name: str | ||
name of the event we are creating, this is useful for error reporting | ||
|
||
deferred: bool | ||
if false events are triggered as soon as broadcast is called | ||
Delaunay marked this conversation as resolved.
Show resolved
Hide resolved
|
||
If true, the events will need to be triggered manually. | ||
""" | ||
|
||
def __init__(self, name, deferred=False) -> None: | ||
self.handlers = [] | ||
self.deferred_calls = [] | ||
self.name = name | ||
self.deferred = deferred | ||
self.bad_handlers = [] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What is this for? |
||
self.manager = None | ||
|
||
def remove(self, function) -> bool: | ||
"""Remove an event handler from the handler list""" | ||
try: | ||
self.handlers.remove(function) | ||
return True | ||
except ValueError: | ||
return False | ||
|
||
def add(self, function): | ||
"""Add an event handler to our handler list""" | ||
self.handlers.append(function) | ||
|
||
def broadcast(self, *args, **kwargs): | ||
"""Broadcast and event to all our handlers""" | ||
if not self.deferred: | ||
self._execute(args, kwargs) | ||
return | ||
|
||
self.deferred_calls.append((args, kwargs)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It may be best to leave the implementation of deferred calls for later unless we test it now. |
||
|
||
def _execute(self, args, kwargs): | ||
for fun in self.handlers: | ||
try: | ||
fun(*args, **kwargs) | ||
except Exception as err: | ||
if self.manager: | ||
self.manager.on_extension_error.broadcast( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Won't this silence any extension error by default? Perhaps the default should be to raise if there are no callbacks registered for |
||
self.name, fun, err, args=(args, kwargs) | ||
) | ||
|
||
def execute(self): | ||
"""Execute all our deferred handlers if any""" | ||
for args, kwargs in self.deferred_calls: | ||
self._execute(args, kwargs) | ||
|
||
|
||
class _DelegateStartEnd: | ||
def __init__(self, start, error, end, *args, **kwargs): | ||
self.args = args | ||
self.kwargs = kwargs | ||
self.start = start | ||
self.end = end | ||
self.error = error | ||
|
||
def __enter__(self): | ||
self.start.broadcast(*self.args, **self.kwargs) | ||
return self | ||
|
||
def __exit__(self, exception_type, exception_value, exception_traceback): | ||
self.end.broadcast(*self.args, **self.kwargs) | ||
|
||
if exception_value is not None: | ||
self.error.broadcast( | ||
*self.args, | ||
exception_type, | ||
exception_value, | ||
exception_traceback, | ||
**self.kwargs | ||
) | ||
|
||
|
||
class OrionExtensionManager: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think extension is a bit too general. It could be an extension of anything. Here we are specifically talking of callbacks for specific events. What would you think of a name like |
||
"""Manages third party extensions for Orion""" | ||
bouthilx marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
def __init__(self): | ||
self._events = {} | ||
self._get_event("on_extension_error") | ||
|
||
# -- Trials | ||
self._get_event("new_trial") | ||
self._get_event("on_trial_error") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. To be coherent I think there should be no |
||
self._get_event("end_trial") | ||
|
||
# -- Experiments | ||
self._get_event("start_experiment") | ||
self._get_event("on_experiment_error") | ||
self._get_event("end_experiment") | ||
|
||
@property | ||
def on_extension_error(self): | ||
"""Called when an extension is throwing an exception""" | ||
return self._get_event("on_extension_error") | ||
|
||
def experiment(self, *args, **kwargs): | ||
"""Initialize a context manager that will call start/error/end events automatically""" | ||
return _DelegateStartEnd( | ||
self._get_event("start_experiment"), | ||
self._get_event("on_experiment_error"), | ||
self._get_event("end_experiment"), | ||
*args, | ||
**kwargs | ||
) | ||
|
||
def trial(self, *args, **kwargs): | ||
"""Initialize a context manager that will call start/error/end events automatically""" | ||
return _DelegateStartEnd( | ||
self._get_event("new_trial"), | ||
self._get_event("on_trial_error"), | ||
self._get_event("end_trial"), | ||
*args, | ||
**kwargs | ||
) | ||
|
||
def broadcast(self, name, *args, **kwargs): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is indeed not tested nor used yet. |
||
return self._get_event(name).broadcast(*args, **kwargs) | ||
|
||
def _get_event(self, key): | ||
"""Retrieve event delegate | ||
|
||
Will generate one if not defined already. | ||
""" | ||
delegate = self._events.get(key) | ||
|
||
if delegate is None: | ||
delegate = EventDelegate(key) | ||
delegate.manager = self | ||
self._events[key] = delegate | ||
|
||
return delegate | ||
|
||
def register(self, ext): | ||
"""Register a new extensions | ||
|
||
Parameters | ||
---------- | ||
ext: ``OrionExtension`` | ||
object implementing :class:`OrionExtension` methods | ||
|
||
Returns | ||
------- | ||
the number of calls that was registered | ||
""" | ||
registered_callbacks = 0 | ||
for name, delegate in self._events.items(): | ||
if hasattr(ext, name): | ||
delegate.add(getattr(ext, name)) | ||
registered_callbacks += 1 | ||
|
||
return registered_callbacks | ||
|
||
def unregister(self, ext): | ||
"""Remove an extensions if it was already registered""" | ||
unregistered_callbacks = 0 | ||
for name, delegate in self._events.items(): | ||
if hasattr(ext, name): | ||
delegate.remove(getattr(ext, name)) | ||
unregistered_callbacks += 1 | ||
|
||
return unregistered_callbacks | ||
|
||
|
||
class OrionExtension: | ||
"""Base orion extension interface you need to implement""" | ||
|
||
def on_extension_error(self, name, fun, exception, args): | ||
"""Called when an extension callbakc raise an exception | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It should clarify that if the callback does not raise the error, then the execution will continue when the callback is done. |
||
|
||
Parameters | ||
---------- | ||
fun: callable | ||
handler that raised the error | ||
|
||
exception: | ||
raised exception | ||
|
||
args: tuple | ||
tuple of the arguments that were used | ||
""" | ||
return | ||
|
||
def on_trial_error( | ||
self, trial, exception_type, exception_value, exception_traceback | ||
): | ||
"""Called when a error occur during the optimization process""" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What is expected of the trial status? Will it be changed to broken already? It should be specified in the docstring. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also what will happen afterwards, it this callback supposed to raise an error if we want the worker to stop? |
||
return | ||
|
||
def new_trial(self, trial): | ||
"""Called when the trial starts with a new configuration""" | ||
return | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If this is when the trial execution start then the name should be start_trial instead. Because it may not be a new trial, it can be a resumed one, so the current name is confusing. |
||
|
||
def end_trial(self, trial): | ||
"""Called when the trial finished""" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is it called anytime, even if the trial was interrupted or crashed? Is it called before a status change occurred? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There should also be an event |
||
return | ||
|
||
def on_experiment_error( | ||
self, experiment, exception_type, exception_value, exception_traceback | ||
): | ||
"""Called when a error occur during the optimization process""" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Similar comment as for on_trial_error |
||
return | ||
|
||
def start_experiment(self, experiment): | ||
"""Called at the begin of the optimization process before the worker starts""" | ||
return | ||
|
||
def end_experiment(self, experiment): | ||
"""Called at the end of the optimization process after the worker exits""" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Similar comment as for end_trial. |
||
return |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The line below executing
on_error
should be replaced by the Extension, with an event on_trial_fail or something like that.