Skip to content

Commit

Permalink
Linux fix and minor PR suggestions
Browse files Browse the repository at this point in the history
  • Loading branch information
cretz committed Oct 13, 2023
1 parent 092f855 commit 2567cca
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 20 deletions.
6 changes: 3 additions & 3 deletions gevent_async/executor.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import functools
from concurrent.futures import Future
from concurrent.futures import Future, InvalidStateError
from typing import Callable, TypeVar

from gevent import threadpool
Expand All @@ -26,13 +26,13 @@ def wrapper(*w_args: P.args, **w_kwargs: P.kwargs) -> None:
# Swallow InvalidStateError in case Python future was cancelled
try:
python_fut.set_result(result)
except:
except InvalidStateError:
pass
except Exception as exc:
# Swallow InvalidStateError in case Python future was cancelled
try:
python_fut.set_exception(exc)
except:
except InvalidStateError:
pass

# Submit our wrapper to gevent
Expand Down
2 changes: 1 addition & 1 deletion gevent_async/starter.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ async def async_main():
result = await client.execute_workflow(
workflow.GreetingWorkflow.run,
"Temporal",
id=f"gevent_async-workflow-id",
id="gevent_async-workflow-id",
task_queue="gevent_async-task-queue",
)
logging.info(f"Workflow result: {result}")
Expand Down
33 changes: 17 additions & 16 deletions gevent_async/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,26 @@
def main():
logging.basicConfig(level=logging.INFO)

# Create a new event loop so we can run_until_complete on it. We could
# just use asyncio.run like starter does, but this approach allows us to
# listen for a signal and stop the worker gracefully.
loop = asyncio.new_event_loop()

# Make SIGINT trigger an event that will shutdown the worker
interrupt_event = asyncio.Event()
gevent.signal_handler(signal.SIGINT, loop.call_soon_threadsafe, interrupt_event.set)

# Create single-worker gevent executor to run event loop, waiting for
# result. This executor cannot be used for anything else in Temporal, it is
# just a single thread for running asyncio. This means that inside of
# async_main we must create another executor specifically for executing
# activity and workflow tasks.
# Create single-worker gevent executor and run asyncio.run(async_main()) in
# it, waiting for result. This executor cannot be used for anything else in
# Temporal, it is just a single thread for running asyncio. This means that
# inside of async_main we must create another executor specifically for
# executing activity and workflow tasks.
with GeventExecutor(max_workers=1) as executor:
executor.submit(loop.run_until_complete, async_main(interrupt_event)).result()
executor.submit(asyncio.run, async_main()).result()


async def async_main(interrupt_event: asyncio.Event):
async def async_main():
# Create ctrl+c handler. We do this by telling gevent on SIGINT to set the
# asyncio event. But asyncio calls are not thread safe, so we have to invoke
# it via call_soon_threadsafe.
interrupt_event = asyncio.Event()
gevent.signal_handler(
signal.SIGINT,
asyncio.get_running_loop().call_soon_threadsafe,
interrupt_event.set,
)

# Connect client
client = await Client.connect("localhost:7233")

Expand Down

0 comments on commit 2567cca

Please sign in to comment.