-
Notifications
You must be signed in to change notification settings - Fork 57
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Fixes #7
- Loading branch information
Showing
12 changed files
with
411 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
# DSL Sample | ||
|
||
This sample shows how to have a workflow interpret/invoke arbitrary steps defined in a DSL. It is similar to the DSL | ||
samples [in TypeScript](https://github.com/temporalio/samples-typescript/tree/main/dsl-interpreter) and | ||
[in Go](https://github.com/temporalio/samples-go/tree/main/dsl). | ||
|
||
For this sample, the optional `dsl` dependency group must be included. To include, run: | ||
|
||
poetry install --with dsl | ||
|
||
To run, first see [README.md](../README.md) for prerequisites. Then, run the following from this directory to start the | ||
worker: | ||
|
||
poetry run python worker.py | ||
|
||
This will start the worker. Then, in another terminal, run the following to execute a workflow of steps defined in | ||
[workflow1.yaml](workflow1.yaml): | ||
|
||
poetry run python starter.py workflow1.yaml | ||
|
||
This will run the workflow and show the final variables that the workflow returns. Looking in the worker terminal, each | ||
step executed will be visible. | ||
|
||
Similarly we can do the same for the more advanced [workflow2.yaml](workflow2.yaml) file: | ||
|
||
poetry run python starter.py workflow2.yaml | ||
|
||
This sample gives a guide of how one can write a workflow to interpret arbitrary steps from a user-provided DSL. Many | ||
DSL models are more advanced and are more specific to conform to business logic needs. |
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
from temporalio import activity | ||
|
||
|
||
class DSLActivities: | ||
@activity.defn | ||
async def activity1(self, arg: str) -> str: | ||
activity.logger.info(f"Executing activity1 with arg: {arg}") | ||
return f"[result from activity1: {arg}]" | ||
|
||
@activity.defn | ||
async def activity2(self, arg: str) -> str: | ||
activity.logger.info(f"Executing activity2 with arg: {arg}") | ||
return f"[result from activity2: {arg}]" | ||
|
||
@activity.defn | ||
async def activity3(self, arg1: str, arg2: str) -> str: | ||
activity.logger.info(f"Executing activity3 with args: {arg1} and {arg2}") | ||
return f"[result from activity3: {arg1} {arg2}]" | ||
|
||
@activity.defn | ||
async def activity4(self, arg: str) -> str: | ||
activity.logger.info(f"Executing activity4 with arg: {arg}") | ||
return f"[result from activity4: {arg}]" | ||
|
||
@activity.defn | ||
async def activity5(self, arg1: str, arg2: str) -> str: | ||
activity.logger.info(f"Executing activity5 with args: {arg1} and {arg2}") | ||
return f"[result from activity5: {arg1} {arg2}]" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
import asyncio | ||
import logging | ||
import sys | ||
import uuid | ||
|
||
import dacite | ||
import yaml | ||
from temporalio.client import Client | ||
|
||
from dsl.workflow import DSLInput, DSLWorkflow | ||
|
||
|
||
async def main(dsl_yaml: str) -> None: | ||
# Convert the YAML to our dataclass structure. We use PyYAML + dacite to do | ||
# this but it can be done any number of ways. | ||
dsl_input = dacite.from_dict(DSLInput, yaml.safe_load(dsl_yaml)) | ||
|
||
# Connect client | ||
client = await Client.connect("localhost:7233") | ||
|
||
# Run workflow | ||
result = await client.execute_workflow( | ||
DSLWorkflow.run, | ||
dsl_input, | ||
id=f"dsl-workflow-id-{uuid.uuid4()}", | ||
task_queue="dsl-task-queue", | ||
) | ||
logging.info( | ||
f"Final variables:\n " | ||
+ "\n ".join((f"{k}: {v}" for k, v in result.items())) | ||
) | ||
|
||
|
||
if __name__ == "__main__": | ||
logging.basicConfig(level=logging.INFO) | ||
|
||
# Require the YAML file as an argument. We read this _outside_ of the async | ||
# def function because thread-blocking IO should never happen in async def | ||
# functions. | ||
if len(sys.argv) != 2: | ||
raise RuntimeError("Expected single argument for YAML file") | ||
with open(sys.argv[1], "r") as yaml_file: | ||
dsl_yaml = yaml_file.read() | ||
|
||
# Run | ||
asyncio.run(main(dsl_yaml)) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,44 @@ | ||
import asyncio | ||
import logging | ||
|
||
from temporalio.client import Client | ||
from temporalio.worker import Worker | ||
|
||
from dsl.activities import DSLActivities | ||
from dsl.workflow import DSLWorkflow | ||
|
||
interrupt_event = asyncio.Event() | ||
|
||
|
||
async def main(): | ||
# Connect client | ||
client = await Client.connect("localhost:7233") | ||
|
||
# Run a worker for the activities and workflow | ||
activities = DSLActivities() | ||
async with Worker( | ||
client, | ||
task_queue="dsl-task-queue", | ||
activities=[ | ||
activities.activity1, | ||
activities.activity2, | ||
activities.activity3, | ||
activities.activity4, | ||
activities.activity5, | ||
], | ||
workflows=[DSLWorkflow], | ||
): | ||
# Wait until interrupted | ||
logging.info("Worker started, ctrl+c to exit") | ||
await interrupt_event.wait() | ||
logging.info("Shutting down") | ||
|
||
|
||
if __name__ == "__main__": | ||
logging.basicConfig(level=logging.INFO) | ||
loop = asyncio.new_event_loop() | ||
try: | ||
loop.run_until_complete(main()) | ||
except KeyboardInterrupt: | ||
interrupt_event.set() | ||
loop.run_until_complete(loop.shutdown_asyncgens()) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,85 @@ | ||
from __future__ import annotations | ||
|
||
import asyncio | ||
import dataclasses | ||
from dataclasses import dataclass | ||
from datetime import timedelta | ||
from typing import Any, Dict, List, Optional, Union | ||
|
||
from temporalio import workflow | ||
|
||
|
||
@dataclass | ||
class DSLInput: | ||
root: Statement | ||
variables: Dict[str, Any] = dataclasses.field(default_factory=dict) | ||
|
||
|
||
@dataclass | ||
class ActivityStatement: | ||
activity: ActivityInvocation | ||
|
||
|
||
@dataclass | ||
class ActivityInvocation: | ||
name: str | ||
arguments: List[str] = dataclasses.field(default_factory=list) | ||
result: Optional[str] = None | ||
|
||
|
||
@dataclass | ||
class SequenceStatement: | ||
sequence: Sequence | ||
|
||
|
||
@dataclass | ||
class Sequence: | ||
elements: List[Statement] | ||
|
||
|
||
@dataclass | ||
class ParallelStatement: | ||
parallel: Parallel | ||
|
||
|
||
@dataclass | ||
class Parallel: | ||
branches: List[Statement] | ||
|
||
|
||
Statement = Union[ActivityStatement, SequenceStatement, ParallelStatement] | ||
|
||
|
||
@workflow.defn | ||
class DSLWorkflow: | ||
@workflow.run | ||
async def run(self, input: DSLInput) -> Dict[str, Any]: | ||
self.variables = dict(input.variables) | ||
workflow.logger.info("Running DSL workflow") | ||
await self.execute_statement(input.root) | ||
workflow.logger.info("DSL workflow completed") | ||
return self.variables | ||
|
||
async def execute_statement(self, stmt: Statement) -> None: | ||
if isinstance(stmt, ActivityStatement): | ||
# Invoke activity loading arguments from variables and optionally | ||
# storing result as a variable | ||
result = await workflow.execute_activity( | ||
stmt.activity.name, | ||
args=[self.variables.get(arg, "") for arg in stmt.activity.arguments], | ||
start_to_close_timeout=timedelta(minutes=1), | ||
) | ||
if stmt.activity.result: | ||
self.variables[stmt.activity.result] = result | ||
elif isinstance(stmt, SequenceStatement): | ||
# Execute each statement in order | ||
for elem in stmt.sequence.elements: | ||
await self.execute_statement(elem) | ||
elif isinstance(stmt, ParallelStatement): | ||
# Execute all in parallel. Note, this will raise an exception when | ||
# the first activity fails and will not cancel the others. We could | ||
# store tasks and cancel if we wanted. In newer Python versions this | ||
# would use a TaskGroup instead. | ||
await asyncio.gather( | ||
*[self.execute_statement(branch) for branch in stmt.parallel.branches] | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
# This sample workflows execute 3 steps in sequence. | ||
# 1) Activity1, takes arg1 as input, and put result as result1. | ||
# 2) Activity2, takes result1 as input, and put result as result2. | ||
# 3) Activity3, takes args2 and result2 as input, and put result as result3. | ||
|
||
variables: | ||
arg1: value1 | ||
arg2: value2 | ||
|
||
root: | ||
sequence: | ||
elements: | ||
- activity: | ||
name: activity1 | ||
arguments: | ||
- arg1 | ||
result: result1 | ||
- activity: | ||
name: activity2 | ||
arguments: | ||
- result1 | ||
result: result2 | ||
- activity: | ||
name: activity3 | ||
arguments: | ||
- arg2 | ||
- result2 | ||
result: result3 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,58 @@ | ||
# This sample workflow executes 3 steps in sequence. | ||
# 1) activity1, takes arg1 as input, and put result as result1. | ||
# 2) it runs a parallel block which runs below sequence branches in parallel | ||
# 2.1) sequence 1 | ||
# 2.1.1) activity2, takes result1 as input, and put result as result2 | ||
# 2.1.2) activity3, takes arg2 and result2 as input, and put result as result3 | ||
# 2.2) sequence 2 | ||
# 2.2.1) activity4, takes result1 as input, and put result as result4 | ||
# 2.2.2) activity5, takes arg3 and result4 as input, and put result as result5 | ||
# 3) activity3, takes result3 and result5 as input, and put result as result6. | ||
|
||
variables: | ||
arg1: value1 | ||
arg2: value2 | ||
arg3: value3 | ||
|
||
root: | ||
sequence: | ||
elements: | ||
- activity: | ||
name: activity1 | ||
arguments: | ||
- arg1 | ||
result: result1 | ||
- parallel: | ||
branches: | ||
- sequence: | ||
elements: | ||
- activity: | ||
name: activity2 | ||
arguments: | ||
- result1 | ||
result: result2 | ||
- activity: | ||
name: activity3 | ||
arguments: | ||
- arg2 | ||
- result2 | ||
result: result3 | ||
- sequence: | ||
elements: | ||
- activity: | ||
name: activity4 | ||
arguments: | ||
- result1 | ||
result: result4 | ||
- activity: | ||
name: activity5 | ||
arguments: | ||
- arg3 | ||
- result4 | ||
result: result5 | ||
- activity: | ||
name: activity3 | ||
arguments: | ||
- result3 | ||
- result5 | ||
result: result6 |
Oops, something went wrong.