Skip to content

Commit

Permalink
Merge pull request #55 from quartiq/rs/issue-53/receive-timeout-pings
Browse files Browse the repository at this point in the history
Updating ping timeout to operate unconditionally within keep-alive interval
  • Loading branch information
ryan-summers authored Oct 8, 2021
2 parents 25a396b + c7680f6 commit a959e94
Show file tree
Hide file tree
Showing 5 changed files with 179 additions and 150 deletions.
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
//! let mut subscribed = false;
//!
//! loop {
//! if mqtt.client.is_connected().unwrap() && !subscribed {
//! if mqtt.client.is_connected() && !subscribed {
//! mqtt.client.subscribe("topic", &[]).unwrap();
//! subscribed = true;
//! }
Expand Down
230 changes: 115 additions & 115 deletions src/mqtt_client.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
use crate::{
de::{deserialize::ReceivedPacket, PacketReader},
de::{
deserialize::{ConnAck, ReceivedPacket},
PacketReader,
},
network_manager::InterfaceHolder,
ser::serialize,
session_state::SessionState,
Error, Property, ProtocolError, {debug, error, info, warn},
Error, Property, ProtocolError, {debug, error, info},
};

use embedded_nal::{IpAddr, SocketAddr, TcpClientStack};
Expand All @@ -12,13 +15,10 @@ use embedded_time::{
duration::{Extensions, Seconds},
};

use heapless::{String, Vec};
use heapless::String;

use core::str::FromStr;

/// The default duration to wait for a ping response from the broker.
const PING_TIMEOUT: embedded_time::duration::Seconds = embedded_time::duration::Seconds(5);

/// The quality-of-service for an MQTT message.
#[derive(Debug, Copy, Clone, PartialEq)]
pub enum QoS {
Expand All @@ -40,8 +40,10 @@ mod sm {
transitions: {
*Restart + GotSocket = ConnectTransport,
ConnectTransport + Connect = ConnectBroker,
ConnectBroker + SentConnect = Active,
ConnectBroker + SentConnect = Establishing,
ConnectBroker + Disconnect = Restart,
Establishing + ReceivedConnAck = Active,
Establishing + Disconnect = Restart,
Active + Disconnect = Restart,
}
}
Expand Down Expand Up @@ -108,8 +110,6 @@ where

// Next, connect to the broker via the MQTT protocol.
&States::ConnectBroker => {
self.reset();

let properties = [
// Tell the broker our maximum packet size.
Property::MaximumPacketSize(MSG_SIZE as u32),
Expand All @@ -131,14 +131,15 @@ where
)?;

info!("Sending CONNECT");
self.network
.write_packet(&mut self.session_state, &self.clock, packet)?;
self.network.write(packet)?;

self.connection_state
.process_event(Events::SentConnect)
.unwrap();
}

&States::Establishing => {}

_ => {}
}

Expand All @@ -147,6 +148,10 @@ where
Ok(())
}

fn reset(&mut self) {
self.connection_state.process_event(Events::Disconnect).ok();
}

/// Configure the MQTT keep-alive interval.
///
/// # Note
Expand Down Expand Up @@ -192,7 +197,7 @@ where
topic: &'a str,
properties: &[Property<'b>],
) -> Result<(), Error<TcpStack::Error>> {
if !self.session_state.connected {
if self.connection_state.state() != &States::Active {
return Err(Error::NotReady);
}

Expand All @@ -201,17 +206,15 @@ where
let mut buffer: [u8; MSG_SIZE] = [0; MSG_SIZE];
let packet = serialize::subscribe_message(&mut buffer, topic, packet_id, properties)?;

self.network
.write_packet(&mut self.session_state, &self.clock, packet)
.and_then(|_| {
info!("Subscribing to `{}`: {}", topic, packet_id);
self.session_state
.pending_subscriptions
.push(packet_id)
.map_err(|_| Error::Unsupported)?;
self.session_state.increment_packet_identifier();
Ok(())
})
self.network.write(packet).and_then(|_| {
info!("Subscribing to `{}`: {}", topic, packet_id);
self.session_state
.pending_subscriptions
.push(packet_id)
.map_err(|_| Error::Unsupported)?;
self.session_state.increment_packet_identifier();
Ok(())
})
}

/// Determine if any subscriptions are waiting for completion.
Expand All @@ -226,8 +229,8 @@ where
///
/// # Returns
/// True if the client is connected to the broker.
pub fn is_connected(&mut self) -> Result<bool, Error<TcpStack::Error>> {
Ok(self.network.tcp_connected()? && self.session_state.connected)
pub fn is_connected(&mut self) -> bool {
self.connection_state.state() == &States::Active
}

/// Get the count of unacknowledged QoS 1 messages.
Expand Down Expand Up @@ -271,7 +274,7 @@ where
assert!(qos != QoS::ExactlyOnce);

// If we are not yet connected to the broker, we can't transmit a message.
if self.is_connected()? == false {
if self.is_connected() == false {
return Ok(());
}

Expand All @@ -286,8 +289,7 @@ where
let mut buffer: [u8; MSG_SIZE] = [0; MSG_SIZE];
let packet = serialize::publish_message(&mut buffer, topic, data, qos, id, properties)?;

self.network
.write_packet(&mut self.session_state, &self.clock, packet)?;
self.network.write(packet)?;
self.session_state.increment_packet_identifier();

if qos == QoS::AtLeastOnce {
Expand All @@ -297,8 +299,63 @@ where
Ok(())
}

fn reset(&mut self) {
self.session_state.connected = false;
fn handle_connection_acknowledge(
&mut self,
acknowledge: ConnAck,
) -> Result<(), Error<TcpStack::Error>> {
if self.connection_state.state() != &States::Establishing {
return Err(Error::Protocol(ProtocolError::Invalid));
}

let mut result = Ok(());

if acknowledge.reason_code != 0 {
return Err(Error::Failed(acknowledge.reason_code));
}

if !acknowledge.session_present {
if self.session_state.is_present() {
result = Err(Error::SessionReset);
}

// Reset the session state upon connection with a broker that doesn't have a
// session state saved for us.
self.session_state.reset();
}

self.connection_state
.process_event(Events::ReceivedConnAck)
.unwrap();

for property in acknowledge.properties {
match property {
Property::MaximumPacketSize(size) => {
self.session_state.maximum_packet_size.replace(size);
}
Property::AssignedClientIdentifier(id) => {
self.session_state.client_id =
String::from_str(id).or(Err(Error::ProvidedClientIdTooLong))?;
}
Property::ServerKeepAlive(keep_alive) => {
self.session_state
.keep_alive_interval
.replace((keep_alive as u32).seconds());
}
_prop => info!("Ignoring property: {:?}", _prop),
};
}

// Now that we are connected, we have session state that will be persisted.
self.session_state
.register_connection(self.clock.try_now()?);

// Replay QoS 1 messages
for key in self.session_state.pending_publish_ordering.iter() {
let message = self.session_state.pending_publish.get(&key).unwrap();
self.network.write(message)?;
}

result
}

fn handle_packet<'a, F>(
Expand All @@ -314,71 +371,18 @@ where
&[Property<'a>],
),
{
if !self.session_state.connected {
if let ReceivedPacket::ConnAck(acknowledge) = packet {
let mut result = Ok(());

if acknowledge.reason_code != 0 {
return Err(Error::Failed(acknowledge.reason_code));
}

if !acknowledge.session_present {
if self.session_state.connected {
result = Err(Error::SessionReset);
}

// Reset the session state upon connection with a broker that doesn't have a
// session state saved for us.
self.session_state.reset();
}

// Now that we are connected, we have session state that will be persisted.
self.session_state.register_connection();
self.session_state.connected = true;

for property in acknowledge.properties {
match property {
Property::MaximumPacketSize(size) => {
self.session_state.maximum_packet_size.replace(size);
}
Property::AssignedClientIdentifier(id) => {
self.session_state.client_id =
String::from_str(id).or(Err(Error::ProvidedClientIdTooLong))?;
}
Property::ServerKeepAlive(keep_alive) => {
self.session_state
.keep_alive_interval
.replace((keep_alive as u32).seconds());
}
_prop => info!("Ignoring property: {:?}", _prop),
};
}

// Replay QoS 1 messages
let keys: Vec<u16, MSG_COUNT> = self.session_state.pending_publish_ordering.clone();
for k in keys {
let message: Vec<u8, MSG_SIZE> = Vec::from_slice(
self.session_state
.pending_publish
.get(&k)
.unwrap()
.as_slice(),
)
.unwrap();
self.network
.write_packet(&mut self.session_state, &self.clock, &message)?;
}
// ConnAck packets are received outside of the connection state.
if let ReceivedPacket::ConnAck(ack) = packet {
return self.handle_connection_acknowledge(ack);
}

return result;
} else {
// It is a protocol error to receive anything else when not connected.
// TODO: Verify it is a protocol error.
error!(
"Received invalid packet outside of connected state: {:?}",
packet
);
return Err(Error::Protocol(ProtocolError::Invalid));
}
// All other packets must be received in the active state.
if !self.is_connected() {
error!(
"Received invalid packet outside of connected state: {:?}",
packet
);
return Err(Error::Protocol(ProtocolError::Invalid));
}

match packet {
Expand Down Expand Up @@ -419,12 +423,7 @@ where

ReceivedPacket::PingResp => {
// Cancel the ping response timeout.
if self.session_state.ping_timeout.is_some() {
self.session_state.ping_timeout = None;
} else {
warn!("Got unexpected ping response");
}

self.session_state.register_ping_response();
Ok(())
}

Expand All @@ -440,26 +439,25 @@ where

let now = self.clock.try_now()?;

if let Some(timeout) = self.session_state.ping_timeout {
if now > timeout {
// Reset network connection.
// Note: The ping timeout is set at this point so that it's running even if we fail
// to write the ping message. This is intentional incase the underlying transport
// mechanism has stalled. The ping timeout will then allow us to recover the
// underlying TCP connection.
match self.session_state.handle_ping(now) {
Err(()) => {
self.connection_state.process_event(Events::Disconnect).ok();
}
} else {
// Check if we need to ping the server.
if self.session_state.ping_is_due(&now) {
let mut buffer: [u8; MSG_SIZE] = [0; MSG_SIZE];

// Note: The ping timeout is set at this point so that it's running even if we fail
// to write the ping message. This is intentional incase the underlying transport
// mechanism has stalled. The ping timeout will then allow us to recover the
// underlying TCP connection.
self.session_state.ping_timeout.replace(now + PING_TIMEOUT);
Ok(true) => {
let mut buffer: [u8; MSG_SIZE] = [0; MSG_SIZE];

// Note: If we fail to serialize or write the packet, the ping timeout timer is
// still running, so we will recover the TCP connection in the future.
let packet = serialize::ping_req_message(&mut buffer)?;
self.network
.write_packet(&mut self.session_state, &self.clock, packet)?;
self.network.write(packet)?;
}

Ok(false) => {}
}

Ok(())
Expand Down Expand Up @@ -527,7 +525,9 @@ impl<

// If the connection is no longer active, reset the packet reader state and return. There's
// nothing more we can do.
if self.client.connection_state.state() != &States::Active {
if self.client.connection_state.state() != &States::Active
&& self.client.connection_state.state() != &States::Establishing
{
self.packet_reader.reset();
return Ok(());
}
Expand Down
Loading

0 comments on commit a959e94

Please sign in to comment.