Skip to content

Commit

Permalink
debugging test utils
Browse files Browse the repository at this point in the history
  • Loading branch information
tclose committed Dec 21, 2024
1 parent b4d2b16 commit fb86964
Showing 1 changed file with 38 additions and 46 deletions.
84 changes: 38 additions & 46 deletions pydra/engine/tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@
import pytest
from fileformats.generic import File
from ..submitter import Submitter
from pydra.design import workflow
from pydra import mark
from pydra.design import workflow, python


need_docker = pytest.mark.skipif(
Expand Down Expand Up @@ -56,12 +55,12 @@ def result_submitter(shell_task, plugin):
DOT_FLAG = False


@mark.task
@python.define
def op_4var(a, b, c, d) -> str:
return f"{a} {b} {c} {d}"


@mark.task
@python.define
def fun_addtwo(a: int) -> int:
import time

Expand All @@ -71,7 +70,7 @@ def fun_addtwo(a: int) -> int:
return a + 2


@mark.task
@python.define
def fun_addtwo_notype(a):
import time

Expand All @@ -81,7 +80,7 @@ def fun_addtwo_notype(a):
return a + 2


@mark.task
@python.define
def fun_addtwo_with_threadcount(a: int, sgeThreads: int = 1) -> int:
import time

Expand All @@ -91,157 +90,152 @@ def fun_addtwo_with_threadcount(a: int, sgeThreads: int = 1) -> int:
return a + 2


@mark.task
@python.define
def fun_addvar(
a: ty.Union[int, float], b: ty.Union[int, float]
) -> ty.Union[int, float]:
return a + b


@mark.task
@python.define
def fun_addvar_notype(a, b):
return a + b


@mark.task
@mark.annotate({"return": {"sum": float, "sub": float}})
@python.define(outputs={"sum": float, "sub": float})
def fun_addsubvar(a: float, b: float):
return a + b, a - b


@mark.task
@python.define
def fun_addvar_none(a: int, b: ty.Optional[int]) -> int:
if b is None:
return a
else:
return a + b


@mark.task
@python.define
def fun_addvar_default(a: int, b: int = 1) -> int:
return a + b


@mark.task
@python.define
def fun_addvar_default_notype(a, b=1):
return a + b


@mark.task
@python.define
def fun_addvar3(a: int, b: int, c: int) -> int:
return a + b + c


@mark.task
@python.define
def fun_addvar4(a: int, b: int, c: int, d: int) -> int:
return a + b + c + d


@mark.task
@python.define
def moment(lst: ty.List[float], n: float) -> float:
return sum([i**n for i in lst]) / len(lst)


@mark.task
@python.define
def fun_div(a: ty.Union[int, float], b: ty.Union[int, float]) -> float:
return a / b


@mark.task
@python.define
def multiply(x: int, y: int) -> int:
return x * y


@mark.task
@python.define
def multiply_list(x: list, y: int) -> list:
return x * y


@mark.task
@python.define
def multiply_mixed(x: list, y: int) -> list:
return x * y


@mark.task
@python.define
def add2(x: int) -> int:
if x == 1 or x == 12:
time.sleep(1)
return x + 2


@mark.task
@python.define
def raise_xeq1(x: int) -> int:
if x == 1:
raise Exception("x is 1, so i'm raising an exception!")
return x


@mark.task
@mark.annotate({"return": {"out_add": float, "out_sub": float}})
@python.define(outputs={"out_add": float, "out_sub": float})
def add2_sub2_res(res):
"""function that takes entire output as an input"""
return res["out"] + 2, res["out"] - 2


@mark.task
@mark.annotate({"return": {"out_add": ty.List[float], "out_sub": ty.List[float]}})
@python.define(outputs={"out_add": ty.List[float], "out_sub": ty.List[float]})
def add2_sub2_res_list(res):
"""function that takes entire output as an input"""
return [r["out"] + 2 for r in res], [r["out"] - 2 for r in res]


@mark.task
@python.define
def power(a: int, b: int) -> int:
return a**b


@mark.task
@python.define
def identity(x):
return x


@mark.task
def identity_2flds(
x1, x2
) -> ty.NamedTuple("Output", [("out1", ty.Any), ("out2", ty.Any)]):
@python.define(outputs={"out1": ty.Any, "out2": ty.Any})
def identity_2flds(x1, x2):
return x1, x2


@mark.task
@python.define
def ten(x) -> int:
return 10


@mark.task
@python.define
def add2_wait(x: int) -> int:
time.sleep(2)
return x + 2


@mark.task
@python.define
def list_output(x: int) -> ty.List[int]:
return [x, 2 * x, 3 * x]


@mark.task
@python.define
def list_sum(x: ty.Sequence[ty.Union[int, float]]) -> ty.Union[int, float]:
return sum(x)


@mark.task
@python.define
def fun_dict(d: dict) -> str:
kv_list = [f"{k}:{v}" for (k, v) in d.items()]
return "_".join(kv_list)


@mark.task
@python.define
def fun_write_file(filename: Path, text="hello") -> File:
with open(filename, "w") as f:
f.write(text)
return File(filename)


@mark.task
@python.define
def fun_write_file_list(
filename_list: ty.List[ty.Union[str, File, Path]], text="hi"
) -> ty.List[File]:
Expand All @@ -252,7 +246,7 @@ def fun_write_file_list(
return filename_list


@mark.task
@python.define
def fun_write_file_list2dict(
filename_list: ty.List[ty.Union[str, File, Path]], text="hi"
) -> ty.Dict[str, ty.Union[File, int]]:
Expand All @@ -266,14 +260,14 @@ def fun_write_file_list2dict(
return filename_dict


@mark.task
@python.define
def fun_file(filename: File):
with open(filename) as f:
txt = f.read()
return txt


@mark.task
@python.define
def fun_file_list(filename_list: ty.List[File]):
txt_list = []
for filename in filename_list:
Expand Down Expand Up @@ -349,14 +343,12 @@ def Workflow(x):
return Workflow(x=5)


@mark.task
@mark.annotate({"return": {"sum": int, "products": ty.List[int]}})
@python.define(outputs={"sum": int, "products": ty.List[int]})
def list_mult_sum(scalar: int, in_list: ty.List[int]) -> ty.Tuple[int, ty.List[int]]:
products = [scalar * x for x in in_list]
return functools.reduce(operator.add, products, 0), products


@mark.task
@mark.annotate({"return": {"x": str, "y": int, "z": float}})
@python.define(outputs={"x": str, "y": int, "z": float})
def foo(a: str, b: int, c: float) -> ty.Tuple[str, int, float]:
return a, b, c

0 comments on commit fb86964

Please sign in to comment.