Skip to content

Commit

Permalink
Merge pull request #9 from twosigma/improve-stale-check-errors
Browse files Browse the repository at this point in the history
Improve stale check errors
  • Loading branch information
timothy-shields authored Mar 17, 2021
2 parents c7318ea + 9c067a3 commit c63b1e8
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 143 deletions.
9 changes: 9 additions & 0 deletions src/uberjob/_errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,12 @@ def __init__(self, call: Call):

class NotTransformedError(Exception):
"""An expected transformation was not applied."""


def create_chained_call_error(call: Call, exception: Exception) -> CallError:
call_error = CallError(call)
call_error.__cause__ = exception
return call_error


__all__ = ["CallError", "NotTransformedError", "create_chained_call_error"]
38 changes: 17 additions & 21 deletions src/uberjob/_execution/run_physical.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,16 @@
"""Functionality for executing a physical plan"""
from typing import Any, Callable, Dict, NamedTuple, Optional

from uberjob._errors import create_chained_call_error
from uberjob._execution.run_function_on_graph import run_function_on_graph
from uberjob._graph import get_full_scope
from uberjob._plan import Plan
from uberjob._transformations.pruning import prune_source_literals
from uberjob._util import Slot
from uberjob._util.retry import identity
from uberjob.graph import Call, Graph, Literal, Node, get_argument_nodes
from uberjob.progress._null_progress_observer import NullProgressObserver
from uberjob.progress._progress_observer import ProgressObserver


class BoundCall:
Expand Down Expand Up @@ -74,42 +78,38 @@ class PrepRunPhysical(NamedTuple):
plan: Plan


def _default_on_failed(node: Node, e: Exception):
pass


def prep_run_physical(
plan: Plan,
*,
inplace: bool,
output_node: Optional[Node] = None,
retry: Optional[Callable[[Callable], Callable]] = None,
on_started: Optional[Callable[[Node], None]] = None,
on_completed: Optional[Callable[[Node], None]] = None,
on_failed: Optional[Callable[[Node, Exception], None]] = None,
progress_observer: Optional[ProgressObserver] = None,
):
bound_call_lookup, output_slot = _create_bound_call_lookup_and_output_slot(
plan, output_node
)
plan = prune_source_literals(plan, inplace=inplace)
on_started = on_started or identity
on_completed = on_completed or identity
on_failed = on_failed or _default_on_failed
retry = retry or identity
progress_observer = progress_observer or NullProgressObserver()

def process(node):
if type(node) is Call:
on_started(node)
scope = get_full_scope(plan.graph, node)
progress_observer.increment_running(section="run", scope=scope)
bound_call = bound_call_lookup[node]
try:
retry(bound_call.value.run)()
except Exception as e:
on_failed(node, e)
except Exception as exception:
progress_observer.increment_failed(
section="run",
scope=scope,
exception=create_chained_call_error(node, exception),
)
raise
finally:
bound_call.value = None

on_completed(node)
progress_observer.increment_completed(section="run", scope=scope)

return PrepRunPhysical(bound_call_lookup, output_slot, process, plan)

Expand All @@ -123,18 +123,14 @@ def run_physical(
max_workers: Optional[int] = None,
max_errors: Optional[int] = 0,
scheduler: Optional[str] = None,
on_started: Optional[Callable[[Node], None]] = None,
on_completed: Optional[Callable[[Node], None]] = None,
on_failed: Optional[Callable[[Node, Exception], None]] = None,
progress_observer: ProgressObserver,
) -> Any:
_, output_slot, process, plan = prep_run_physical(
plan,
output_node=output_node,
retry=retry,
inplace=inplace,
on_started=on_started,
on_completed=on_completed,
on_failed=on_failed,
progress_observer=progress_observer,
)

run_function_on_graph(
Expand Down
26 changes: 26 additions & 0 deletions src/uberjob/_graph.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#
# Copyright 2020 Two Sigma Open Source, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from typing import Tuple

from uberjob.graph import Graph, Node


def get_full_scope(graph: Graph, node: Node) -> Tuple:
node_data = graph.nodes[node]
return node_data["scope"] + node_data.get("implicit_scope", ())


__all__ = ["get_full_scope"]
118 changes: 10 additions & 108 deletions src/uberjob/_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,127 +20,32 @@
from uberjob._errors import CallError
from uberjob._execution.run_function_on_graph import NodeError
from uberjob._execution.run_physical import run_physical
from uberjob._graph import get_full_scope
from uberjob._plan import Plan
from uberjob._registry import Registry
from uberjob._transformations import get_mutable_plan
from uberjob._transformations.caching import plan_with_value_stores
from uberjob._transformations.pruning import prune_plan
from uberjob._util import fully_qualified_name
from uberjob._util.retry import create_retry
from uberjob._util.validation import assert_is_callable, assert_is_instance
from uberjob.graph import Call, Graph, Node
from uberjob.graph import Call, Node
from uberjob.progress import (
Progress,
ProgressObserver,
composite_progress,
default_progress,
null_progress,
)


def _get_full_scope(graph: Graph, node: Node):
node_data = graph.nodes[node]
return node_data["scope"] + node_data.get("implicit_scope", ())


def _get_scope_call_counts(plan: Plan):
return collections.Counter(
_get_full_scope(plan.graph, node)
def _update_run_totals(plan: Plan, progress_observer: ProgressObserver) -> None:
scope_counts = collections.Counter(
get_full_scope(plan.graph, node)
for node in plan.graph.nodes()
if type(node) is Call
)


def _prepare_plan_with_registry_and_progress(
plan,
registry,
*,
output_node,
progress_observer,
max_workers,
retry,
fresh_time,
inplace,
):
graph = plan.graph

def get_stale_scope(node):
scope = _get_full_scope(graph, node)
value_store = registry.get(node)
if not value_store:
return scope
return (*scope, fully_qualified_name(value_store.__class__))

def on_started_stale_check(node):
progress_observer.increment_running(
section="stale", scope=get_stale_scope(node)
)

def on_completed_stale_check(node):
progress_observer.increment_completed(
section="stale", scope=get_stale_scope(node)
)

scope_counts = collections.Counter(
get_stale_scope(node) for node in plan.graph.nodes() if type(node) is Call
)
for scope, count in scope_counts.items():
progress_observer.increment_total(section="stale", scope=scope, amount=count)

return plan_with_value_stores(
plan,
registry,
output_node=output_node,
max_workers=max_workers,
retry=retry,
fresh_time=fresh_time,
inplace=inplace,
on_started=on_started_stale_check,
on_completed=on_completed_stale_check,
)


def _run_physical_with_progress(
plan,
*,
output_node,
progress_observer,
max_workers,
max_errors,
retry,
scheduler,
inplace,
):
graph = plan.graph

def on_started_run(node):
progress_observer.increment_running(
section="run", scope=_get_full_scope(graph, node)
)

def on_completed_run(node):
progress_observer.increment_completed(
section="run", scope=_get_full_scope(graph, node)
)

def on_failed_run(node, exception):
call_error = CallError(node)
call_error.__cause__ = exception
progress_observer.increment_failed(
section="run", scope=_get_full_scope(graph, node), exception=call_error
)

return run_physical(
plan,
output_node=output_node,
max_workers=max_workers,
max_errors=max_errors,
retry=retry,
scheduler=scheduler,
inplace=inplace,
on_started=on_started_run,
on_completed=on_completed_run,
on_failed=on_failed_run,
)
progress_observer.increment_total(section="run", scope=scope, amount=count)


def _coerce_progress(
Expand Down Expand Up @@ -251,7 +156,7 @@ def run(
try:
with progress_observer:
if registry:
plan, redirected_output_node = _prepare_plan_with_registry_and_progress(
plan, redirected_output_node = plan_with_value_stores(
plan,
registry,
output_node=output_node,
Expand All @@ -271,15 +176,12 @@ def run(
plan, redirected_output_node
)

for scope, count in _get_scope_call_counts(plan).items():
progress_observer.increment_total(
section="run", scope=scope, amount=count
)
_update_run_totals(plan, progress_observer)

if dry_run:
return plan, redirected_output_node

return _run_physical_with_progress(
return run_physical(
plan,
output_node=redirected_output_node,
progress_observer=progress_observer,
Expand Down
Loading

0 comments on commit c63b1e8

Please sign in to comment.