mq is a publish/subscribe (or queueing system) mechanism based on mongodb
. It allows to launch small tasks
in an asynchronous way. Several tools exist but using other datastore such as Redis (rq, ...) so if you already use
redis, it may be a better choice.
mq is a hobby project mainly to learn asyncio module, and multiprocessing condition primitives.
pip install mq
poetry add mq
mq can work in several ways. Usually, we enqueue tasks in a process, and launch another process to dequeue these tasks and perform their execution. mq support this in several ways:
- launch worker process in same script becoming a subprocess of the main process
- launch worker process in another script.
- launch worker in a thread for heavy IO tasks.
- full dequeuing as an asyncio.Task (coming)
import random
import asyncio
from functools import partial
from loguru import logger
from mq import mq, job
from mq.utils import MongoDBConnectionParameters
@job(channel="test")
async def job_test(a, b):
await asyncio.sleep(1)
return a + b
async def main():
await mq.init(
MongoDBConnectionParameters(
mongo_uri="mongodb://localhost:27017",
db_name="mq",
collection="mq",
)
)
# start worker process in same process
worker = await mq.default_worker(channel="default").start()
# enqueue job, coroutine function
r1, r2 = random.randint(1, 100), random.randint(1, 100)
job_command = await job_test.mq(r1, r2)
# add a callback when it' s done !
job_command.add_done_callback(partial(logger.debug, "Got a result {} !"))
#or await for its result
result = await job_command.wait_for_result()
# or cancel a running task
is_cancelled = await job_command.cancel()
if __name == "__main__":
asyncio.run(main())
Needs to add a parameter in the init function of mq, in the enqueuing process
await mq.with_process_connection(
# specify connection parameters to the main manager process
MQManagerConnectionParameters(
url= "127.0.0.1",
port=50000,
authkey=b"abracadabra"
)
).init(
# specify mongodb connection
MongoDBConnectionParameters(
mongo_uri="mongodb://localhost:27017",
db_name="mq",
collection="mq",
),
# add this one to start manager server
start_server=True,
)
Now in another script:
await mq.with_process_connection(
# specify connection parameters to the main manager process
MQManagerConnectionParameters(
url= "127.0.0.1",
port=50000,
authkey=b"abracadabra"
) #(1)
).init(
# specify mongodb connection
MongoDBConnectionParameters(
mongo_uri="mongodb://localhost:27017",
db_name="mq",
collection="mq",
),
in_worker_process=True,
)
- Needs to connect to other process and acceed to shared memory. Process can also be remote ☁ !
You can specify retry, schedule options, and finally define Directed Acyclic Graph of tasks (using the downstream keyword) !
@job(
channel="test",
stop=stop_after_attempt(3),
)
async def test_retry(a, b):
await asyncio.sleep(0.1)
if random.randint() > 50:
raise ValueError("")
return a + b
@job(
channel="test",
schedule=every(10).seconds,
)
async def test_schedule(a, b):
await asyncio.sleep(0.1)
return a + b
See examples for more complex code !
- write a full asyncio version (all running in the same event loop)
- write extensive test suite