Skip to content

Commit

Permalink
finished off the workflow tutorial for now
Browse files Browse the repository at this point in the history
  • Loading branch information
tclose committed Dec 28, 2024
1 parent a8ed05e commit 8e88ed2
Showing 1 changed file with 86 additions and 8 deletions.
94 changes: 86 additions & 8 deletions new-docs/source/tutorial/workflow.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,22 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"# Workflow design"
"# Workflow design\n",
"\n",
"In Pydra, workflows are DAG of component tasks to be executed on specified inputs.\n",
"Workflow specifications are dataclasses, which interchangeable with Python and shell tasks\n",
"specifications and executed in the same way."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Given two task specifications, `Add` and `Mul`"
"## Constructor functions\n",
"\n",
"Workflows are typically defined using the `pydra.design.workflow.define` decorator on \n",
"a \"constructor\" function that generates the workflow. For example, given two task\n",
"specifications, `Add` and `Mul`."
]
},
{
Expand Down Expand Up @@ -47,7 +55,7 @@
"outputs": [],
"source": [
"@workflow.define\n",
"def MyTestWorkflow(a, b):\n",
"def BasicWorkflow(a, b):\n",
" add = workflow.add(Add(a=a, b=b))\n",
" mul = workflow.add(Mul(a=add.out, b=b))\n",
" return mul.out"
Expand All @@ -72,7 +80,7 @@
"from fileformats import image, video\n",
"\n",
"@workflow.define\n",
"def MyTestShellWorkflow(\n",
"def ShellWorkflow(\n",
" input_video: video.Mp4,\n",
" watermark: image.Png,\n",
" watermark_dims: tuple[int, int] = (10, 10),\n",
Expand Down Expand Up @@ -209,7 +217,7 @@
" return float(value)\n",
"\n",
"@workflow.define\n",
"class MyLibraryWorkflow(WorkflowSpec[\"MyLibraryWorkflow.Outputs\"]):\n",
"class LibraryWorkflow(WorkflowSpec[\"MyLibraryWorkflow.Outputs\"]):\n",
"\n",
" a: int\n",
" b: float = workflow.arg(\n",
Expand Down Expand Up @@ -248,7 +256,7 @@
" return sum(x)\n",
"\n",
"@workflow.define\n",
"def MySplitWorkflow(a: list[int], b: list[float]) -> list[float]:\n",
"def SplitWorkflow(a: list[int], b: list[float]) -> list[float]:\n",
" # Multiply over all combinations of the elements of a and b, then combine the results\n",
" # for each a element into a list over each b element\n",
" mul = workflow.add(Mul()).split(x=a, y=b).combine(\"x\")\n",
Expand All @@ -271,7 +279,7 @@
"outputs": [],
"source": [
"@workflow.define\n",
"def MySplitThenCombineWorkflow(a: list[int], b: list[float], c: float) -> list[float]:\n",
"def SplitThenCombineWorkflow(a: list[int], b: list[float], c: float) -> list[float]:\n",
" mul = workflow.add(Mul()).split(x=a, y=b)\n",
" add = workflow.add(Add(x=mul.out, y=c)).combine(\"Mul.x\")\n",
" sum = workflow.add(Sum(x=add.out))\n",
Expand Down Expand Up @@ -301,7 +309,7 @@
"outputs": [],
"source": [
"@workflow.define\n",
"def MyConditionalWorkflow(\n",
"def ConditionalWorkflow(\n",
" input_video: video.Mp4,\n",
" watermark: image.Png,\n",
" watermark_dims: tuple[int, int] | None = None,\n",
Expand Down Expand Up @@ -371,6 +379,76 @@
"placeholders see [Conditional construction](../explanation/conditional-lazy.html)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Typing\n",
"\n",
"Pydra utilizes Python type annotations to implement strong type-checking, which is performed\n",
"when values or upstream outputs are assigned to task specification inputs.\n",
"\n",
"Task input and output fields do not need to be assigned types, since they will default to `typing.Any`.\n",
"However, if they are assigned a type and a value or output from an upstream node conflicts\n",
"with the type, a `TypeError` will be raised at construction time.\n",
"\n",
"Note that the type-checking \"assumes the best\", and will pass if the upstream field is typed\n",
"by `Any` or a super-class of the field being assigned to. For example, an input of\n",
"`fileformats.generic.File` passed to a field expecting a `fileformats.image.Png` file type,\n",
"because `Png` is a subtype of `File`, where as `fileformats.image.Jpeg` input would fail\n",
"since it is clearly not the intended type.\n"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [],
"source": [
"from fileformats import generic\n",
"\n",
"Mp4Handbrake = shell.define(\n",
" \"HandBrakeCLI -i <in_video:video/mp4> -o <out|out_video:video/mp4> \"\n",
" \"--width <width:int> --height <height:int>\",\n",
")\n",
"\n",
"\n",
"QuicktimeHandbrake = shell.define(\n",
" \"HandBrakeCLI -i <in_video:video/quicktime> -o <out|out_video:video/quicktime> \"\n",
" \"--width <width:int> --height <height:int>\",\n",
")\n",
"\n",
"@workflow.define\n",
"def TypeErrorWorkflow(\n",
" input_video: video.Mp4,\n",
" watermark: generic.File,\n",
" watermark_dims: tuple[int, int] = (10, 10),\n",
") -> video.Mp4:\n",
"\n",
" add_watermark = workflow.add(\n",
" shell.define(\n",
" \"ffmpeg -i <in_video> -i <watermark:image/png> \"\n",
" \"-filter_complex <filter> <out|out_video:video/mp4>\"\n",
" )(\n",
" in_video=input_video,\n",
" watermark=watermark, # Type is OK because generic.File is superclass of image.Png\n",
" filter=\"overlay={}:{}\".format(*watermark_dims),\n",
" ),\n",
" name=\"add_watermark\",\n",
" )\n",
"\n",
" try:\n",
" handbrake = workflow.add(\n",
" QuicktimeHandbrake(in_video=add_watermark.out_video, width=1280, height=720),\n",
" ) # This will raise a TypeError because the input video is an Mp4\n",
" except TypeError:\n",
" handbrake = workflow.add(\n",
" Mp4Handbrake(in_video=add_watermark.out_video, width=1280, height=720),\n",
" ) # The type of the input video is now correct\n",
"\n",
" return handbrake.output_video"
]
},
{
"cell_type": "markdown",
"metadata": {},
Expand Down

0 comments on commit 8e88ed2

Please sign in to comment.