Gevent sample #84
@@ -0,0 +1,28 @@
# Gevent Sample

This sample shows how to run Temporal in an environment that gevent has patched.

Gevent is built to patch Python libraries to attempt to seamlessly convert threaded code into coroutine-based code.
However, it is well known within the gevent community that it does not work well with `asyncio`, which is the modern
Python approach to coroutines. Temporal leverages `asyncio` which means by default it is incompatible with gevent. Users
are encouraged to abandon gevent in favor of more modern approaches where they can but it is not always possible.

This sample shows how to use a customized gevent executor to run `asyncio` Temporal clients, workers, activities, and
workflows.

For this sample, the optional `gevent` dependency group must be included. To include, run:

    poetry install --with gevent

To run the sample, first see [README.md](../README.md) for prerequisites such as having a localhost Temporal server
running. Then, run the following from this directory to start the worker:

    poetry run python worker.py

This will start the worker. The worker has a workflow and two activities, one `asyncio` based and one gevent based. Now
in another terminal, run the following from this directory to execute the workflow:

    poetry run python starter.py

The workflow should run and complete with the hello result. Note on the worker terminal there will be logs of the
workflow and activity executions.
@@ -0,0 +1,25 @@
from dataclasses import dataclass

import gevent
from temporalio import activity


@dataclass
class ComposeGreetingInput:
    greeting: str
    name: str


@activity.defn
async def compose_greeting_async(input: ComposeGreetingInput) -> str:
    activity.logger.info(f"Running async activity with parameter {input}")
    return f"{input.greeting}, {input.name}!"


@activity.defn
def compose_greeting_sync(input: ComposeGreetingInput) -> str:
    activity.logger.info(
        f"Running sync activity with parameter {input}, "
        f"in greenlet: {gevent.getcurrent()}"
    )
    return f"{input.greeting}, {input.name}!"
@@ -0,0 +1,41 @@
import functools
from concurrent.futures import Future
from typing import Callable, TypeVar

from gevent import threadpool
from typing_extensions import ParamSpec

T = TypeVar("T")
P = ParamSpec("P")


class GeventExecutor(threadpool.ThreadPoolExecutor):
    def submit(
        self, fn: Callable[P, T], *args: P.args, **kwargs: P.kwargs
    ) -> Future[T]:
        # Gevent's returned futures do not map well to Python futures, so we
        # must translate. We can't just use set_result/set_exception because
        # done callbacks are not always called in gevent's case and it doesn't
        # seem to support cancel, so we instead wrap the caller function.

Comment on lines +16 to +19

[non-blocking/optional] I think it might be clearer to explicitly state "Gevent's returned futures do not implement all of the things expected by

        python_fut: Future[T] = Future()

        @functools.wraps(fn)
        def wrapper(*w_args: P.args, **w_kwargs: P.kwargs) -> None:
            try:
                result = fn(*w_args, **w_kwargs)
                # Swallow InvalidStateError in case Python future was cancelled
                try:
                    python_fut.set_result(result)
                except:
                    pass
            except Exception as exc:
                # Swallow InvalidStateError in case Python future was cancelled
                try:
                    python_fut.set_exception(exc)
                except:

Since this might end up in production code, I suggest catching the specific exception that you indicate in the comment.

Technically we support 3.7+ and this class didn't get added until 3.8 and we try to write our code for all versions. Adding this makes the 3.7 CI angry (even though technically this sample only works with 3.8 because gevent only works with 3.8).

                    pass

        # Submit our wrapper to gevent
        super().submit(wrapper, *args, **kwargs)
        # Return Python future to user
        return python_fut
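
The exchange about the bare `except:` can be illustrated with the standard library alone. This is a minimal sketch, not the PR's code: it swaps in `concurrent.futures.ThreadPoolExecutor` for `gevent.threadpool.ThreadPoolExecutor`, and `TranslatingExecutor`, `_settle`, and the fallback `InvalidStateError` class are hypothetical names. It assumes a guarded import is an acceptable way to name the specific exception while still importing cleanly on Python 3.7.

```python
import functools
from concurrent.futures import Future, ThreadPoolExecutor

try:
    # Available on Python 3.8+.
    from concurrent.futures import InvalidStateError
except ImportError:
    # Assumption: on 3.7 a placeholder keeps the except clause below valid.
    class InvalidStateError(Exception):  # type: ignore[no-redef]
        pass


def _settle(setter, value):
    """Call a Future setter, ignoring a future that was already cancelled."""
    try:
        setter(value)
    except InvalidStateError:
        pass


class TranslatingExecutor(ThreadPoolExecutor):
    """Hypothetical stand-in for GeventExecutor, backed by a stdlib pool."""

    def submit(self, fn, *args, **kwargs):
        python_fut: Future = Future()

        @functools.wraps(fn)
        def wrapper(*w_args, **w_kwargs):
            try:
                result = fn(*w_args, **w_kwargs)
            except Exception as exc:
                _settle(python_fut.set_exception, exc)
            else:
                _settle(python_fut.set_result, result)

        # The pool's own future is discarded; callers only see python_fut.
        super().submit(wrapper, *args, **kwargs)
        return python_fut
```

Catching only `InvalidStateError` lets any other failure inside the setter surface instead of being silently dropped, which is the concern raised in the review comment.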
@@ -0,0 +1,40 @@
# Init gevent
from gevent import monkey

monkey.patch_all()

import asyncio
import logging

from temporalio.client import Client

from gevent_async import workflow
from gevent_async.executor import GeventExecutor


def main():
    logging.basicConfig(level=logging.INFO)

    # Create single-worker gevent executor and run asyncio.run(async_main()) in
    # it, waiting for result. This executor cannot be used for anything else in
    # Temporal, it is just a single thread for running asyncio.
    with GeventExecutor(max_workers=1) as executor:
        executor.submit(asyncio.run, async_main()).result()


async def async_main():
    # Connect client
    client = await Client.connect("localhost:7233")

    # Run workflow
    result = await client.execute_workflow(
        workflow.GreetingWorkflow.run,
        "Temporal",
        id="gevent_async-workflow-id",
        task_queue="gevent_async-task-queue",
    )
    logging.info(f"Workflow result: {result}")


if __name__ == "__main__":
    main()
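
The `main()` pattern in this file, running the whole `asyncio` event loop on one dedicated worker thread and blocking until it finishes, can be sketched without gevent or Temporal. The sketch assumes a standard-library `ThreadPoolExecutor` in place of `GeventExecutor`; `run_in_dedicated_thread` and the body of `async_main` are hypothetical placeholders.

```python
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor


async def async_main() -> str:
    # Placeholder for connecting a client and executing a workflow.
    await asyncio.sleep(0)
    return threading.current_thread().name


def run_in_dedicated_thread() -> str:
    # One worker only: the thread is occupied by the event loop for the
    # whole run, so it cannot serve any other submitted work.
    with ThreadPoolExecutor(max_workers=1, thread_name_prefix="asyncio") as ex:
        # The coroutine object is created here; asyncio.run then creates
        # and drives an event loop for it on the worker thread.
        return ex.submit(asyncio.run, async_main()).result()


if __name__ == "__main__":
    print("event loop ran on thread:", run_in_dedicated_thread())
```

Because the single worker is tied up for the lifetime of the loop, any executor handed to Temporal for activities or workflow tasks has to be a separate one, as the comments in `worker.py` point out.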
@@ -0,0 +1,56 @@
# Init gevent
from gevent import monkey

monkey.patch_all()

import asyncio
import logging

from temporalio.testing import WorkflowEnvironment
from temporalio.worker import Worker

from gevent_async import activity, workflow
from gevent_async.executor import GeventExecutor

# This basically combines ../worker.py and ../starter.py for use by CI to
# confirm this works in all environments


def main():
    logging.basicConfig(level=logging.INFO)
    with GeventExecutor(max_workers=1) as executor:
        executor.submit(asyncio.run, async_main()).result()


async def async_main():
    logging.info("Starting local server")
    async with await WorkflowEnvironment.start_local() as env:
        logging.info("Starting worker")
        with GeventExecutor(max_workers=200) as executor:
            async with Worker(
                env.client,
                task_queue="gevent_async-task-queue",
                workflows=[workflow.GreetingWorkflow],
                activities=[
                    activity.compose_greeting_async,
                    activity.compose_greeting_sync,
                ],
                activity_executor=executor,
                workflow_task_executor=executor,
                max_concurrent_activities=100,
                max_concurrent_workflow_tasks=100,
            ):
                logging.info("Running workflow")
                result = await env.client.execute_workflow(
                    workflow.GreetingWorkflow.run,
                    "Temporal",
                    id="gevent_async-workflow-id",
                    task_queue="gevent_async-task-queue",
                )
                if result != "Hello, Temporal!":
                    raise RuntimeError(f"Unexpected result: {result}")
                logging.info(f"Workflow complete, result: {result}")


if __name__ == "__main__":
    main()
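
The shared executor here is sized at 200 because it serves up to 100 activities plus up to 100 workflow tasks. A small guard can make that relationship explicit. The helper names below are hypothetical, and the sketch assumes the worst case in which every activity slot needs a thread, which overestimates when some activities are `async def`.

```python
def required_executor_workers(
    max_concurrent_activities: int, max_concurrent_workflow_tasks: int
) -> int:
    """Worst-case number of threads one shared executor must provide."""
    return max_concurrent_activities + max_concurrent_workflow_tasks


def check_executor_size(
    max_workers: int,
    max_concurrent_activities: int,
    max_concurrent_workflow_tasks: int,
) -> None:
    """Raise if the executor could be starved by the worker's limits."""
    needed = required_executor_workers(
        max_concurrent_activities, max_concurrent_workflow_tasks
    )
    if max_workers < needed:
        raise ValueError(
            f"max_workers={max_workers} is below the {needed} slots "
            "implied by the concurrency limits"
        )


# Mirrors the values used in this PR: 100 + 100 fits exactly in 200.
check_executor_size(200, 100, 100)
```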
@@ -0,0 +1,76 @@
# Init gevent
from gevent import monkey

monkey.patch_all()

import asyncio
import logging
import signal

import gevent
from temporalio.client import Client
from temporalio.worker import Worker

from gevent_async import activity, workflow
from gevent_async.executor import GeventExecutor


def main():
    logging.basicConfig(level=logging.INFO)

    # Create single-worker gevent executor and run asyncio.run(async_main()) in
    # it, waiting for result. This executor cannot be used for anything else in
    # Temporal, it is just a single thread for running asyncio. This means that
    # inside of async_main we must create another executor specifically for
    # executing activity and workflow tasks.
    with GeventExecutor(max_workers=1) as executor:
        executor.submit(asyncio.run, async_main()).result()


async def async_main():
    # Create ctrl+c handler. We do this by telling gevent on SIGINT to set the
    # asyncio event. But asyncio calls are not thread safe, so we have to invoke
    # it via call_soon_threadsafe.
    interrupt_event = asyncio.Event()
    gevent.signal_handler(
        signal.SIGINT,
        asyncio.get_running_loop().call_soon_threadsafe,
        interrupt_event.set,
    )

    # Connect client
    client = await Client.connect("localhost:7233")

    # Create an executor for use by Temporal. This cannot be the outer one
    # running this async main. The max_workers here needs to have enough room to
    # support the max concurrent activities/workflows settings.
    with GeventExecutor(max_workers=200) as executor:

        # Run a worker for the workflow and activities
        async with Worker(
            client,
            task_queue="gevent_async-task-queue",
            workflows=[workflow.GreetingWorkflow],
            activities=[
                activity.compose_greeting_async,
                activity.compose_greeting_sync,
            ],
            # Set the executor for activities (only used for non-async
            # activities) and workflow tasks
            activity_executor=executor,
            workflow_task_executor=executor,
            # Set the max concurrent activities/workflows. These are the same as
            # the defaults, but this makes it clear that the 100 + 100 = 200 for
            # max_workers settings.
            max_concurrent_activities=100,
            max_concurrent_workflow_tasks=100,
        ):

            # Wait until interrupted
            logging.info("Worker started, ctrl+c to exit")
            await interrupt_event.wait()
            logging.info("Shutting down")


if __name__ == "__main__":
    main()
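
The interrupt handling in `async_main` relies on one rule: code running outside the event loop's thread must not call `interrupt_event.set()` directly, and instead asks the loop to do it through `call_soon_threadsafe`. The sketch below shows only that hand-off. As an assumption for illustration, a `threading.Timer` stands in for the gevent SIGINT handler, and `wait_for_interrupt` is a hypothetical name.

```python
import asyncio
import threading


async def wait_for_interrupt(delay: float = 0.05) -> bool:
    interrupt_event = asyncio.Event()
    loop = asyncio.get_running_loop()

    # Stand-in for the signal handler: a timer thread fires later and asks
    # the loop to set the event. asyncio.Event.set() is not thread-safe, so
    # it is scheduled onto the loop thread rather than called directly.
    timer = threading.Timer(
        delay, loop.call_soon_threadsafe, args=(interrupt_event.set,)
    )
    timer.start()

    # Suspends this coroutine until the loop runs the scheduled set().
    await interrupt_event.wait()
    timer.join()
    return interrupt_event.is_set()


if __name__ == "__main__":
    print("interrupted:", asyncio.run(wait_for_interrupt()))
```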
@@ -0,0 +1,34 @@
from datetime import timedelta

from temporalio import workflow

with workflow.unsafe.imports_passed_through():
    from gevent_async.activity import (
        ComposeGreetingInput,
        compose_greeting_async,
        compose_greeting_sync,
    )


@workflow.defn
class GreetingWorkflow:
    @workflow.run
    async def run(self, name: str) -> str:
        workflow.logger.info("Running workflow with parameter %s" % name)

        # Run an async and a sync activity
        async_res = await workflow.execute_activity(
            compose_greeting_async,
            ComposeGreetingInput("Hello", name),
            start_to_close_timeout=timedelta(seconds=10),
        )
        sync_res = await workflow.execute_activity(
            compose_greeting_sync,
            ComposeGreetingInput("Hello", name),
            start_to_close_timeout=timedelta(seconds=10),
        )

        # Confirm the same, return one
        if async_res != sync_res:
            raise ValueError("Results are not the same")
        return sync_res

Hi @cretz, I believe `gevent.threadpool.ThreadPoolExecutor` is the unpatched thread pool, so really this PR is just using native threads.
cf. https://www.gevent.org/api/gevent.threadpool.html#gevent.threadpool.ThreadPoolExecutor :
I verified by running this code locally with the patched `concurrent.futures.ThreadPoolExecutor` and it throws this error, as seen before in our explorations:

Exactly. We are intentionally using native threads so that gevent doesn't try to mess with them, so you can have asyncio running in a native thread and gevent doing other things elsewhere. The whole crux of the original issue is that you couldn't even use Temporal and gevent in the same process because gevent tries to take over, but this allows them to share the process space.
If we need further investigation on using greenlets, maybe from activities or something, we can look into that separately.

@cretz please correct me if my understanding is wrong: if this is intentionally using native threads, it is creating a new thread for each activity it runs on line 39 despite the comment on line 38 saying that it will "[s]ubmit our wrapper to gevent". this means that all the code executed in activities needs to be threadsafe.
our codebase isn't threadsafe (pretty common for gevent codebases i think) but we are still giving this a go to see if we can get basic execution working.

Not a new thread. This is calling `gevent.threadpool.ThreadPoolExecutor.submit`, which reuses existing threads (I believe). But yes, they aren't the threads shared by gevent.

Not `async def` activities, but yes, regular `def` activities in the Temporal Python SDK are executed via the executor passed, and most people use `concurrent.futures.ThreadPoolExecutor`, which does require thread safety, yes.

Yes, this is admittedly not true "gevent activities" but rather just a demonstration showing that Temporal can live in the same process as a gevent-encumbered Python process.

We could go one further and look into having an executor specific for activities that bridged the native Python thread to greenlet gap. I did not go that far, but we can try. It'd probably just be a `concurrent.futures.Executor` implementation backed by `gevent.pool.Pool`.
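
One possible shape for the executor floated in the last comment is sketched below. It is not a working gevent bridge: `SpawnBackedExecutor` and `thread_spawn` are hypothetical names, the pool is abstracted as any `spawn(fn)` callable, and a plain thread stands in for it here. Whether a `gevent.pool.Pool` can be driven safely this way from the native thread that runs `asyncio` is an open question the sketch does not answer.

```python
import threading
from concurrent.futures import Executor, Future


class SpawnBackedExecutor(Executor):
    """Executor that hands each call to a user-supplied spawn(fn) callable."""

    def __init__(self, spawn):
        self._spawn = spawn

    def submit(self, fn, *args, **kwargs):
        fut: Future = Future()

        def run():
            # Skip the call entirely if the caller cancelled beforehand.
            if not fut.set_running_or_notify_cancel():
                return
            try:
                fut.set_result(fn(*args, **kwargs))
            except Exception as exc:
                fut.set_exception(exc)

        self._spawn(run)
        return fut


def thread_spawn(fn):
    # Stand-in for a pool's spawn method; starts one thread per call.
    threading.Thread(target=fn, daemon=True).start()


if __name__ == "__main__":
    executor = SpawnBackedExecutor(thread_spawn)
    print(executor.submit(pow, 2, 5).result(timeout=5))
```

Marking the future as running before the call means a future cancelled earlier is never executed, which differs from the wrapper in this PR, where the function still runs and only the result is discarded.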