MQTT messages sent when connectivity service was stopped are not processed #1868
Comments
Good catch, thanks for finding and bringing it up 👍 My thoughts:
I'm fairly new to Akka/Pekko, so I might get this wrong. But from what I have read, I don't see a way (for now) to distinguish between closing a connection and modifying a connection.
actions = Arrays.asList(ConnectionAction.PERSIST_AND_APPLY_EVENT, ConnectionAction.UPDATE_SUBSCRIPTIONS, ConnectionAction.CLOSE_CONNECTION,
ConnectionAction.STOP_CLIENT_ACTORS, ConnectionAction.SEND_RESPONSE);
Moreover, modifying an open connection closes it first, then stops the actors, and then opens it again. This leaves the MQTT actor unaware of whether we want to create a brand new connection or update an existing one. So the decision needs to be taken at an upper level, in ConnectionPersistenceActor, I guess.
Modifying a closed connection apparently does not unsubscribe at all. So if we modify a closed connection and it stays closed, we'll need to unsubscribe when the connection is opened.
That said, deciding whether the MQTT client should unsubscribe (from some or all topics) does not seem straightforward. To be honest, I can't think of an intuitive way to tell whether we want to just close the connection or to close it and discard the saved session. Maybe we could introduce a header, e.g. ditto-mqtt-clear-session, process it when the connection is being closed (for example, when processing CloseConnection or ModifyConnection commands), and not unsubscribe anywhere else? I'm not a fan of magic headers, but guessing whether the user wants to clear the session seems complicated and error-prone. Introducing new commands only for MQTT (like CloseConnectionAndClear) does not seem reasonable to me.
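To make the header idea concrete, here is a minimal sketch of the decision it implies. Everything in it is hypothetical: the class and method names are invented for illustration, a plain Map stands in for whatever header type the command actually carries, and only the header name ditto-mqtt-clear-session comes from the proposal above.

```java
import java.util.Map;

/** Hypothetical helper: decides whether a close should also unsubscribe. */
final class ClearSessionHeaderSketch {

    // Header name taken from the proposal; it does not exist in Ditto today.
    static final String CLEAR_SESSION_HEADER = "ditto-mqtt-clear-session";

    /**
     * Returns true only if the header is present and equals "true" (case-insensitive).
     * A missing header means: keep the subscriptions, so the broker keeps queueing.
     */
    static boolean shouldUnsubscribeOnClose(Map<String, String> headers) {
        return Boolean.parseBoolean(headers.get(CLEAR_SESSION_HEADER));
    }

    public static void main(String[] args) {
        System.out.println(shouldUnsubscribeOnClose(Map.of()));
        System.out.println(shouldUnsubscribeOnClose(Map.of(CLEAR_SESSION_HEADER, "true")));
    }
}
```

With this default, a plain close or a service shutdown would leave the session untouched, and only an explicit request would clear it.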
Recently we had a similar issue, #1767, which was fixed. This issue has a different cause.
When the connectivity service is being stopped, it stops all connections and unsubscribes (link to code) from all MQTT topics. In MQTT, if a client unsubscribes from a topic, the broker no longer keeps new messages for that topic in the client's session, and they will not be delivered on the next client connect. This means that even if the connection has "cleanSession" set to false and "qos" 1 or 2, the messages will not be delivered when the connectivity service starts again.
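The effect can be illustrated with a toy model of a single persistent session. This is only a sketch under simplifying assumptions: it is not a real broker or MQTT client API, topics are matched exactly (no wildcards), every message is treated as QoS 1 or 2, and all names are invented for the example.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy model of one persistent MQTT session (cleanSession=false). */
final class PersistentSessionModel {
    private final Set<String> subscriptions = new HashSet<>();
    private final List<String> queued = new ArrayList<>();
    private boolean connected = true;

    void subscribe(String topic) { subscriptions.add(topic); }

    void unsubscribe(String topic) { subscriptions.remove(topic); }

    void disconnect() { connected = false; }

    /** Broker side: queue a message only while offline and still subscribed. */
    void publish(String topic, String payload) {
        if (!connected && subscriptions.contains(topic)) {
            queued.add(payload);
        }
    }

    /** Reconnect and return everything that was queued while offline. */
    List<String> reconnect() {
        connected = true;
        List<String> delivered = new ArrayList<>(queued);
        queued.clear();
        return delivered;
    }

    public static void main(String[] args) {
        // Disconnect without unsubscribing: the message is queued and delivered later.
        PersistentSessionModel kept = new PersistentSessionModel();
        kept.subscribe("topic1");
        kept.disconnect();
        kept.publish("topic1", "m1");
        System.out.println(kept.reconnect()); // prints [m1]

        // Unsubscribe before disconnecting (current shutdown behaviour): nothing is queued.
        PersistentSessionModel dropped = new PersistentSessionModel();
        dropped.subscribe("topic1");
        dropped.unsubscribe("topic1");
        dropped.disconnect();
        dropped.publish("topic1", "m1");
        System.out.println(dropped.reconnect()); // prints []
    }
}
```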
Following are my thoughts on that.
My initial idea was to not unsubscribe for MQTT at all. In cases where we do not need a session (for example, when ditto.connectivity.connection.mqtt.session-expiry-interval is set to 0s), this has no impact, as the broker discards the session when the client disconnects.
But this approach does not work for the edge case where we change the list of topics we want to subscribe to. Besides queued messages, the broker also remembers the client's subscriptions in the session: if a client subscribed to topic "topic1", then disconnected and connected again, it is still subscribed to "topic1" and receives all new messages for this topic. So if we change the list of topics for a connection and do not change the client ID, the client will receive messages from all previously subscribed topics as well as the newly subscribed ones.
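One possible way to handle this edge case, sketched here purely as an assumption and not as what Ditto does, would be to unsubscribe only from topics that were removed from the connection. That requires knowing the previous topic list (for example from the previous connection configuration), which is itself an assumption. The class and method names are invented for the example.

```java
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical helper: computes which subscriptions change between two topic lists. */
final class TopicDiffSketch {

    /** Topics the session still holds but the connection no longer wants. */
    static Set<String> toUnsubscribe(Set<String> previous, Set<String> desired) {
        Set<String> result = new TreeSet<>(previous);
        result.removeAll(desired);
        return result;
    }

    /** Topics the connection wants but the session does not hold yet. */
    static Set<String> toSubscribe(Set<String> previous, Set<String> desired) {
        Set<String> result = new TreeSet<>(desired);
        result.removeAll(previous);
        return result;
    }

    public static void main(String[] args) {
        Set<String> previous = Set.of("topic1", "topic2");
        Set<String> desired = Set.of("topic2", "topic3");
        System.out.println(toUnsubscribe(previous, desired)); // prints [topic1]
        System.out.println(toSubscribe(previous, desired));   // prints [topic3]
    }
}
```

Topics present in both lists are left alone, so the broker keeps queueing messages for them across the reconnect.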