Skip to content

Commit

Permalink
Merge pull request #63 from quartiq/feature/time-refactor
Browse files Browse the repository at this point in the history
Feature/time refactor
  • Loading branch information
ryan-summers authored Oct 8, 2021
2 parents a959e94 + 0a3c690 commit 95e5303
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 37 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ keep-alive interval
* Fixing main docs.
* Added support for publishing with QoS 1
* Refactoring network stack management into a separate container class
* Keep-alive settings now take a u16 integer number of seconds

# Version 0.3.0
Version 0.3.0 was published on 2021-08-06
Expand Down
32 changes: 8 additions & 24 deletions src/mqtt_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,6 @@ use crate::{
};

use embedded_nal::{IpAddr, SocketAddr, TcpClientStack};
use embedded_time::{
self,
duration::{Extensions, Seconds},
};

use heapless::String;

Expand Down Expand Up @@ -121,10 +117,7 @@ where
let packet = serialize::connect_message(
&mut buffer,
self.session_state.client_id.as_str().as_bytes(),
self.session_state
.keep_alive_interval
.unwrap_or(0.seconds())
.0 as u16,
self.session_state.keepalive_interval(),
&properties,
// Only perform a clean start if we do not have any session state.
!self.session_state.is_present(),
Expand Down Expand Up @@ -166,21 +159,14 @@ where
/// messages are sent within 50% of the keep-alive interval.
pub fn set_keepalive_interval(
&mut self,
interval: impl Into<Seconds<u32>>,
interval_seconds: u16,
) -> Result<(), Error<TcpStack::Error>> {
let interval = interval.into();
if interval.0 > u16::MAX as u32 {
return Err(ProtocolError::Invalid.into());
}
match self.connection_state.state() {
&States::Active => Err(Error::NotReady),
_ => {
self.session_state
.keep_alive_interval
.replace(interval.into());
Ok(())
}
if self.connection_state.state() != &States::Active {
return Err(Error::NotReady);
}

self.session_state.set_keepalive(interval_seconds);
Ok(())
}

/// Subscribe to a topic.
Expand Down Expand Up @@ -337,9 +323,7 @@ where
String::from_str(id).or(Err(Error::ProvidedClientIdTooLong))?;
}
Property::ServerKeepAlive(keep_alive) => {
self.session_state
.keep_alive_interval
.replace((keep_alive as u32).seconds());
self.session_state.set_keepalive(keep_alive);
}
_prop => info!("Ignoring property: {:?}", _prop),
};
Expand Down
37 changes: 24 additions & 13 deletions src/session_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use embedded_nal::IpAddr;
use heapless::{LinearMap, String, Vec};

use embedded_time::{
duration::{Extensions, Seconds},
duration::{Extensions, Milliseconds, Seconds},
Instant,
};

Expand All @@ -13,7 +13,7 @@ const PING_TIMEOUT: Seconds = Seconds(5);

pub struct SessionState<Clock: embedded_time::Clock, const MSG_SIZE: usize, const MSG_COUNT: usize>
{
pub keep_alive_interval: Option<Seconds<u32>>,
keep_alive_interval: Option<Milliseconds<u32>>,
ping_timeout: Option<Instant<Clock>>,
next_ping: Option<Instant<Clock>>,
pub broker: IpAddr,
Expand All @@ -37,7 +37,7 @@ impl<Clock: embedded_time::Clock, const MSG_SIZE: usize, const MSG_COUNT: usize>
broker,
client_id: id,
packet_id: 1,
keep_alive_interval: Some(59.seconds()),
keep_alive_interval: Some(59_000.milliseconds()),
pending_subscriptions: Vec::new(),
pending_publish: LinearMap::new(),
pending_publish_ordering: Vec::new(),
Expand All @@ -48,13 +48,30 @@ impl<Clock: embedded_time::Clock, const MSG_SIZE: usize, const MSG_COUNT: usize>
pub fn reset(&mut self) {
self.active = false;
self.packet_id = 1;
self.keep_alive_interval = Some(59.seconds());
self.keep_alive_interval = Some(59_000.milliseconds());
self.maximum_packet_size = None;
self.pending_subscriptions.clear();
self.pending_publish.clear();
self.pending_publish_ordering.clear();
}

/// Get the keep-alive interval as an integer number of seconds.
///
/// # Note
/// If no keep-alive interval is specified, zero is returned.
pub fn keepalive_interval(&self) -> u16 {
(self.keep_alive_interval.unwrap_or(0.milliseconds()).0 * 1000) as u16
}

/// Update the keep-alive interval.
///
/// # Args
/// * `seconds` - The number of seconds in the keep-alive interval.
pub fn set_keepalive(&mut self, seconds: u16) {
self.keep_alive_interval
.replace(Milliseconds(seconds as u32 * 1000));
}

/// Called when publish with QoS 1 is called so that we can keep track of PUBACK
pub fn handle_publish(&mut self, qos: QoS, id: u16, packet: &[u8]) {
// This is not called for QoS 0 and QoS 2 is not implemented yet
Expand Down Expand Up @@ -107,12 +124,9 @@ impl<Clock: embedded_time::Clock, const MSG_SIZE: usize, const MSG_COUNT: usize>
self.active = true;
self.ping_timeout = None;

// The next ping should be sent out in half the keep-alive interval from now. To calculate
// that, we take the integral number of seconds in the keep-alive interval and multiply it
// by 500ms (1/2 seconds) to get half the interval.
// The next ping should be sent out in half the keep-alive interval from now.
if let Some(interval) = self.keep_alive_interval {
self.next_ping
.replace(now + 500.milliseconds() * interval.0);
self.next_ping.replace(now + interval / 2);
}
}

Expand Down Expand Up @@ -171,10 +185,7 @@ impl<Clock: embedded_time::Clock, const MSG_SIZE: usize, const MSG_COUNT: usize>
// Update the next ping deadline if the ping is due.
if now > ping_deadline {
// The next ping should be sent out in half the keep-alive interval from now.
// To calculate that, we take the integral number of seconds in the keep-alive
// interval and multiply it by 500ms (1/2 seconds) to get half the interval.
self.next_ping
.replace(now + 500.milliseconds() * keep_alive_interval.0);
self.next_ping.replace(now + keep_alive_interval / 2);

self.ping_timeout.replace(now + PING_TIMEOUT);
return Ok(true);
Expand Down

0 comments on commit 95e5303

Please sign in to comment.