
[not-for-merge] Towards statically type-checking parsl #1676

Draft: benclifford wants to merge 25 commits into master.

Commits (25)
a4c1f0d
Remove broken hard-coded usage tracking IP address.
benclifford Feb 14, 2024
647fc05
upgrade mypy
benclifford Feb 16, 2024
add3210
Remove unused codepath from HighThroughputExecutor scale_in
benclifford Jan 31, 2024
c71b0a5
this test only tests the behaviour from simple strategy, so turn it i…
benclifford Feb 14, 2024
5fbdfd6
test partial scaling-in (the scaling in that comes from one block bein…
benclifford Feb 14, 2024
ca2d889
use human-readable names not numeric indices for scale-in code
benclifford Jan 31, 2024
2613105
make scale in pick longest idle blocks in preference to least idle bl…
benclifford Jan 31, 2024
e0e28d5
scale-in should consider all known blocks, not just blocks that have …
benclifford Jan 31, 2024
7957628
fiddle with submiterror
benclifford Nov 20, 2023
2e898da
make storage access a sequence for type-checker goodness
benclifford Nov 15, 2023
d002313
fiddling with WorkQueueTaskFailure exception
benclifford Oct 14, 2023
5e0f764
connected workers is used in strategy (and exposed as part of executo…
benclifford Sep 15, 2023
4fe55c4
serialization performance test client
benclifford Jun 29, 2023
0b4d9b1
Prohibit adding duplicate-labelled executors
benclifford May 16, 2023
e8683b6
Add type annotations to common test configurations
benclifford May 10, 2023
c1bbb8e
htex doesn't shut down its queue management thread at shutdown
benclifford May 1, 2023
59972ca
Form WQ worker_command in __init__ to make type visible to mypy
benclifford Jan 19, 2023
9905fb2
add invocation type annotations to see how well type checking of deco…
benclifford Mar 10, 2023
f27715d
Add typing for dfk.submit()
benclifford Feb 10, 2023
baaec6b
more wq typechecking - add asserts to reflect assumed behaviour
benclifford Jan 19, 2023
9104615
move port mailbox into init so it can be seen by typechecker
benclifford Mar 21, 2023
3f80d8a
Squash benc-mypy history
benclifford Jan 9, 2023
6f5b7a6
Add type annotations to common test configurations
benclifford May 10, 2023
db1575b
remove None-type from provider API
benclifford Nov 20, 2023
043421e
marker for what I really expect to work
benclifford Aug 5, 2023
2 changes: 1 addition & 1 deletion Makefile
@@ -45,7 +45,7 @@ clean_coverage:

.PHONY: mypy
mypy: ## run mypy checks
MYPYPATH=$(CWD)/mypy-stubs mypy parsl/
PYTHONPATH=$(CWD)/mypy-plugins:$(PYTHONPATH) MYPYPATH=$(CWD)/mypy-stubs mypy --no-incremental parsl/ --show-traceback

.PHONY: local_thread_test
local_thread_test: ## run all tests with local_thread config
56 changes: 56 additions & 0 deletions mypy-plugins/parsl_mypy.py
@@ -0,0 +1,56 @@
from mypy.plugin import FunctionContext, Plugin
from mypy.types import Type
import mypy.nodes as nodes


def plugin(v):
    return ParslMypyPlugin


class ParslMypyPlugin(Plugin):
    def get_type_analyze_hook(self, t):
        # print("BENC: gtah t={}".format(t))
        return None

    def get_function_hook(self, f):
        if f == "parsl.app.app.python_appXXXX":
            return python_app_function_hook
        else:
            return None


def python_app_function_hook(ctx: FunctionContext) -> Type:
    print("inside python_app function_hook")
    print("ctx = {}".format(ctx))

    # if python_app is being called with a function parameter (rather than
    # None, the default) then the return type of the python_app decorator
    # is a variation (with a Future added on the type of the decorated
    # function...)

    if ctx.callee_arg_names[0] == "function":  # will this always be at position 0? probably fragile to assume so, but this code does make that assumption
        print(f"python_app called with a function supplied: {ctx.args[0]}")
        function_node = ctx.args[0][0]
        print(f"function node repr is {repr(function_node)} with type {type(function_node)}")

        # return the type of function_node - actually it needs modifying to have the Future wrapper added....
        if isinstance(function_node, nodes.TempNode):
            print(f"temporary node has type {function_node.type}")
            print(f"Python type of tempnode.type is {type(function_node.type)}")
            print(ctx.api)
            # return_type = ctx.api.named_type_or_none("concurrent.futures.Future", [function_node.type.ret_type])
            # return_type = ctx.api.named_generic_type("concurrent.futures.Future", [function_node.type.ret_type])
            # return_type = ctx.api.named_generic_type("builtins.list", [function_node.type.ret_type])
            return_type = function_node.type.ret_type
            # return_type = ctx.default_return_type
            print(f"plugin chosen return type is {return_type}")
            return function_node.type.copy_modified(ret_type=return_type)
        else:
            print("function node is not specified as something this plugin understands")
            return_type = ctx.default_return_type
            return return_type
    else:
        print("python_app called without a function supplied")
        # TODO: this should return a type that is aligned with the implementation:
        # it's the type of the decorator, assuming that it will definitely be given
        # a function this time? or something along those lines...

        print("will return ctx.default_return_type = {}".format(ctx.default_return_type))
        return ctx.default_return_type
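
For context, a minimal sketch of what the commented-out Future-wrapping lines above seem to be aiming at. This is an untested illustration, not part of the PR: it assumes the decorated callable arrives as a TempNode carrying a CallableType (as the debug prints above suggest) and that ctx.api exposes named_generic_type as in current mypy.

from mypy.plugin import FunctionContext
from mypy.types import CallableType, Type
import mypy.nodes as nodes


def wrap_ret_in_future_hook(ctx: FunctionContext) -> Type:
    # Hypothetical hook: rewrite the decorated function's type so that
    # calling the resulting app returns Future[R] instead of R.
    if ctx.args and ctx.args[0]:
        function_node = ctx.args[0][0]
        if isinstance(function_node, nodes.TempNode) and isinstance(function_node.type, CallableType):
            fn_type = function_node.type
            future_type = ctx.api.named_generic_type(
                "concurrent.futures.Future", [fn_type.ret_type])
            # same parameters as the decorated function, Future-wrapped return
            return fn_type.copy_modified(ret_type=future_type)
    return ctx.default_return_type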
87 changes: 66 additions & 21 deletions mypy.ini
@@ -1,5 +1,5 @@
[mypy]
plugins = sqlalchemy.ext.mypy.plugin
plugins = sqlalchemy.ext.mypy.plugin, parsl_mypy

enable_error_code = ignore-without-code
no_implicit_reexport = True
@@ -9,6 +9,8 @@ no_implicit_optional = True

strict_equality = True
warn_unused_ignores = True

# there are some exceptions to this even in the more-strongly-typed sections
warn_unreachable = True

[mypy-non_existent.*]
@@ -55,11 +57,55 @@ disallow_any_expr = True
disallow_any_decorated = True

[mypy-parsl.providers.base.*]
disallow_untyped_defs = True
disallow_untyped_decorators = True
check_untyped_defs = True
disallow_subclassing_any = True


# modules checked (mostly) more strongly than the default:

[mypy-parsl.dataflow.dflow.*]
disallow_untyped_defs = True

[mypy-parsl.jobs.*]
disallow_untyped_defs = True
disallow_any_decorated = True

[mypy-parsl.jobs.strategy.*]
disallow_untyped_defs = True
disallow_any_decorated = True

[mypy-parsl.jobs.job_status_poller.*]
disallow_untyped_defs = True

# merge of #1877 introduced code that violates this, so it is disabled pending further investigation
# disallow_any_expr = True

disallow_any_decorated = True

[mypy-parsl.config.*]
disallow_untyped_defs = True
# Any has to be allowed because TaskRecord now forms part of the type signature of config,
# and TaskRecord contains Any from the type of task args
#disallow_any_expr = True
#disallow_any_decorated = True

[mypy-parsl.channels.base.*]
disallow_untyped_defs = True
disallow_any_expr = True

[mypy-parsl.channels.ssh.*]
disallow_untyped_defs = True

[mypy-parsl.launchers.*]
disallow_untyped_defs = True
disallow_any_decorated = True

[mypy-parsl.executors.base.*]
disallow_untyped_defs = True
disallow_any_expr = True

[mypy-parsl.serialize.*]
disallow_untyped_decorators = True
check_untyped_defs = True
@@ -70,13 +116,23 @@ disallow_untyped_defs = True
# parsl/serialize/proxystore.py:9: error: Class cannot subclass "Pickler" (has type "Any")
disallow_subclassing_any = False

[mypy-parsl.executors.base.*]
disallow_untyped_defs = True
disallow_any_expr = True

[mypy-parsl.executors.high_throughput.interchange.*]
disallow_untyped_defs = True

# modules to be checked more weakly than default:

[mypy-parsl.executors.flux.*]
ignore_errors = True

[mypy-parsl.executors.workqueue.*]
check_untyped_defs = True

# this is generated code; parsl_coprocess_stub
# should still be type-checked, and we then rely on the
# Work Queue code generator to generate valid code.
[mypy-parsl.executors.workqueue.parsl_coprocess.*]
ignore_errors = True

[mypy-parsl.monitoring.*]
disallow_untyped_decorators = True
check_untyped_defs = True
@@ -88,6 +144,9 @@ disallow_untyped_defs = True
[mypy-parsl.monitoring.visualization.*]
ignore_errors = True

[mypy-parsl.tests]
check_untyped_defs = True

[mypy-parsl.tests.configs.local_user_opts]
ignore_missing_imports = True

@@ -101,6 +160,8 @@ warn_unreachable = True
[mypy-parsl.utils]
disallow_untyped_defs = True

# imports from elsewhere for which there are no stubs:

[mypy-flask_sqlalchemy.*]
ignore_missing_imports = True

@@ -164,22 +225,6 @@ ignore_missing_imports = True
[mypy-zmq.*]
ignore_missing_imports = True

[mypy-mpi4py.*]
ignore_missing_imports = True

[mypy-flask.*]
ignore_missing_imports = True

# this is an internal undocumented package
# of multiprocessing - trying to get Event
# to typecheck in monitoring, but it's not
# a top level class as far as mypy is concerned.
# but... when commented out it seems ok?
# so let's see what happens when I try to merge
# in clean CI
#[mypy-multiprocessing.synchronization.*]
#ignore_missing_imports = True

[mypy-pandas.*]
ignore_missing_imports = True

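As a reminder of what the per-module options above enforce, a hypothetical fragment (not from the PR) annotated with the error each flag reports:

def submit(task):  # disallow_untyped_defs: "Function is missing a type annotation"
    return task


def poll(status: int) -> str:
    if status >= 0:
        return "ok"
    else:
        return "failed"
    print("never runs")  # warn_unreachable: "Statement is unreachable"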
29 changes: 23 additions & 6 deletions parsl/app/bash.py
@@ -3,20 +3,26 @@
from inspect import signature, Parameter
import logging

from typing import Any, Callable, Dict, List, Optional, Sequence, Union
from typing_extensions import Literal

from parsl.dataflow.futures import AppFuture
from parsl.app.errors import wrap_error
from parsl.app.app import AppBase
from parsl.dataflow.dflow import DataFlowKernelLoader
from parsl.dataflow.dflow import DataFlowKernel, DataFlowKernelLoader

logger = logging.getLogger(__name__)


def remote_side_bash_executor(func, *args, **kwargs):
def remote_side_bash_executor(func: Callable[..., str], *args, **kwargs) -> int:
    """Executes the supplied function with *args and **kwargs to get a
    command-line to run, and then run that command-line using bash.
    """
    import os
    import subprocess
    from typing import List, cast
    import parsl.app.errors as pe
    from parsl.data_provider.files import File
    from parsl.utils import get_std_fname_mode

    if hasattr(func, '__name__'):
@@ -88,7 +94,8 @@ def open_std_fd(fdname):
    # TODO : Add support for globs here

    missing = []
    for outputfile in kwargs.get('outputs', []):
    outputs = cast(List[File], kwargs.get('outputs', []))
    for outputfile in outputs:
        fpath = outputfile.filepath

        if not os.path.exists(fpath):
@@ -102,7 +109,10 @@

class BashApp(AppBase):

    def __init__(self, func, data_flow_kernel=None, cache=False, executors='all', ignore_for_cache=None):
    def __init__(self, func: Callable[..., str], data_flow_kernel: Optional[DataFlowKernel] = None,
                 cache: bool = False,
                 executors: Union[List[str], Literal['all']] = 'all',
                 ignore_for_cache: Optional[Sequence[str]] = None) -> None:
        super().__init__(func, data_flow_kernel=data_flow_kernel, executors=executors, cache=cache, ignore_for_cache=ignore_for_cache)
        self.kwargs = {}

@@ -120,10 +130,16 @@ def __init__(self, func, data_flow_kernel=None, cache=False, executors='all', ig
        # this is done to avoid passing a function type in the args which parsl.serializer
        # doesn't support
        remote_fn = partial(update_wrapper(remote_side_bash_executor, self.func), self.func)
        remote_fn.__name__ = self.func.__name__

        # parsl/app/bash.py:145: error: "partial[Any]" has no attribute "__name__"
        # but... other parts of the code are relying on getting the __name__
        # of (?) an arbitrary Callable too (which is why we're setting the __name__
        # at all)
        remote_fn.__name__ = self.func.__name__  # type: ignore[attr-defined]

        self.wrapped_remote_function = wrap_error(remote_fn)

    def __call__(self, *args, **kwargs):
    def __call__(self, *args, **kwargs) -> AppFuture:
        """Handle the call to a Bash app.

        Args:
@@ -136,6 +152,7 @@ def __call__(self, *args, **kwargs):
            App_fut

        """
        invocation_kwargs: Dict[str, Any]
        invocation_kwargs = {}
        invocation_kwargs.update(self.kwargs)
        invocation_kwargs.update(kwargs)
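The type: ignore[attr-defined] above is needed because functools.partial objects declare no __name__ in typeshed, even though assigning one works at runtime. A standalone reproduction, with illustrative names rather than parsl code:

from functools import partial, update_wrapper


def remote_runner(func, *args):
    # stand-in for remote_side_bash_executor
    return func(*args)


def make_echo() -> str:
    return "echo hello"


fn = partial(update_wrapper(remote_runner, make_echo), make_echo)
# without the ignore, mypy reports: "partial[...]" has no attribute "__name__"
fn.__name__ = make_echo.__name__  # type: ignore[attr-defined]
print(fn.__name__, fn())  # prints: make_echo echo hello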
5 changes: 3 additions & 2 deletions parsl/app/python.py
@@ -10,6 +10,7 @@
from parsl.app.errors import wrap_error
from parsl.dataflow.dflow import DataFlowKernelLoader
from parsl.utils import AutoCancelTimer
from parsl.dataflow.futures import AppFuture


logger = logging.getLogger(__name__)
@@ -36,7 +37,7 @@ def inject_exception(thread):
class PythonApp(AppBase):
    """Extends AppBase to cover the Python App."""

    def __init__(self, func, data_flow_kernel=None, cache=False, executors='all', ignore_for_cache=None):
    def __init__(self, func, data_flow_kernel=None, cache=False, executors='all', ignore_for_cache=None, join: bool = False) -> None:
        super().__init__(
            wrap_error(func),
            data_flow_kernel=data_flow_kernel,
@@ -46,7 +47,7 @@ def __init__(self, func, data_flow_kernel=None, cache=False, executors='all', ig
        )
        self.join = join

    def __call__(self, *args, **kwargs):
    def __call__(self, *args, **kwargs) -> AppFuture:
        """This is where the call to a python app is handled.

        Args:
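With __call__ annotated to return AppFuture, call sites of python apps are now checked against the future-based API. A small usage sketch; it assumes a loaded parsl configuration (here the local-threads test config from the repo's test tree):

import parsl
from parsl import python_app
from parsl.tests.configs.local_threads import config

parsl.load(config)


@python_app
def double(x: int) -> int:
    return 2 * x


fut = double(3)      # mypy sees an AppFuture here, not an int
print(fut.result())  # blocks on the app and prints 6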
73 changes: 73 additions & 0 deletions parsl/benchmark/serialization.py
@@ -0,0 +1,73 @@
import time
import parsl.serialize as s
import dill
import pickle


def prof(serializer, deserializer):
    tot_iters = 100000

    t_start = time.time()
    for _ in range(0, tot_iters):
        pass
    t_end = time.time()

    print(f"time for null loop: {t_end - t_start}s")
    print(f"time per iteration: {(t_end - t_start) / tot_iters}s")

    t_start = time.time()
    for _ in range(0, tot_iters):
        serializer(7)
    t_end = time.time()

    print(f"time for serialize 7 loop: {t_end - t_start}s")
    print(f"time per iteration: {(t_end - t_start) / tot_iters}s")

    t_start = time.time()
    for n in range(0, tot_iters):
        serializer(n)
    t_end = time.time()

    print(f"time for serialize all ns loop: {t_end - t_start}s")
    print(f"time per iteration: {(t_end - t_start) / tot_iters}s")

    t_start = time.time()
    for _ in range(0, tot_iters):
        serializer("hello")
    t_end = time.time()

    print(f"time for serialize hello loop: {t_end - t_start}s")
    print(f"time per iteration: {(t_end - t_start) / tot_iters}s")

    def f():
        """This is a test function to be serialized"""
        return 100

    try:
        t_start = time.time()
        for _ in range(0, tot_iters):
            serializer(f)
        t_end = time.time()

        print(f"time for serialize f loop: {t_end - t_start}s")
        print(f"time per iteration: {(t_end - t_start) / tot_iters}s")
    except Exception as e:
        print(f"Exception in serialize f loop: {e}")

    t_start = time.time()
    for n in range(0, tot_iters):
        deserializer(serializer(n))
    t_end = time.time()

    print(f"time for serialize/deserialize all ns loop: {t_end - t_start}s")
    print(f"time per iteration: {(t_end - t_start) / tot_iters}s")


if __name__ == "__main__":
    print("parsl serialization benchmark")
    print("parsl.serialize:")
    prof(s.serialize, s.deserialize)
    print("dill.dumps:")
    prof(dill.dumps, dill.loads)
    print("pickle.dumps:")
    prof(pickle.dumps, pickle.loads)
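
A quick sanity check of the API this benchmark exercises (parsl.serialize exposes serialize and deserialize, used above through the alias s):

from parsl.serialize import serialize, deserialize

buf = serialize({"x": 1})            # bytes, prefixed with a serializer header
assert deserialize(buf) == {"x": 1}  # round-trips back to the original value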