From 5439daa4b60bf88a72adf63dd2339fb495cb092e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Felix=20B=C3=B6hm?= Date: Thu, 11 Jan 2024 23:02:39 +0100 Subject: [PATCH] docs: Add migration guide --- CONTRIBUTING.md | 2 +- docs/index.md | 3 +- docs/migration-guide-v2.md | 152 +++++++++++++++++++++++++++++++++ docs/subscribing-to-a-topic.md | 10 +-- 4 files changed, 160 insertions(+), 7 deletions(-) create mode 100644 docs/migration-guide-v2.md diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 61b6f6c..a941fb1 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -7,7 +7,7 @@ We're very happy about contributions to aiomqtt! ✨ - Clone the aiomqtt repository - Install the Python version noted in `.python-version` via `pyenv` - Install poetry; Then run `./scripts/setup` to install the dependencies and aiomqtt itself -- Run ruff, and mypy with `./scripts/check` +- Run ruff and mypy with `./scripts/check` - Run the tests with `./scripts/test` During development, it's often useful to have a local MQTT broker running. You can spin up a local mosquitto broker with Docker via `./scripts/develop`. You can connect to this broker with `aiomqtt.Client("localhost", port=1883)`. diff --git a/docs/index.md b/docs/index.md index c5138d6..d43600d 100644 --- a/docs/index.md +++ b/docs/index.md @@ -31,6 +31,7 @@ alongside-fastapi-and-co :hidden: developer-interface +migration-guide-v2 ``` ```{toctree} @@ -39,7 +40,7 @@ developer-interface GitHub Issue tracker -Discussions +Changelog Contributing PyPI ``` diff --git a/docs/migration-guide-v2.md b/docs/migration-guide-v2.md new file mode 100644 index 0000000..707233e --- /dev/null +++ b/docs/migration-guide-v2.md @@ -0,0 +1,152 @@ +# Migration guide: v2.0.0 + +Version 2.0.0 introduces some breaking changes. This page aims to help you migrate to this new major version. The relevant changes are: + +- The deprecated `connect` and `disconnect` methods have been removed +- The deprecated `filtered_messages` and `unfiltered_messages` methods have been removed +- User-managed queues for incoming messages have been replaced with a single client-wide queue +- Some arguments to the `Client` have been renamed or removed + +## Changes to the client lifecycle + +The deprecated `connect` and `disconnect` methods have been removed. The best way to connect and disconnect from the broker is through the client's context manager: + +```python +import asyncio +import aiomqtt + + +async def main(): + async with aiomqtt.Client("test.mosquitto.org") as client: + await client.publish("temperature/outside", payload=28.4) + + +asyncio.run(main()) +``` + +If your use case does not allow you to use a context manager, you can use the client’s `__aenter__` and `__aexit__` methods almost interchangeably in place of the removed `connect` and `disconnect` methods. + +The `__aenter__` and `__aexit__` methods are designed to be called by the `async with` statement when the execution enters and exits the context manager. However, we can also execute them manually: + +```python +import asyncio +import aiomqtt + + +async def main(): + client = aiomqtt.Client("test.mosquitto.org") + await client.__aenter__() + await client.publish("temperature/outside", payload=28.4) + await client.__aexit__(None, None, None) + + +asyncio.run(main()) +``` + +`__aenter__` is equivalent to `connect`. `__aexit__` is equivalent to `disconnect` except that it forces disconnection instead of throwing an exception in case the client cannot disconnect cleanly. + +```{note} +`__aexit__` expects three arguments: `exc_type`, `exc`, and `tb`. These arguments describe the exception that caused the context manager to exit, if any. You can pass `None` to all of these arguments in a manual call to `__aexit__`. +``` + +## Changes to the message queue + +The `filtered_messages`, `unfiltered_messages`, and `messages` methods have been removed and replaced with a single client-wide message queue. + +A minimal example of printing all messages (unfiltered) looks like this: + +```python +import asyncio +import aiomqtt + + +async def main(): + async with aiomqtt.Client("test.mosquitto.org") as client: + await client.subscribe("temperature/#") + async for message in client.messages: + print(message.payload) + + +asyncio.run(main()) +``` + +To handle messages from different topics differently, we can use `Topic.matches()`: + +```python +import asyncio +import aiomqtt + + +async def main(): + async with aiomqtt.Client("test.mosquitto.org") as client: + await client.subscribe("temperature/#") + await client.subscribe("humidity/#") + async for message in client.messages: + if message.topic.matches("humidity/inside"): + print(f"[humidity/inside] {message.payload}") + if message.topic.matches("+/outside"): + print(f"[+/outside] {message.payload}") + if message.topic.matches("temperature/#"): + print(f"[temperature/#] {message.payload}") + + +asyncio.run(main()) +``` + +```{note} +In our example, messages to `temperature/outside` are handled twice! +``` + +The `filtered_messages`, `unfiltered_messages`, and `messages` methods created isolated message queues underneath, such that you could invoke them multiple times. From Version 2.0.0 on, the client maintains a single queue that holds all incoming messages, accessible via `Client.messages`. + +If you continue to need multiple queues (e.g. because you have special concurrency requirements), you can build a "distributor" on top: + +```python +import asyncio +import aiomqtt + + +async def temperature_consumer(): + while True: + message = await temperature_queue.get() + print(f"[temperature/#] {message.payload}") + + +async def humidity_consumer(): + while True: + message = await humidity_queue.get() + print(f"[humidity/#] {message.payload}") + + +temperature_queue = asyncio.Queue() +humidity_queue = asyncio.Queue() + + +async def distributor(client): + # Sort messages into the appropriate queues + async for message in client.messages: + if message.topic.matches("temperature/#"): + temperature_queue.put_nowait(message) + elif message.topic.matches("humidity/#"): + humidity_queue.put_nowait(message) + + +async def main(): + async with aiomqtt.Client("test.mosquitto.org") as client: + await client.subscribe("temperature/#") + await client.subscribe("humidity/#") + # Use a task group to manage and await all tasks + async with asyncio.TaskGroup() as tg: + tg.create_task(distributor(client)) + tg.create_task(temperature_consumer()) + tg.create_task(humidity_consumer()) + + +asyncio.run(main()) +``` + +## Changes to client arguments + +- The `queue_class` and `queue_maxsize` arguments to `filtered_messages`, `unfiltered_messages`, and `messages` have been moved to the `Client` and have been renamed to `queue_type` and `max_queued_incoming_messages` +- The `max_queued_messages` client argument has been renamed to `max_queued_outgoing_messages` +- The deprecated `message_retry_set` client argument has been removed diff --git a/docs/subscribing-to-a-topic.md b/docs/subscribing-to-a-topic.md index e227dd2..74212af 100644 --- a/docs/subscribing-to-a-topic.md +++ b/docs/subscribing-to-a-topic.md @@ -1,6 +1,6 @@ # Subscribing to a topic -To receive messages for a topic, we need to subscribe to it and listen for messages. This is a minimal working example that listens for messages to the `temperature/#` wildcard: +To receive messages for a topic, we need to subscribe to it. Incoming messages are queued internally. You can use the `Client.message` generator to iterate over incoming messages. This is a minimal working example that listens for messages to the `temperature/#` wildcard: ```python import asyncio @@ -40,9 +40,9 @@ async def main(): await client.subscribe("humidity/#") async for message in client.messages: if message.topic.matches("humidity/inside"): - print(f"[humidity/outside] {message.payload}") + print(f"[humidity/inside] {message.payload}") if message.topic.matches("+/outside"): - print(f"[+/inside] {message.payload}") + print(f"[+/outside] {message.payload}") if message.topic.matches("temperature/#"): print(f"[temperature/#] {message.payload}") @@ -60,7 +60,7 @@ For details on the `+` and `#` wildcards and what topics they match, see the [OA ## The message queue -Messages are queued and returned sequentially from `Client.messages`. +Messages are queued internally and returned sequentially from `Client.messages`. The default queue is `asyncio.Queue` which returns messages on a FIFO ("first in first out") basis. You can pass [other types of asyncio queues](https://docs.python.org/3/library/asyncio-queue.html) as `queue_class` to the `Client` to modify the order in which messages are returned, e.g. `asyncio.LifoQueue`. @@ -103,7 +103,7 @@ By default, the size of the queue is unlimited. You can set a limit by passing t ## Processing concurrently -Messages are queued and returned sequentially from `Client.messages`. If a message takes a long time to handle, it blocks the handling of other messages. +Messages are queued internally and returned sequentially from `Client.messages`. If a message takes a long time to handle, it blocks the handling of other messages. You can handle messages concurrently by using an `asyncio.TaskGroup` like so: