MicroRabbit is a lightweight, asynchronous Python framework for working with RabbitMQ. It simplifies the process of setting up RabbitMQ consumers and publishers, making it easy to build microservices and distributed systems.
- Asynchronous message handling using asyncio
- Simple decorator-based message routing
- Plugin system for modular code organization
- Easy-to-use client configuration
- Built-in logging support
- Customizable type annotations for message data
pip install microrabbit
Here's a simple example of how to use MicroRabbit:
import asyncio
import logging

from pydantic import BaseModel

from microrabbit import Client
from microrabbit.types import QueueOptions, ConsumerOptions, ConnectionOptions


class Message(BaseModel):
    test: str


client = Client(
    host="amqp://guest:guest@localhost/",
    plugins="./plugins",
    connection_type="ROBUST",
    connection_options=ConnectionOptions()
)

log = logging.getLogger(__file__)
logging.basicConfig(level=logging.INFO)


@Client.on_message("queue_name")
async def test(data: dict) -> dict:
    log.info(f"Received message {data}")
    return {"connected": True}


@Client.on_message(
    "queue_name2",
    queue_options=QueueOptions(exclusive=True),
    consume_options=ConsumerOptions(no_ack=True)
)
async def test2(data: int|Message) -> dict:
    log.info(f"Received message {data.test}")
    return {"connected": True}


@client.on_ready
async def on_ready():
    log.info("[*] Waiting for messages. To exit press CTRL+C")
    result = await client.simple_publish("queue_name2", {"test": "data"}, timeout=2, decode=True)
    log.info(result)


if __name__ == "__main__":
    asyncio.run(client.run())
Create a Client instance with the following parameters:
- host: RabbitMQ server URL
- instance_id: Unique identifier for the client; if not provided, it is generated automatically (optional)
- plugins: Path to the plugins folder (optional)
- connection_type: Connection type, either a str (NORMAL, ROBUST) or a CONNECTION_TYPE (optional)
- connection_options: Connection options, as a ConnectionOptions (optional)
from microrabbit import Client
from microrabbit.types import CONNECTION_TYPE, ConnectionOptions
client = Client(
    host="amqp://guest:guest@localhost/",
    instance_id="unique_id",
    plugins="./plugins",
    connection_type=CONNECTION_TYPE.NORMAL,
    connection_options=ConnectionOptions(ssl=True)
)
Use the @Client.on_message decorator to define a message handler. The decorator takes the queue name as its first argument.
Arguments:
- queue_name: Name of the queue
- instance_id: Unique identifier of the client that should consume the queue; if not provided, the handler is registered as global and the queue is consumed by any client that runs (optional)
- queue_options: Queue options, as a QueueOptions (optional)
- consume_options: Consumer options, as a ConsumerOptions (optional)
from microrabbit import Client
from microrabbit.types import QueueOptions
@Client.on_message("queue_name", queue_options=QueueOptions(exclusive=True))
async def handler(data: dict):
    # Process the message
    response_data = {"received": data}  # placeholder response
    return response_data  # Serializable data
In the handler function, you can specify the data type of the message using type annotations for parameters and return values.
from microrabbit import Client
from typing import Union
from pydantic import BaseModel
class Message(BaseModel):
    test: str


@Client.on_message("queue_name")
async def handler(data: Union[Message, int]) -> Union[Message, int]:
    # data may be either member of the Union, so check before reading .test
    if isinstance(data, Message):
        print(data.test)
    # Any serializable object or BaseModel object can be returned
    return data
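One way a framework could use such annotations is to read them at registration time and convert the decoded payload to the first matching type. The sketch below illustrates that idea only. It assumes a plain dataclass in place of a pydantic BaseModel so that it runs with the standard library alone, and `coerce` is a hypothetical helper, not part of MicroRabbit.

```python
from dataclasses import dataclass, is_dataclass
from typing import Any, Union, get_args, get_origin, get_type_hints


@dataclass
class Message:
    test: str


def coerce(payload: Any, annotation: Any) -> Any:
    """Convert a decoded payload to the first annotated type it matches."""
    # Only typing.Union is unpacked here; a bare type is treated as one candidate.
    candidates = get_args(annotation) if get_origin(annotation) is Union else (annotation,)
    for candidate in candidates:
        if isinstance(payload, dict) and is_dataclass(candidate):
            return candidate(**payload)  # build the model from the dict fields
        if isinstance(candidate, type) and isinstance(payload, candidate):
            return payload  # already an instance of an accepted type
    raise TypeError(f"payload {payload!r} does not match {annotation!r}")


async def handler(data: Union[Message, int]) -> Union[Message, int]:
    return data


# Read the annotation of the "data" parameter from the handler itself.
annotation = get_type_hints(handler)["data"]
```

With this helper, `coerce({"test": "data"}, annotation)` yields a `Message`, `coerce(7, annotation)` passes the integer through, and a payload that matches neither type raises `TypeError`.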
Use the @client.on_ready decorator to define a function that runs when the client is ready:
from microrabbit import Client
client = Client(
    host="amqp://guest:guest@localhost/",
    plugins="./plugins"
)


@client.on_ready
async def on_ready():
    print("Client is ready")
Run the client using asyncio.run(client.run()):
import asyncio
from microrabbit import Client
client = Client(
    host="amqp://guest:guest@localhost/",
    plugins="./plugins"
)

if __name__ == "__main__":
    asyncio.run(client.run())
Use the simple_publish method to publish a message to a queue:
result = await client.simple_publish("queue_name", {"test": "data"}, timeout=2, decode=True)
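The `timeout` and `decode` arguments suggest that the call waits a bounded time for a reply and can decode it. How MicroRabbit does this is not documented here, so the following is only a generic sketch of that pattern with `asyncio.wait_for`. An in-memory future stands in for the broker reply, `wait_for_reply` is a hypothetical helper, and returning `None` on timeout is an assumption rather than documented behavior.

```python
import asyncio
import json
from typing import Any, Optional


async def wait_for_reply(reply: asyncio.Future, timeout: float, decode: bool = True) -> Optional[Any]:
    """Wait up to `timeout` seconds for a reply; return None if none arrives."""
    try:
        raw = await asyncio.wait_for(reply, timeout=timeout)
    except asyncio.TimeoutError:
        return None  # assumption: a missed reply is reported as None
    return json.loads(raw) if decode else raw


async def demo() -> tuple:
    loop = asyncio.get_running_loop()

    answered = loop.create_future()
    # Simulate a consumer replying shortly after the request was sent.
    loop.call_later(0.01, answered.set_result, b'{"connected": true}')
    ok = await wait_for_reply(answered, timeout=1)

    unanswered = loop.create_future()  # nobody ever sets a result on this one
    missed = await wait_for_reply(unanswered, timeout=0.05)
    return ok, missed
```

Running `asyncio.run(demo())` exercises both paths: the first wait receives and decodes the simulated reply, and the second gives up once its timeout elapses.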
The client can also be used as an asynchronous context manager:
import asyncio
from microrabbit import Client

client = Client(
    host="amqp://guest:guest@localhost/",
    plugins="./plugins"
)


async def main():
    async with client:
        await client.run()


if __name__ == "__main__":
    asyncio.run(main())
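For readers unfamiliar with `async with`, the sketch below shows the protocol that makes it work in general. `FakeClient` is a hypothetical stand-in, not the MicroRabbit Client; what the real client does on entry and exit is not specified here.

```python
import asyncio


class FakeClient:
    """Stand-in client that only tracks whether its 'connection' is open."""

    def __init__(self) -> None:
        self.connected = False

    async def __aenter__(self) -> "FakeClient":
        self.connected = True  # a real client would open its connection here
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        self.connected = False  # runs even if the body of the block raised


async def demo() -> tuple:
    client = FakeClient()
    async with client:
        inside = client.connected
    return inside, client.connected
```

The benefit of this form is that cleanup in `__aexit__` runs whether the block finishes normally or raises.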
MicroRabbit supports a plugin system. Place your plugin files in the specified plugins folder, and they will be automatically loaded by the client.
# ./plugins/test_plugin.py
from microrabbit import Client
@Client.on_message("test_queue")
async def test_handler(data: dict):
    print(f"Received message: {data}")
    return {"status": "ok"}
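Automatic loading of this kind usually means importing each file in the folder so that its decorators run at import time. The sketch below shows one way to do that with `importlib`. It is an illustration under that assumption, not MicroRabbit's loader, and `load_plugins` is a hypothetical name.

```python
import importlib.util
from pathlib import Path
from types import ModuleType
from typing import List


def load_plugins(folder: str) -> List[ModuleType]:
    """Import every .py file in `folder`, running its top-level code."""
    modules: List[ModuleType] = []
    for path in sorted(Path(folder).glob("*.py")):
        if path.name.startswith("_"):
            continue  # skip __init__.py and private helper files
        spec = importlib.util.spec_from_file_location(path.stem, path)
        if spec is None or spec.loader is None:
            continue  # not importable as a module
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)  # decorators in the file execute here
        modules.append(module)
    return modules
```

Calling `load_plugins("./plugins")` would import a file such as `test_plugin.py`, at which point any handler decorators in it register themselves.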
Use the QueueOptions class to specify queue options:
from microrabbit.types import QueueOptions
@Client.on_message("queue_name", queue_options=QueueOptions(exclusive=True))
async def handler(data: dict):
    # Process the message
    response_data = {"received": data}  # placeholder response
    return response_data
Use the ConsumerOptions class to specify consumer options:
from microrabbit.types import ConsumerOptions
@Client.on_message("queue_name", consume_options=ConsumerOptions(no_ack=True))
async def handler(data: dict):
    # Process the message
    response_data = {"received": data}  # placeholder response
    return response_data
Contributions are welcome! For feature requests, bug reports, or questions, please open an issue. If you would like to contribute code, please submit a pull request.
This project is licensed under the MIT License - see the LICENSE file for details.