Gevent sample #84

Merged
merged 3 commits into from
Oct 20, 2023
10 changes: 9 additions & 1 deletion .github/workflows/ci.yml
@@ -12,7 +12,7 @@ jobs:
     strategy:
       fail-fast: true
       matrix:
-        python: ["3.7", "3.10"]
+        python: ["3.7", "3.11"]
         os: [ubuntu-latest, macos-latest, windows-latest]
     runs-on: ${{ matrix.os }}
     steps:
@@ -32,3 +32,11 @@ jobs:
       - run: poe test -s -o log_cli_level=DEBUG
       - run: poe test -s -o log_cli_level=DEBUG --workflow-environment time-skipping

+      # On non-3.7, run gevent test
+      - name: Gevent test
+        if: ${{ matrix.python != '3.7' }}
+        run: |
+          poetry install --with gevent
+          poetry run python gevent_async/test/run_combined.py


1 change: 1 addition & 0 deletions README.md
@@ -56,6 +56,7 @@ Some examples require extra dependencies. See each sample's directory for specif
 * [custom_converter](custom_converter) - Use a custom payload converter to handle custom types.
 * [custom_decorator](custom_decorator) - Custom decorator to auto-heartbeat a long-running activity.
 * [encryption](encryption) - Apply end-to-end encryption for all input/output.
+* [gevent_async](gevent_async) - Combine gevent and Temporal.
 * [open_telemetry](open_telemetry) - Trace workflows with OpenTelemetry.
 * [patching](patching) - Alter workflows safely with `patch` and `deprecate_patch`.
 * [polling](polling) - Recommended implementation of an activity that needs to periodically poll an external resource waiting its successful completion.
28 changes: 28 additions & 0 deletions gevent_async/README.md
@@ -0,0 +1,28 @@
# Gevent Sample

This sample shows how to run Temporal in an environment that gevent has patched.

Gevent is built to patch Python libraries to attempt to seamlessly convert threaded code into coroutine-based code.
However, it is well known within the gevent community that it does not work well with `asyncio`, which is the modern
Python approach to coroutines. Temporal leverages `asyncio` which means by default it is incompatible with gevent. Users
are encouraged to abandon gevent in favor of more modern approaches where they can but it is not always possible.

This sample shows how to use a customized gevent executor to run `asyncio` Temporal clients, workers, activities, and
workflows.

For this sample, the optional `gevent` dependency group must be included. To include, run:

    poetry install --with gevent

To run the sample, first see [README.md](../README.md) for prerequisites such as having a localhost Temporal server
running. Then, run the following from this directory to start the worker:

    poetry run python worker.py

This will start the worker. The worker has a workflow and two activities, one `asyncio` based and one gevent based. Now
in another terminal, run the following from this directory to execute the workflow:

    poetry run python starter.py

The workflow should run and complete with the hello result. Note on the worker terminal there will be logs of the
workflow and activity executions.
Empty file added gevent_async/__init__.py
Empty file.
25 changes: 25 additions & 0 deletions gevent_async/activity.py
@@ -0,0 +1,25 @@
from dataclasses import dataclass

import gevent
from temporalio import activity


@dataclass
class ComposeGreetingInput:
    greeting: str
    name: str


@activity.defn
async def compose_greeting_async(input: ComposeGreetingInput) -> str:
    activity.logger.info(f"Running async activity with parameter {input}")
    return f"{input.greeting}, {input.name}!"


@activity.defn
def compose_greeting_sync(input: ComposeGreetingInput) -> str:
    activity.logger.info(
        f"Running sync activity with parameter {input}, "
        f"in greenlet: {gevent.getcurrent()}"
    )
    return f"{input.greeting}, {input.name}!"
41 changes: 41 additions & 0 deletions gevent_async/executor.py
@@ -0,0 +1,41 @@
import functools
from concurrent.futures import Future
from typing import Callable, TypeVar

from gevent import threadpool
from typing_extensions import ParamSpec

T = TypeVar("T")
P = ParamSpec("P")


class GeventExecutor(threadpool.ThreadPoolExecutor):
@abeland abeland Oct 20, 2023

Hi @cretz, I believe gevent.threadpool.ThreadPoolExecutor is the unpatched thread pool, so really this PR is just using native threads.

cf. https://www.gevent.org/api/gevent.threadpool.html#gevent.threadpool.ThreadPoolExecutor :

A version of concurrent.futures.ThreadPoolExecutor that always uses native threads, even when threading is monkey-patched.

I verified by running this code locally with the patched concurrent.futures.ThreadPoolExecutor and it throws this error, as seen before in our explorations:

[~/temporalio_gevent/src] ∴ ~/.local/bin/poetry run python -m gevent_async.worker
INFO:root:Worker started, ctrl+c to exit
INFO:temporalio.workflow:Running workflow with parameter Temporal ({'attempt': 1, 'namespace': 'default', 'run_id': '543fef67-e25d-4bd8-926a-cca7bc78fd91', 'task_queue': 'gevent_async-task-queue', 'workflow_id': 'gevent_async-workflow-id', 'workflow_type': 'GreetingWorkflow'})
ERROR:temporalio.worker._workflow:Failed handling activation on workflow with run ID 543fef67-e25d-4bd8-926a-cca7bc78fd91
Traceback (most recent call last):
  File "/home/discord/.cache/pypoetry/virtualenvs/temporalio-gevent-EEP-UPYz-py3.10/lib/python3.10/site-packages/temporalio/worker/_workflow.py", line 208, in _handle_activation
    completion = await asyncio.wait_for(
  File "/usr/lib/python3.10/asyncio/tasks.py", line 405, in wait_for
    loop = events.get_running_loop()
RuntimeError: no running event loop

Member Author

Exactly. We are intentionally using native threads so that gevent doesn't try to mess with them so you can have asyncio running in a native thread and gevent doing other things elsewhere. The whole crux of the original issue is that you couldn't even use Temporal and gevent in the same process because gevent tries to take over, but this allows them to share the process space.

If we need further investigation on using greenlets maybe from activities or something, we can look into that separately.
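
As a rough illustration of the arrangement described in this reply, the sketch below hosts `asyncio.run` on one dedicated worker thread and blocks the caller until it finishes, which is the same shape as `main()` in the sample. It uses only the standard library: the stdlib `ThreadPoolExecutor`, the `asyncio-host` thread name, and the placeholder `async_main` body are assumptions made for illustration. In a monkey-patched process the sample uses its `GeventExecutor` instead, so that the hosting thread stays native.

```python
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor


async def async_main() -> str:
    # Placeholder for connecting a client and running a worker.
    await asyncio.sleep(0)
    # Report which thread the event loop is running on.
    return threading.current_thread().name


def run_in_dedicated_thread() -> str:
    # A single worker thread whose only job is to host the asyncio event loop.
    with ThreadPoolExecutor(
        max_workers=1, thread_name_prefix="asyncio-host"
    ) as executor:
        # Block the calling thread until the event loop finishes.
        return executor.submit(asyncio.run, async_main()).result()


if __name__ == "__main__":
    print(run_in_dedicated_thread())
```

Because that single thread is occupied for the lifetime of the event loop, any blocking work (such as non-async activities) needs a separate executor, as the worker in this PR does.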

@cretz please correct me if my understanding is wrong: if this is intentionally using native threads, it is creating a new thread for each activity it runs on line 39 despite the comment on line 38 saying that it will "[s]ubmit our wrapper to gevent". this means that all the code executed in activities needs to be threadsafe.

our codebase isn't threadsafe (pretty common for gevent codebases i think) but we are still giving this a go to see if we can get basic execution working.

Member Author

@cretz cretz Oct 23, 2023

> it is creating a new thread for each activity it runs on line 39 despite the comment on line 38 saying that it will "[s]ubmit our wrapper to gevent"

Not a new thread. This is calling gevent.threadpool.ThreadPoolExecutor.submit, which reuses existing threads (I believe). But yes, they aren't the threads shared by gevent.

> this means that all the code executed in activities needs to be threadsafe.

Not async def activities, but yes, regular def activities in the Temporal Python SDK are executed via the executor passed, and most people use concurrent.futures.ThreadPoolExecutor, which does require thread safety.

> our codebase isn't threadsafe (pretty common for gevent codebases i think) but we are still giving this a go to see if we can get basic execution working.

Yes, this is admittedly not true "gevent activities" but rather just a demonstration that Temporal can live in a gevent-encumbered Python process.

We could go one further and look into having an executor specific for activities that bridges the native Python thread to greenlet gap. I did not go that far, but we can try. It'd probably just be a concurrent.futures.Executor implementation backed by gevent.pool.Pool.
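
The thread-reuse point can be checked for the standard library pool with a small sketch. This uses `concurrent.futures.ThreadPoolExecutor` as a stand-in, so it says nothing definitive about gevent's native pool; the helper name `thread_names_used` and the `pool` name prefix are made up for illustration.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def thread_names_used(task_count: int, max_workers: int) -> set:
    """Run task_count trivial tasks and collect the names of the threads that ran them."""
    with ThreadPoolExecutor(
        max_workers=max_workers, thread_name_prefix="pool"
    ) as executor:
        futures = [
            executor.submit(lambda: threading.current_thread().name)
            for _ in range(task_count)
        ]
        return {fut.result() for fut in futures}


if __name__ == "__main__":
    names = thread_names_used(task_count=20, max_workers=2)
    # Never more distinct threads than max_workers, however many tasks ran.
    print(sorted(names))
```

Reuse does not remove the thread-safety requirement: tasks submitted by different callers can still run at the same time on different pool threads.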

    def submit(
        self, fn: Callable[P, T], *args: P.args, **kwargs: P.kwargs
    ) -> Future[T]:
        # Gevent's returned futures do not map well to Python futures, so we
        # must translate. We can't just use set_result/set_exception because
        # done callbacks are not always called in gevent's case and it doesn't
        # seem to support cancel, so we instead wrap the caller function.
Comment on lines +16 to +19
Contributor

[non-blocking/optional] I think it might be clearer to explicitly state "Gevent's returned futures do not implement all of the things expected by concurrent.futures.Future" rather than saying they "do not map well". I wasn't sure what the latter meant, so I wound up reading gevent code to figure out what this meant.
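
For context on what callers can rely on from a standard `concurrent.futures.Future`, here is a minimal sketch of the two behaviors the code comment mentions: done callbacks and cancellation. It only exercises the standard library class and makes no claim about gevent's internals; the function names are hypothetical.

```python
from concurrent.futures import Future


def callback_fires_on_result() -> list:
    """Show that a done callback runs once a result is set."""
    seen = []
    fut: Future = Future()
    fut.add_done_callback(lambda f: seen.append(f.result()))
    fut.set_result("done")
    return seen


def pending_future_can_cancel() -> bool:
    """Show that a future that has not started running can be cancelled."""
    fut: Future = Future()
    return fut.cancel() and fut.cancelled()


if __name__ == "__main__":
    print(callback_fires_on_result())
    print(pending_future_can_cancel())
```

Returning a plain `Future` that the wrapped function completes itself, as `GeventExecutor.submit` does, gives callers exactly these semantics regardless of what the underlying pool returns.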

        python_fut: Future[T] = Future()

        @functools.wraps(fn)
        def wrapper(*w_args: P.args, **w_kwargs: P.kwargs) -> None:
            try:
                result = fn(*w_args, **w_kwargs)
                # Swallow InvalidStateError in case Python future was cancelled
                try:
                    python_fut.set_result(result)
                except:
                    pass
            except Exception as exc:
                # Swallow InvalidStateError in case Python future was cancelled
                try:
                    python_fut.set_exception(exc)
                except:
Contributor

Since this might end up in production code, I suggest catching the specific exception that you indicate in the comment.

Member Author

@cretz cretz Oct 11, 2023

Technically we support 3.7+, and this class (`InvalidStateError`) wasn't added until 3.8; we try to write our code for all versions. Adding this makes the 3.7 CI angry (even though technically this sample only works with 3.8+ because gevent only works with 3.8+).
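
One possible middle ground between the two positions, sketched here as an assumption rather than what the PR does, is to look up `InvalidStateError` at import time and fall back to `Exception` where it does not exist. The fallback alias and the helper `set_result_if_pending` are hypothetical names introduced for this example.

```python
import concurrent.futures
from concurrent.futures import Future

# InvalidStateError exists in concurrent.futures from Python 3.8 onward.
# On older interpreters, fall back to Exception (an assumption made for this
# sketch; it is broader than the specific error).
InvalidStateError = getattr(concurrent.futures, "InvalidStateError", Exception)


def set_result_if_pending(fut: Future, result: object) -> bool:
    """Set the result, returning False if the future was already done or cancelled."""
    try:
        fut.set_result(result)
        return True
    except InvalidStateError:
        return False


if __name__ == "__main__":
    cancelled: Future = Future()
    cancelled.cancel()
    print(set_result_if_pending(cancelled, 1))
    print(set_result_if_pending(Future(), 1))
```

On 3.8 and later this catches only the specific error, so unrelated failures inside `set_result` still surface instead of being silently swallowed.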

                    pass

        # Submit our wrapper to gevent
        super().submit(wrapper, *args, **kwargs)
        # Return Python future to user
        return python_fut
40 changes: 40 additions & 0 deletions gevent_async/starter.py
@@ -0,0 +1,40 @@
# Init gevent
from gevent import monkey

monkey.patch_all()

import asyncio
import logging

from temporalio.client import Client

from gevent_async import workflow
from gevent_async.executor import GeventExecutor


def main():
    logging.basicConfig(level=logging.INFO)

    # Create single-worker gevent executor and run asyncio.run(async_main()) in
    # it, waiting for result. This executor cannot be used for anything else in
    # Temporal, it is just a single thread for running asyncio.
    with GeventExecutor(max_workers=1) as executor:
        executor.submit(asyncio.run, async_main()).result()


async def async_main():
    # Connect client
    client = await Client.connect("localhost:7233")

    # Run workflow
    result = await client.execute_workflow(
        workflow.GreetingWorkflow.run,
        "Temporal",
        id="gevent_async-workflow-id",
        task_queue="gevent_async-task-queue",
    )
    logging.info(f"Workflow result: {result}")


if __name__ == "__main__":
    main()
Empty file added gevent_async/test/__init__.py
Empty file.
56 changes: 56 additions & 0 deletions gevent_async/test/run_combined.py
@@ -0,0 +1,56 @@
# Init gevent
from gevent import monkey

monkey.patch_all()

import asyncio
import logging

from temporalio.testing import WorkflowEnvironment
from temporalio.worker import Worker

from gevent_async import activity, workflow
from gevent_async.executor import GeventExecutor

# This basically combines ../worker.py and ../starter.py for use by CI to
# confirm this works in all environments


def main():
    logging.basicConfig(level=logging.INFO)
    with GeventExecutor(max_workers=1) as executor:
        executor.submit(asyncio.run, async_main()).result()


async def async_main():
    logging.info("Starting local server")
    async with await WorkflowEnvironment.start_local() as env:
        logging.info("Starting worker")
        with GeventExecutor(max_workers=200) as executor:
            async with Worker(
                env.client,
                task_queue="gevent_async-task-queue",
                workflows=[workflow.GreetingWorkflow],
                activities=[
                    activity.compose_greeting_async,
                    activity.compose_greeting_sync,
                ],
                activity_executor=executor,
                workflow_task_executor=executor,
                max_concurrent_activities=100,
                max_concurrent_workflow_tasks=100,
            ):
                logging.info("Running workflow")
                result = await env.client.execute_workflow(
                    workflow.GreetingWorkflow.run,
                    "Temporal",
                    id="gevent_async-workflow-id",
                    task_queue="gevent_async-task-queue",
                )
                if result != "Hello, Temporal!":
                    raise RuntimeError(f"Unexpected result: {result}")
                logging.info(f"Workflow complete, result: {result}")


if __name__ == "__main__":
    main()
76 changes: 76 additions & 0 deletions gevent_async/worker.py
@@ -0,0 +1,76 @@
# Init gevent
from gevent import monkey

monkey.patch_all()

import asyncio
import logging
import signal

import gevent
from temporalio.client import Client
from temporalio.worker import Worker

from gevent_async import activity, workflow
from gevent_async.executor import GeventExecutor


def main():
    logging.basicConfig(level=logging.INFO)

    # Create single-worker gevent executor and run asyncio.run(async_main()) in
    # it, waiting for result. This executor cannot be used for anything else in
    # Temporal, it is just a single thread for running asyncio. This means that
    # inside of async_main we must create another executor specifically for
    # executing activity and workflow tasks.
    with GeventExecutor(max_workers=1) as executor:
        executor.submit(asyncio.run, async_main()).result()


async def async_main():
    # Create ctrl+c handler. We do this by telling gevent on SIGINT to set the
    # asyncio event. But asyncio calls are not thread safe, so we have to invoke
    # it via call_soon_threadsafe.
    interrupt_event = asyncio.Event()
    gevent.signal_handler(
        signal.SIGINT,
        asyncio.get_running_loop().call_soon_threadsafe,
        interrupt_event.set,
    )

    # Connect client
    client = await Client.connect("localhost:7233")

    # Create an executor for use by Temporal. This cannot be the outer one
    # running this async main. The max_workers here needs to have enough room to
    # support the max concurrent activities/workflows settings.
    with GeventExecutor(max_workers=200) as executor:

        # Run a worker for the workflow and activities
        async with Worker(
            client,
            task_queue="gevent_async-task-queue",
            workflows=[workflow.GreetingWorkflow],
            activities=[
                activity.compose_greeting_async,
                activity.compose_greeting_sync,
            ],
            # Set the executor for activities (only used for non-async
            # activities) and workflow tasks
            activity_executor=executor,
            workflow_task_executor=executor,
            # Set the max concurrent activities/workflows. These are the same as
            # the defaults, but this makes it clear that the 100 + 100 = 200 for
            # max_workers settings.
            max_concurrent_activities=100,
            max_concurrent_workflow_tasks=100,
        ):

            # Wait until interrupted
            logging.info("Worker started, ctrl+c to exit")
            await interrupt_event.wait()
            logging.info("Shutting down")


if __name__ == "__main__":
    main()
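
The ctrl+c handling in `async_main` depends on `call_soon_threadsafe` to hand an event-set over to the event loop from code running outside it. A minimal standard-library sketch of that handoff follows; a `threading.Timer` stands in for gevent's signal handler, and the function name and delays are assumptions chosen for illustration.

```python
import asyncio
import threading


async def wait_for_interrupt() -> bool:
    interrupt_event = asyncio.Event()
    loop = asyncio.get_running_loop()

    # Stand-in for the signal handler: another thread asks the loop to set the
    # event. Calling interrupt_event.set() directly from that thread would not
    # be thread safe.
    timer = threading.Timer(
        0.05, loop.call_soon_threadsafe, args=(interrupt_event.set,)
    )
    timer.start()

    # The timeout is only a safety net so the sketch cannot hang.
    await asyncio.wait_for(interrupt_event.wait(), timeout=5)
    timer.join()
    return interrupt_event.is_set()


if __name__ == "__main__":
    print(asyncio.run(wait_for_interrupt()))
```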
34 changes: 34 additions & 0 deletions gevent_async/workflow.py
@@ -0,0 +1,34 @@
from datetime import timedelta

from temporalio import workflow

with workflow.unsafe.imports_passed_through():
    from gevent_async.activity import (
        ComposeGreetingInput,
        compose_greeting_async,
        compose_greeting_sync,
    )


@workflow.defn
class GreetingWorkflow:
    @workflow.run
    async def run(self, name: str) -> str:
        workflow.logger.info("Running workflow with parameter %s" % name)

        # Run an async and a sync activity
        async_res = await workflow.execute_activity(
            compose_greeting_async,
            ComposeGreetingInput("Hello", name),
            start_to_close_timeout=timedelta(seconds=10),
        )
        sync_res = await workflow.execute_activity(
            compose_greeting_sync,
            ComposeGreetingInput("Hello", name),
            start_to_close_timeout=timedelta(seconds=10),
        )

        # Confirm the same, return one
        if async_res != sync_res:
            raise ValueError("Results are not the same")
        return sync_res