diff --git a/crates/outbound-mqtt/src/lib.rs b/crates/outbound-mqtt/src/lib.rs index ec3118ba89..8f085afbed 100644 --- a/crates/outbound-mqtt/src/lib.rs +++ b/crates/outbound-mqtt/src/lib.rs @@ -3,7 +3,7 @@ mod host_component; use std::time::Duration; use anyhow::Result; -use rumqttc::AsyncClient; +use rumqttc::{AsyncClient, Event, Incoming, Outgoing, QoS}; use spin_core::{async_trait, wasmtime::component::Resource}; use spin_world::v2::mqtt::{self as v2, Connection as MqttConnection, Error, Qos}; @@ -97,15 +97,22 @@ impl v2::HostConnection for OutboundMqtt { .await .map_err(other_error)?; - // Poll EventLoop once to send the message to MQTT broker or capture/throw error - // We may revisit this later to manage long running connections and their issues in the connection pool. - eventloop - .poll() - .await - .map_err(|err: rumqttc::ConnectionError| { - v2::Error::ConnectionFailed(err.to_string()) - })?; - + // Poll event loop until outgoing publish event is iterated over to send the message to MQTT broker or capture/throw error. + // We may revisit this later to manage long running connections, high throughput use cases and their issues in the connection pool. + loop { + let event = eventloop + .poll() + .await + .map_err(|err| v2::Error::ConnectionFailed(err.to_string()))?; + + match (qos, event) { + (QoS::AtMostOnce, Event::Outgoing(Outgoing::Publish(_))) + | (QoS::AtLeastOnce, Event::Incoming(Incoming::PubAck(_))) + | (QoS::ExactlyOnce, Event::Outgoing(Outgoing::PubComp(_))) => break, + + (_, _) => continue, + } + } Ok(()) } .await)