From f6e6186c14cf642e09aef8e943161651cb7c95d3 Mon Sep 17 00:00:00 2001 From: Rik Bouwmeester Date: Wed, 25 Sep 2024 17:15:27 +0200 Subject: [PATCH 01/13] Echo timestamps to calculate latency --- cflib/crtp/radiodriver.py | 67 ++++++++++++++++++++++++++++++++++----- 1 file changed, 59 insertions(+), 8 deletions(-) diff --git a/cflib/crtp/radiodriver.py b/cflib/crtp/radiodriver.py index 7d22b6c52..9399e8f7f 100644 --- a/cflib/crtp/radiodriver.py +++ b/cflib/crtp/radiodriver.py @@ -49,8 +49,10 @@ from urllib.parse import parse_qs from urllib.parse import urlparse +import numpy as np + import cflib.drivers.crazyradio as crazyradio -from .crtpstack import CRTPPacket +from .crtpstack import CRTPPacket, CRTPPort from .exceptions import WrongUriType from cflib.crtp.crtpdriver import CRTPDriver from cflib.drivers.crazyradio import Crazyradio @@ -534,6 +536,12 @@ def __init__(self, radio, inQueue, outQueue, self._retry_sum = 0 self.rate_limit = rate_limit + self._packed_last_ping_time = None + self._last_ping_time = 0 + self._latencies = [] + self._p95_latency = None + self._ping_timeout = 3 + self._curr_up = 0 self._curr_down = 1 @@ -632,8 +640,12 @@ def run(self): # If there is a copter in range, the packet is analysed and the # next packet to send is prepared if (len(data) > 0): - inPacket = CRTPPacket(data[0], list(data[1:])) - self._in_queue.put(inPacket) + if data[1:] == self._packed_last_ping_time: + # This is a ping response + self._calculate_latency(struct.unpack(" self._ping_timeout): + out_packet = CRTPPacket() + out_packet.set_header(CRTPPort.LINKCTRL, 0) + + # Pack the current time as the ping timestamp + current_time = time.time() + out_packet.data = struct.pack(" 100: + self._latencies.pop(0) + self._p95_latency = np.percentile(self._latencies, 95) + print("95th percentile latency: {} ms".format(self._p95_latency)) + + # Indicate that the next ping can be sent + self._packed_last_ping_time = None def set_retries_before_disconnect(nr_of_retries): global _nr_of_retries From fda63c1158d1b4565a032561cbc4038a5680ee94 Mon Sep 17 00:00:00 2001 From: Rik Bouwmeester Date: Thu, 26 Sep 2024 12:40:49 +0200 Subject: [PATCH 02/13] Refactor latency measurement into dedicated Latency class - Moved latency measurement from the radio driver to a new Latency class for better organization and readability. - Generalized the LinkStatistics class to allow for easy addition of other link statistics in the future. --- cflib/crazyflie/__init__.py | 32 ++++- cflib/crazyflie/link_statistics.py | 186 +++++++++++++++++++++++++++++ cflib/crtp/radiodriver.py | 56 +-------- 3 files changed, 219 insertions(+), 55 deletions(-) create mode 100644 cflib/crazyflie/link_statistics.py diff --git a/cflib/crazyflie/__init__.py b/cflib/crazyflie/__init__.py index a64930683..679b32bde 100644 --- a/cflib/crazyflie/__init__.py +++ b/cflib/crazyflie/__init__.py @@ -44,6 +44,7 @@ from .commander import Commander from .console import Console from .extpos import Extpos +from .link_statistics import LinkStatistics from .localization import Localization from .log import Log from .mem import Memory @@ -121,6 +122,7 @@ def __init__(self, link=None, ro_cache=None, rw_cache=None): self.mem = Memory(self) self.platform = PlatformService(self) self.appchannel = Appchannel(self) + self.link_statistics = LinkStatistics(self) self.link_uri = '' @@ -153,6 +155,11 @@ def __init__(self, link=None, ro_cache=None, rw_cache=None): self.fully_connected.add_callback( lambda uri: logger.info('Callback->Connection completed [%s]', uri)) + self.connected.add_callback( + lambda uri: self.link_statistics.start()) + self.disconnected.add_callback( + lambda uri: self.link_statistics.stop()) + def _disconnected(self, link_uri): """ Callback when disconnected.""" self.connected_ts = None @@ -283,6 +290,14 @@ def add_port_callback(self, port, cb): def remove_port_callback(self, port, cb): """Remove the callback cb on port""" self.incoming.remove_port_callback(port, cb) + + def add_header_callback(self, cb, port, channel, port_mask=0xFF, channel_mask=0xFF): + """Add a callback to cb on port and channel""" + self.incoming.add_header_callback(cb, port, channel, port_mask, channel_mask) + + def remove_header_callback(self, cb, port, channel, port_mask=0xFF, channel_mask=0xFF): + """Remove the callback cb on port and channel""" + self.incoming.remove_header_callback(cb, port, channel, port_mask, channel_mask) def _no_answer_do_retry(self, pk, pattern): """Resend packets that we have not gotten answers to""" @@ -380,9 +395,7 @@ def add_port_callback(self, port, cb): def remove_port_callback(self, port, cb): """Remove a callback for data that comes on a specific port""" logger.debug('Removing callback on port [%d] to [%s]', port, cb) - for port_callback in self.cb: - if port_callback.port == port and port_callback.callback == cb: - self.cb.remove(port_callback) + self.remove_header_callback(cb, port, 0, 0xff, 0x0) def add_header_callback(self, cb, port, channel, port_mask=0xFF, channel_mask=0xFF): @@ -394,6 +407,19 @@ def add_header_callback(self, cb, port, channel, port_mask=0xFF, self.cb.append(_CallbackContainer(port, port_mask, channel, channel_mask, cb)) + def remove_header_callback(self, cb, port, channel, port_mask=0xFF, + channel_mask=0xFF): + """ + Remove a callback for a specific port/header callback with the + possibility to add a mask for channel and port for multiple + hits for same callback. + """ + for port_callback in self.cb: + if port_callback.port == port and port_callback.port_mask == port_mask and \ + port_callback.channel == channel and port_callback.channel_mask == channel_mask and \ + port_callback.callback == cb: + self.cb.remove(port_callback) + def run(self): while True: if self.cf.link is None: diff --git a/cflib/crazyflie/link_statistics.py b/cflib/crazyflie/link_statistics.py new file mode 100644 index 000000000..3dfe28819 --- /dev/null +++ b/cflib/crazyflie/link_statistics.py @@ -0,0 +1,186 @@ +# -*- coding: utf-8 -*- +# +# ,---------, ____ _ __ +# | ,-^-, | / __ )(_) /_______________ _____ ___ +# | ( O ) | / __ / / __/ ___/ ___/ __ `/_ / / _ \ +# | / ,--' | / /_/ / / /_/ /__/ / / /_/ / / /_/ __/ +# +------` /_____/_/\__/\___/_/ \__,_/ /___/\___/ +# +# Copyright (C) 2024 Bitcraze AB +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, in version 3. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +""" +This module provides tools for tracking statistics related to the communication +link between the Crazyflie and the lib. Currently, it focuses on tracking latency +but is designed to be extended with additional link statistics in the future. +""" + +from cflib.crtp.crtpstack import CRTPPort, CRTPPacket +from threading import Thread, Event +import time +import struct +import numpy as np + +__author__ = "Bitcraze AB" +__all__ = ["LinkStatistics"] + +PING_HEADER = 0x0 +ECHO_CHANNEL = 0 + + +class LinkStatistics: + """ + LinkStatistics class manages the collection of various statistics related to the + communication link between the Crazyflie and the lib. + + This class serves as a high-level manager, initializing and coordinating multiple + statistics trackers, such as Latency. It allows starting and stopping all + statistics trackers simultaneously. Future statistics can be added to extend + the class's functionality. + + Attributes: + _cf (Crazyflie): A reference to the Crazyflie instance. + latency (Latency): An instance of the Latency class that tracks latency statistics. + """ + + def __init__(self, crazyflie): + self._cf = crazyflie + + self.latency = Latency(self._cf) + + def start(self): + """ + Start collecting all statistics. + """ + self.latency.start() + + def stop(self): + """ + Stop collecting all statistics. + """ + self.latency.stop() + + +class Latency: + """ + The Latency class measures and tracks the latency of the communication link + between the Crazyflie and the lib. + + This class periodically sends ping requests to the Crazyflie and tracks + the round-trip time (latency). It calculates and stores the 95th percentile + latency over a rolling window of recent latency measurements. + + Attributes: + _cf (Crazyflie): A reference to the Crazyflie instance. + latency (float): The current calculated 95th percentile latency in milliseconds. + _stop_event (Event): An event object to control the stopping of the ping thread. + _ping_thread_instance (Thread): Thread instance for sending ping requests at intervals. + """ + + def __init__(self, crazyflie): + self._cf = crazyflie + self._cf.add_header_callback(self._ping_response, CRTPPort.LINKCTRL, 0) + self._stop_event = Event() + self._ping_thread_instance = Thread(target=self._ping_thread) + self.latency = 0 + + def start(self): + """ + Start the latency tracking process. + + This method initiates a background thread that sends ping requests + at regular intervals to measure and track latency statistics. + """ + self._ping_thread_instance.start() + + def stop(self): + """ + Stop the latency tracking process. + + This method stops the background thread and ceases sending further + ping requests, halting latency measurement. + """ + self._stop_event.set() + self._ping_thread_instance.join() + + def _ping_thread(self, interval: float = 1.0) -> None: + """ + Background thread method that sends a ping to the Crazyflie at regular intervals. + + This method runs in a separate thread and continues to send ping requests + until the stop event is set. + + Args: + interval (float): The time (in seconds) to wait between ping requests. Default is 1 second. + """ + while not self._stop_event.is_set(): + self.ping() + time.sleep(interval) + + def ping(self) -> None: + """ + Send a ping request to the Crazyflie to measure latency. + + A ping packet is sent to the Crazyflie with the current timestamp and a + header identifier to differentiate it from other echo responses. The latency + is calculated upon receiving the response. + """ + ping_packet = CRTPPacket() + ping_packet.set_header(CRTPPort.LINKCTRL, ECHO_CHANNEL) + + # Pack the current time as the ping timestamp + current_time = time.time() + ping_packet.data = struct.pack(" 100: + self._latencies.pop(0) + p95_latency = np.percentile(self._latencies, 95) + return p95_latency diff --git a/cflib/crtp/radiodriver.py b/cflib/crtp/radiodriver.py index 9399e8f7f..5f773cc0a 100644 --- a/cflib/crtp/radiodriver.py +++ b/cflib/crtp/radiodriver.py @@ -52,7 +52,7 @@ import numpy as np import cflib.drivers.crazyradio as crazyradio -from .crtpstack import CRTPPacket, CRTPPort +from .crtpstack import CRTPPacket from .exceptions import WrongUriType from cflib.crtp.crtpdriver import CRTPDriver from cflib.drivers.crazyradio import Crazyradio @@ -536,12 +536,6 @@ def __init__(self, radio, inQueue, outQueue, self._retry_sum = 0 self.rate_limit = rate_limit - self._packed_last_ping_time = None - self._last_ping_time = 0 - self._latencies = [] - self._p95_latency = None - self._ping_timeout = 3 - self._curr_up = 0 self._curr_down = 1 @@ -572,7 +566,6 @@ def _send_packet_safe(self, cr, packet): self._curr_down = 1 - self._curr_down if resp and resp.ack: self._curr_up = 1 - self._curr_up - return resp def run(self): @@ -640,12 +633,8 @@ def run(self): # If there is a copter in range, the packet is analysed and the # next packet to send is prepared if (len(data) > 0): - if data[1:] == self._packed_last_ping_time: - # This is a ping response - self._calculate_latency(struct.unpack(" self._ping_timeout): - out_packet = CRTPPacket() - out_packet.set_header(CRTPPort.LINKCTRL, 0) - - # Pack the current time as the ping timestamp - current_time = time.time() - out_packet.data = struct.pack(" 100: - self._latencies.pop(0) - self._p95_latency = np.percentile(self._latencies, 95) - print("95th percentile latency: {} ms".format(self._p95_latency)) - - # Indicate that the next ping can be sent - self._packed_last_ping_time = None def set_retries_before_disconnect(nr_of_retries): global _nr_of_retries From bb2e55c05c9c7ad58680954be1167371e0dbdf8e Mon Sep 17 00:00:00 2001 From: Rik Bouwmeester Date: Thu, 26 Sep 2024 12:43:39 +0200 Subject: [PATCH 03/13] Undo redundant radiodriver changes --- cflib/crtp/radiodriver.py | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/cflib/crtp/radiodriver.py b/cflib/crtp/radiodriver.py index 5f773cc0a..7d22b6c52 100644 --- a/cflib/crtp/radiodriver.py +++ b/cflib/crtp/radiodriver.py @@ -49,8 +49,6 @@ from urllib.parse import parse_qs from urllib.parse import urlparse -import numpy as np - import cflib.drivers.crazyradio as crazyradio from .crtpstack import CRTPPacket from .exceptions import WrongUriType @@ -566,6 +564,7 @@ def _send_packet_safe(self, cr, packet): self._curr_down = 1 - self._curr_down if resp and resp.ack: self._curr_up = 1 - self._curr_up + return resp def run(self): @@ -653,12 +652,10 @@ def run(self): # get the next packet to send of relaxation (wait 10ms) outPacket = None - - if not outPacket: - try: - outPacket = self._out_queue.get(True, waitTime) - except queue.Empty: - outPacket = None + try: + outPacket = self._out_queue.get(True, waitTime) + except queue.Empty: + outPacket = None dataOut = array.array('B') From 1b1f1b129cd3de958b6fd90bad22a55425f8e06a Mon Sep 17 00:00:00 2001 From: Rik Bouwmeester Date: Thu, 26 Sep 2024 12:57:10 +0200 Subject: [PATCH 04/13] =?UTF-8?q?=F0=9F=90=8D=20Python=20Formatting=20Fias?= =?UTF-8?q?co:=20The=20Quest=20for=20Perfect=20Quotation!?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit In a valiant attempt to appease the relentless gods of autopep8 and the all-seeing eyes of CI, I embarked on an epic journey to fix double-quoted strings and reorder imports. --- cflib/crazyflie/__init__.py | 24 ++++++++++++------------ cflib/crazyflie/link_statistics.py | 22 ++++++++++++---------- 2 files changed, 24 insertions(+), 22 deletions(-) diff --git a/cflib/crazyflie/__init__.py b/cflib/crazyflie/__init__.py index 679b32bde..015188f44 100644 --- a/cflib/crazyflie/__init__.py +++ b/cflib/crazyflie/__init__.py @@ -290,7 +290,7 @@ def add_port_callback(self, port, cb): def remove_port_callback(self, port, cb): """Remove the callback cb on port""" self.incoming.remove_port_callback(port, cb) - + def add_header_callback(self, cb, port, channel, port_mask=0xFF, channel_mask=0xFF): """Add a callback to cb on port and channel""" self.incoming.add_header_callback(cb, port, channel, port_mask, channel_mask) @@ -408,17 +408,17 @@ def add_header_callback(self, cb, port, channel, port_mask=0xFF, channel, channel_mask, cb)) def remove_header_callback(self, cb, port, channel, port_mask=0xFF, - channel_mask=0xFF): - """ - Remove a callback for a specific port/header callback with the - possibility to add a mask for channel and port for multiple - hits for same callback. - """ - for port_callback in self.cb: - if port_callback.port == port and port_callback.port_mask == port_mask and \ - port_callback.channel == channel and port_callback.channel_mask == channel_mask and \ - port_callback.callback == cb: - self.cb.remove(port_callback) + channel_mask=0xFF): + """ + Remove a callback for a specific port/header callback with the + possibility to add a mask for channel and port for multiple + hits for same callback. + """ + for port_callback in self.cb: + if port_callback.port == port and port_callback.port_mask == port_mask and \ + port_callback.channel == channel and port_callback.channel_mask == channel_mask and \ + port_callback.callback == cb: + self.cb.remove(port_callback) def run(self): while True: diff --git a/cflib/crazyflie/link_statistics.py b/cflib/crazyflie/link_statistics.py index 3dfe28819..16c4fe154 100644 --- a/cflib/crazyflie/link_statistics.py +++ b/cflib/crazyflie/link_statistics.py @@ -19,21 +19,23 @@ # # You should have received a copy of the GNU General Public License # along with this program. If not, see . - """ This module provides tools for tracking statistics related to the communication link between the Crazyflie and the lib. Currently, it focuses on tracking latency but is designed to be extended with additional link statistics in the future. """ - -from cflib.crtp.crtpstack import CRTPPort, CRTPPacket -from threading import Thread, Event -import time import struct +import time +from threading import Event +from threading import Thread + import numpy as np -__author__ = "Bitcraze AB" -__all__ = ["LinkStatistics"] +from cflib.crtp.crtpstack import CRTPPacket +from cflib.crtp.crtpstack import CRTPPort + +__author__ = 'Bitcraze AB' +__all__ = ['LinkStatistics'] PING_HEADER = 0x0 ECHO_CHANNEL = 0 @@ -141,7 +143,7 @@ def ping(self) -> None: # Pack the current time as the ping timestamp current_time = time.time() - ping_packet.data = struct.pack(" Date: Thu, 26 Sep 2024 15:06:02 +0200 Subject: [PATCH 05/13] Enhance Latency class to allow restarting the ping thread - Modified start() and stop() methods for better thread management. - Implemented thread reinitialization on start after stop. --- cflib/crazyflie/link_statistics.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/cflib/crazyflie/link_statistics.py b/cflib/crazyflie/link_statistics.py index 16c4fe154..14630405a 100644 --- a/cflib/crazyflie/link_statistics.py +++ b/cflib/crazyflie/link_statistics.py @@ -94,7 +94,7 @@ def __init__(self, crazyflie): self._cf = crazyflie self._cf.add_header_callback(self._ping_response, CRTPPort.LINKCTRL, 0) self._stop_event = Event() - self._ping_thread_instance = Thread(target=self._ping_thread) + self._ping_thread_instance = None self.latency = 0 def start(self): @@ -104,7 +104,10 @@ def start(self): This method initiates a background thread that sends ping requests at regular intervals to measure and track latency statistics. """ - self._ping_thread_instance.start() + if self._ping_thread_instance is None or not self._ping_thread_instance.is_alive(): + self._stop_event.clear() + self._ping_thread_instance = Thread(target=self._ping_thread) + self._ping_thread_instance.start() def stop(self): """ @@ -114,7 +117,9 @@ def stop(self): ping requests, halting latency measurement. """ self._stop_event.set() - self._ping_thread_instance.join() + if self._ping_thread_instance is not None: + self._ping_thread_instance.join() + self._ping_thread_instance = None def _ping_thread(self, interval: float = 1.0) -> None: """ From a80e67005b81ff9edccd3e4d67393ac2780e86bf Mon Sep 17 00:00:00 2001 From: Rik Bouwmeester Date: Thu, 26 Sep 2024 16:17:56 +0200 Subject: [PATCH 06/13] Add Caller to indicate updated latency + example how to use --- cflib/crazyflie/link_statistics.py | 3 ++ examples/link_quality/latency.py | 55 ++++++++++++++++++++++++++++++ 2 files changed, 58 insertions(+) create mode 100644 examples/link_quality/latency.py diff --git a/cflib/crazyflie/link_statistics.py b/cflib/crazyflie/link_statistics.py index 14630405a..1c536fdac 100644 --- a/cflib/crazyflie/link_statistics.py +++ b/cflib/crazyflie/link_statistics.py @@ -33,6 +33,7 @@ from cflib.crtp.crtpstack import CRTPPacket from cflib.crtp.crtpstack import CRTPPort +from cflib.utils.callbacks import Caller __author__ = 'Bitcraze AB' __all__ = ['LinkStatistics'] @@ -96,6 +97,7 @@ def __init__(self, crazyflie): self._stop_event = Event() self._ping_thread_instance = None self.latency = 0 + self.latencyUpdated = Caller() def start(self): """ @@ -167,6 +169,7 @@ def _ping_response(self, packet): if received_header != PING_HEADER: return self.latency = self._calculate_p95_latency(received_timestamp) + self.latencyUpdated.call(self.latency) def _calculate_p95_latency(self, timestamp): """ diff --git a/examples/link_quality/latency.py b/examples/link_quality/latency.py new file mode 100644 index 000000000..ece87577c --- /dev/null +++ b/examples/link_quality/latency.py @@ -0,0 +1,55 @@ +# -*- coding: utf-8 -*- +# +# ,---------, ____ _ __ +# | ,-^-, | / __ )(_) /_______________ _____ ___ +# | ( O ) | / __ / / __/ ___/ ___/ __ `/_ / / _ \ +# | / ,--' | / /_/ / / /_/ /__/ / / /_/ / / /_/ __/ +# +------` /_____/_/\__/\___/_/ \__,_/ /___/\___/ +# +# Copyright (C) 2024 Bitcraze AB +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, in version 3. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +import time + +import cflib.crtp # noqa +from cflib.crazyflie import Crazyflie +from cflib.crazyflie.syncCrazyflie import SyncCrazyflie +from cflib.utils import uri_helper + +# Reads the CFLIB_URI environment variable for URI or uses default +uri = uri_helper.uri_from_env(default="radio://0/90/2M/E7E7E7E7E7") + + +def latency_callback(latency: float): + """A callback to run when we get an updated latency estimate""" + print(f"Latency: {latency:.3f} ms") + + +if __name__ == "__main__": + # Initialize the low-level drivers + cflib.crtp.init_drivers() + + # Create Crazyflie object, with cache to avoid re-reading param and log TOC + cf = Crazyflie(rw_cache="./cache") + + # Add a callback to whenever we receive an updated latency estimate + # + # This could also be a Python lambda, something like: + cf.link_statistics.latency.latencyUpdated.add_callback(latency_callback) + + # This will connect the Crazyflie with the URI specified above. + with SyncCrazyflie(uri, cf=cf) as scf: + print("[host] Connected, use ctrl-c to quit.") + + while True: + time.sleep(1) From 4136a69eb0ccc3dcfd66fa728e482cbcc884a9d7 Mon Sep 17 00:00:00 2001 From: Rik Bouwmeester Date: Mon, 30 Sep 2024 12:51:25 +0200 Subject: [PATCH 07/13] " -> ' --- examples/link_quality/latency.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/examples/link_quality/latency.py b/examples/link_quality/latency.py index ece87577c..b73c92b72 100644 --- a/examples/link_quality/latency.py +++ b/examples/link_quality/latency.py @@ -27,20 +27,20 @@ from cflib.utils import uri_helper # Reads the CFLIB_URI environment variable for URI or uses default -uri = uri_helper.uri_from_env(default="radio://0/90/2M/E7E7E7E7E7") +uri = uri_helper.uri_from_env(default='radio://0/90/2M/E7E7E7E7E7') def latency_callback(latency: float): """A callback to run when we get an updated latency estimate""" - print(f"Latency: {latency:.3f} ms") + print(f'Latency: {latency:.3f} ms') -if __name__ == "__main__": +if __name__ == '__main__': # Initialize the low-level drivers cflib.crtp.init_drivers() # Create Crazyflie object, with cache to avoid re-reading param and log TOC - cf = Crazyflie(rw_cache="./cache") + cf = Crazyflie(rw_cache='./cache') # Add a callback to whenever we receive an updated latency estimate # @@ -49,7 +49,7 @@ def latency_callback(latency: float): # This will connect the Crazyflie with the URI specified above. with SyncCrazyflie(uri, cf=cf) as scf: - print("[host] Connected, use ctrl-c to quit.") + print('[host] Connected, use ctrl-c to quit.') while True: time.sleep(1) From da4fcbb235c801194c0aa3563dc01c103df580db Mon Sep 17 00:00:00 2001 From: Rik Bouwmeester Date: Thu, 3 Oct 2024 10:28:46 +0200 Subject: [PATCH 08/13] Receive uplink RSSI - Refactor link quality callback to contain signal health obj with additional signal health information - Parse uplink RSSI in ack into signal health obj --- cflib/crazyflie/__init__.py | 12 ++-- cflib/crtp/__init__.py | 4 +- cflib/crtp/cflinkcppdriver.py | 18 ++--- cflib/crtp/crtpdriver.py | 4 +- cflib/crtp/radiodriver.py | 39 +++++----- cflib/crtp/signal_health.py | 71 +++++++++++++++++++ cflib/crtp/usbdriver.py | 19 +++-- sys_test/swarm_test_rig/test_response_time.py | 4 +- 8 files changed, 120 insertions(+), 51 deletions(-) create mode 100644 cflib/crtp/signal_health.py diff --git a/cflib/crazyflie/__init__.py b/cflib/crazyflie/__init__.py index a64930683..988cc4958 100644 --- a/cflib/crazyflie/__init__.py +++ b/cflib/crazyflie/__init__.py @@ -99,6 +99,7 @@ def __init__(self, link=None, ro_cache=None, rw_cache=None): self.packet_sent = Caller() # Called when the link driver updates the link quality measurement self.link_quality_updated = Caller() + self.uplink_rssi_updated = Caller() self.state = State.DISCONNECTED @@ -204,9 +205,12 @@ def _link_error_cb(self, errmsg): self.connection_lost.call(self.link_uri, errmsg) self.state = State.DISCONNECTED - def _link_quality_cb(self, percentage): - """Called from link driver to report link quality""" - self.link_quality_updated.call(percentage) + def _signal_health_cb(self, signal_health): + """Called from link driver to report signal health""" + if signal_health.link_quality: + self.link_quality_updated.call(signal_health.link_quality) + if signal_health.uplink_rssi: + self.uplink_rssi_updated.call(signal_health.uplink_rssi) def _check_for_initial_packet_cb(self, data): """ @@ -229,7 +233,7 @@ def open_link(self, link_uri): self.link_uri = link_uri try: self.link = cflib.crtp.get_link_driver( - link_uri, self._link_quality_cb, self._link_error_cb) + link_uri, self._signal_health_cb, self._link_error_cb) if not self.link: message = 'No driver found or malformed URI: {}' \ diff --git a/cflib/crtp/__init__.py b/cflib/crtp/__init__.py index 756e9a66c..ddb950b92 100644 --- a/cflib/crtp/__init__.py +++ b/cflib/crtp/__init__.py @@ -89,14 +89,14 @@ def get_interfaces_status(): return status -def get_link_driver(uri, link_quality_callback=None, link_error_callback=None): +def get_link_driver(uri, signal_health_callback=None, link_error_callback=None): """Return the link driver for the given URI. Returns None if no driver was found for the URI or the URI was not well formatted for the matching driver.""" for driverClass in CLASSES: try: instance = driverClass() - instance.connect(uri, link_quality_callback, link_error_callback) + instance.connect(uri, signal_health_callback, link_error_callback) return instance except WrongUriType: continue diff --git a/cflib/crtp/cflinkcppdriver.py b/cflib/crtp/cflinkcppdriver.py index ab69f0773..fff1ebfc3 100644 --- a/cflib/crtp/cflinkcppdriver.py +++ b/cflib/crtp/cflinkcppdriver.py @@ -34,6 +34,7 @@ from .crtpstack import CRTPPacket from cflib.crtp.crtpdriver import CRTPDriver +from cflib.crtp.signal_health import SignalHealth __author__ = 'Bitcraze AB' __all__ = ['CfLinkCppDriver'] @@ -54,22 +55,23 @@ def __init__(self): self.needs_resending = False self._connection = None + self._signal_health = SignalHealth() - def connect(self, uri, link_quality_callback, link_error_callback): + def connect(self, uri, signal_health_callback, link_error_callback): """Connect the driver to a specified URI @param uri Uri of the link to open - @param link_quality_callback Callback to report link quality in percent + @param signal_health_callback Callback to report signal health @param link_error_callback Callback to report errors (will result in disconnection) """ self._connection = cflinkcpp.Connection(uri) self.uri = uri - self._link_quality_callback = link_quality_callback + self._signal_health_callback = signal_health_callback self._link_error_callback = link_error_callback - if uri.startswith('radio://') and link_quality_callback is not None: + if uri.startswith('radio://') and signal_health_callback is not None: self._last_connection_stats = self._connection.statistics self._recompute_link_quality_timer() @@ -181,13 +183,13 @@ def _recompute_link_quality_timer(self): sent_count = stats.sent_count - self._last_connection_stats.sent_count ack_count = stats.ack_count - self._last_connection_stats.ack_count if sent_count > 0: - link_quality = min(ack_count, sent_count) / sent_count * 100.0 + self._signal_health.link_quality = min(ack_count, sent_count) / sent_count * 100.0 else: - link_quality = 1 + self._signal_health.link_quality = 1 self._last_connection_stats = stats - if self._link_quality_callback is not None: - self._link_quality_callback(link_quality) + if self._signal_health_callback is not None: + self._signal_health_callback(self._signal_health) if sent_count > 10 and ack_count == 0 and self._link_error_callback is not None: self._link_error_callback('Too many packets lost') diff --git a/cflib/crtp/crtpdriver.py b/cflib/crtp/crtpdriver.py index 2f620fd30..d38247731 100644 --- a/cflib/crtp/crtpdriver.py +++ b/cflib/crtp/crtpdriver.py @@ -42,11 +42,11 @@ def __init__(self): """ self.needs_resending = True - def connect(self, uri, link_quality_callback, link_error_callback): + def connect(self, uri, signal_health_callback, link_error_callback): """Connect the driver to a specified URI @param uri Uri of the link to open - @param link_quality_callback Callback to report link quality in percent + @param signal_health_callback Callback to report signal health @param link_error_callback Callback to report errors (will result in disconnection) """ diff --git a/cflib/crtp/radiodriver.py b/cflib/crtp/radiodriver.py index 7d22b6c52..e37b8b37d 100644 --- a/cflib/crtp/radiodriver.py +++ b/cflib/crtp/radiodriver.py @@ -30,7 +30,6 @@ """ import array import binascii -import collections import logging import queue import re @@ -53,6 +52,7 @@ from .crtpstack import CRTPPacket from .exceptions import WrongUriType from cflib.crtp.crtpdriver import CRTPDriver +from cflib.crtp.signal_health import SignalHealth from cflib.drivers.crazyradio import Crazyradio @@ -241,20 +241,20 @@ def __init__(self): self._radio = None self.uri = '' self.link_error_callback = None - self.link_quality_callback = None + self.signal_health_callback = None self.in_queue = None self.out_queue = None self._thread = None self.needs_resending = True - def connect(self, uri, link_quality_callback, link_error_callback): + def connect(self, uri, signal_health_callback, link_error_callback): """ Connect the link driver to a specified URI of the format: radio:////[250K,1M,2M] - The callback for linkQuality can be called at any moment from the - driver to report back the link quality in percentage. The - callback from linkError will be called when a error occurs with + The callback for signal health can be called at any moment from the + driver to report back the signal health. The callback from linkError + will be called when a error occurs with an error message. """ @@ -283,7 +283,7 @@ def connect(self, uri, link_quality_callback, link_error_callback): self._thread = _RadioDriverThread(self._radio, self.in_queue, self.out_queue, - link_quality_callback, + signal_health_callback, link_error_callback, self, rate_limit) @@ -381,7 +381,7 @@ def restart(self): self._thread = _RadioDriverThread(self._radio, self.in_queue, self.out_queue, - self.link_quality_callback, + self.signal_health_callback, self.link_error_callback, self) self._thread.start() @@ -401,7 +401,7 @@ def close(self): # Clear callbacks self.link_error_callback = None - self.link_quality_callback = None + self.signal_health_callback = None def _scan_radio_channels(self, radio: _SharedRadioInstance, start=0, stop=125): @@ -520,7 +520,7 @@ class _RadioDriverThread(threading.Thread): Crazyradio USB driver. """ def __init__(self, radio, inQueue, outQueue, - link_quality_callback, link_error_callback, link, rate_limit: Optional[int]): + signal_health_callback, link_error_callback, link, rate_limit: Optional[int]): """ Create the object """ threading.Thread.__init__(self) self._radio = radio @@ -528,10 +528,9 @@ def __init__(self, radio, inQueue, outQueue, self._out_queue = outQueue self._sp = False self._link_error_callback = link_error_callback - self._link_quality_callback = link_quality_callback + self._signal_health_callback = signal_health_callback + self._signal_health = SignalHealth() self._retry_before_disconnect = _nr_of_retries - self._retries = collections.deque() - self._retry_sum = 0 self.rate_limit = rate_limit self._curr_up = 0 @@ -606,16 +605,10 @@ def run(self): if ackStatus is None: logger.info('Dongle reported ACK status == None') continue - - if (self._link_quality_callback is not None): - # track the mean of a sliding window of the last N packets - retry = 10 - ackStatus.retry - self._retries.append(retry) - self._retry_sum += retry - if len(self._retries) > 100: - self._retry_sum -= self._retries.popleft() - link_quality = float(self._retry_sum) / len(self._retries) * 10 - self._link_quality_callback(link_quality) + else: + self._signal_health.update(ackStatus) + if (self._signal_health_callback is not None): + self._signal_health_callback(self._signal_health) # If no copter, retry if ackStatus.ack is False: diff --git a/cflib/crtp/signal_health.py b/cflib/crtp/signal_health.py new file mode 100644 index 000000000..057034101 --- /dev/null +++ b/cflib/crtp/signal_health.py @@ -0,0 +1,71 @@ +import collections +import time + +import numpy as np + + +class SignalHealth: + """ + Tracks the health of the signal by monitoring link quality and uplink RSSI + using exponential moving averages. + """ + + def __init__(self, alpha=0.1): + """ + Initialize the SignalHealth class. + + :param alpha: Weight for the exponential moving average (default 0.1) + """ + self.alpha = alpha + self.link_quality = 0 + self.uplink_rssi = 0 + self._retries = collections.deque() + self._retry_sum = 0 + + def update(self, ack): + """ + Update the signal health based on the acknowledgment data. + + :param ack: Acknowledgment object containing retry and RSSI data. + """ + self._update_link_quality(ack) + self._update_rssi(ack) + + def _update_link_quality(self, ack): + """ + Updates the link quality based on the acknowledgment data. + + :param ack: Acknowledgment object with retry data. + """ + retry = 10 - ack.retry + self._retries.append(retry) + self._retry_sum += retry + if len(self._retries) > 100: + self._retry_sum -= self._retries.popleft() + self.link_quality = float(self._retry_sum) / len(self._retries) * 10 + + def _update_rssi(self, ack): + """ + Updates the uplink RSSI based on the acknowledgment signal. + + :param ack: Acknowledgment object with RSSI data. + """ + if not hasattr(self, '_rssi_timestamps'): + self._rssi_timestamps = collections.deque(maxlen=100) + if not hasattr(self, '_rssi_values'): + self._rssi_values = collections.deque(maxlen=100) + + # update RSSI if the acknowledgment contains RSSI data + if ack.ack and len(ack.data) > 2 and ack.data[0] & 0xf3 == 0xf3 and ack.data[1] == 0x01: + instantaneous_rssi = ack.data[2] + self._rssi_values.append(instantaneous_rssi) + self._rssi_timestamps.append(time.time()) + + # Calculate time-weighted average RSSI + if len(self._rssi_timestamps) >= 2: # At least 2 points are needed to calculate differences + time_diffs = np.diff(self._rssi_timestamps, prepend=time.time()) + weights = np.exp(-time_diffs) + weighted_average = np.sum(weights * self._rssi_values) / np.sum(weights) + self.uplink_rssi = weighted_average + else: + self.uplink_rssi = instantaneous_rssi # Return the raw value if not enough data diff --git a/cflib/crtp/usbdriver.py b/cflib/crtp/usbdriver.py index 859244438..eb18fed27 100644 --- a/cflib/crtp/usbdriver.py +++ b/cflib/crtp/usbdriver.py @@ -52,21 +52,20 @@ def __init__(self): self.cfusb = None self.uri = '' self.link_error_callback = None - self.link_quality_callback = None + self.signal_health_callback = None self.in_queue = None self.out_queue = None self._thread = None self.needs_resending = False - def connect(self, uri, link_quality_callback, link_error_callback): + def connect(self, uri, signal_health_callback, link_error_callback): """ Connect the link driver to a specified URI of the format: radio:////[250K,1M,2M] - The callback for linkQuality can be called at any moment from the - driver to report back the link quality in percentage. The - callback from linkError will be called when a error occurs with - an error message. + The callback for signal health can be called at any moment from + the driver to report back the signal health. The callback from + linkError will be called when a error occurs with an error message. """ # check if the URI is a radio URI @@ -100,7 +99,7 @@ def connect(self, uri, link_quality_callback, link_error_callback): # Launch the comm thread self._thread = _UsbReceiveThread(self.cfusb, self.in_queue, - link_quality_callback, + signal_health_callback, link_error_callback) self._thread.start() @@ -152,7 +151,7 @@ def restart(self): return self._thread = _UsbReceiveThread(self.cfusb, self.in_queue, - self.link_quality_callback, + self.signal_health_callback, self.link_error_callback) self._thread.start() @@ -208,7 +207,7 @@ class _UsbReceiveThread(threading.Thread): Radio link receiver thread used to read data from the Crazyradio USB driver. """ - def __init__(self, cfusb, inQueue, link_quality_callback, + def __init__(self, cfusb, inQueue, signal_health_callback, link_error_callback): """ Create the object """ threading.Thread.__init__(self) @@ -216,7 +215,7 @@ def __init__(self, cfusb, inQueue, link_quality_callback, self.in_queue = inQueue self.sp = False self.link_error_callback = link_error_callback - self.link_quality_callback = link_quality_callback + self.signal_health_callback = signal_health_callback def stop(self): """ Stop the thread """ diff --git a/sys_test/swarm_test_rig/test_response_time.py b/sys_test/swarm_test_rig/test_response_time.py index da9222fce..582a9519f 100644 --- a/sys_test/swarm_test_rig/test_response_time.py +++ b/sys_test/swarm_test_rig/test_response_time.py @@ -128,14 +128,14 @@ def _is_response_correct_seq_nr(self, response, seq_nr): return False def connect_link(self, uri): - link = cflib.crtp.get_link_driver(uri, self._link_quality_cb, + link = cflib.crtp.get_link_driver(uri, self._signal_health_cb, self._link_error_cb) self.assertIsNotNone(link) self.links.append(link) return link - def _link_quality_cb(self, percentage): + def _signal_health_cb(self, signal_health): pass def _link_error_cb(self, errmsg): From 4435c7c19f125cd2d9ea807d9e2a922201ba814f Mon Sep 17 00:00:00 2001 From: Kimberly McGuire Date: Thu, 3 Oct 2024 13:10:30 +0200 Subject: [PATCH 09/13] add congestion statistics --- cflib/crtp/radiodriver.py | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/cflib/crtp/radiodriver.py b/cflib/crtp/radiodriver.py index e37b8b37d..ba35a5a01 100644 --- a/cflib/crtp/radiodriver.py +++ b/cflib/crtp/radiodriver.py @@ -584,6 +584,12 @@ def run(self): break self._link.needs_resending = not self._has_safelink + previous_time_stamp = time.time() + amount_null_packets_up = 0 + amount_packets_up = 0 + amount_null_packets_down = 0 + amount_packets_down = 0 + while (True): if (self._sp): break @@ -620,10 +626,18 @@ def run(self): continue self._retry_before_disconnect = _nr_of_retries + ## Find null packets in the downlink and count them data = ackStatus.data + mask = 0b11110011 + empty_ack_packet = int(data[0]) & mask + + if empty_ack_packet == 0xF3: + amount_null_packets_down += 1 + amount_packets_down += 1 # If there is a copter in range, the packet is analysed and the # next packet to send is prepared + # TODO: THis seems not to work since there is always a byte filled in the data even with null packets if (len(data) > 0): inPacket = CRTPPacket(data[0], list(data[1:])) self._in_queue.put(inPacket) @@ -660,7 +674,25 @@ def run(self): else: dataOut.append(ord(X)) else: + # If no packet to send, send a null packet dataOut.append(0xFF) + amount_null_packets_up += 1 + amount_packets_up += 1 + + # Low level stats every second + if time.time() - previous_time_stamp > 1: + rate_up = amount_packets_up / (time.time() - previous_time_stamp) + rate_down = amount_packets_down / (time.time() - previous_time_stamp) + congestion_up = 1.0 - amount_null_packets_up / amount_packets_up + congestion_down = 1.0 - amount_null_packets_down / amount_packets_down + + amount_packets_up = 0 + amount_null_packets_up = 0 + amount_packets_down = 0 + amount_null_packets_down = 0 + previous_time_stamp = time.time() + + self._link_quality_low_level_callback(rate_up, rate_down, congestion_up, congestion_down) def set_retries_before_disconnect(nr_of_retries): From 73963e01b0e4a1a9337f28bfce00e4379c63397f Mon Sep 17 00:00:00 2001 From: Rik Bouwmeester Date: Thu, 3 Oct 2024 15:57:34 +0200 Subject: [PATCH 10/13] Move bandwidth congestion and packet rate into signal health class Return a dictionary with all updated signal quality statistics in the callback, rather than the entire class. --- cflib/crazyflie/__init__.py | 20 +++++++-- cflib/crtp/radiodriver.py | 41 ++----------------- cflib/crtp/signal_health.py | 81 ++++++++++++++++++++++++++++++------- 3 files changed, 86 insertions(+), 56 deletions(-) diff --git a/cflib/crazyflie/__init__.py b/cflib/crazyflie/__init__.py index 988cc4958..d96028b7f 100644 --- a/cflib/crazyflie/__init__.py +++ b/cflib/crazyflie/__init__.py @@ -100,6 +100,10 @@ def __init__(self, link=None, ro_cache=None, rw_cache=None): # Called when the link driver updates the link quality measurement self.link_quality_updated = Caller() self.uplink_rssi_updated = Caller() + self.uplink_rate_updated = Caller() + self.downlink_rate_updated = Caller() + self.uplink_congestion_updated = Caller() + self.downlink_congestion_updated = Caller() self.state = State.DISCONNECTED @@ -207,10 +211,18 @@ def _link_error_cb(self, errmsg): def _signal_health_cb(self, signal_health): """Called from link driver to report signal health""" - if signal_health.link_quality: - self.link_quality_updated.call(signal_health.link_quality) - if signal_health.uplink_rssi: - self.uplink_rssi_updated.call(signal_health.uplink_rssi) + if 'link_quality' in signal_health: + self.link_quality_updated.call(signal_health['link_quality']) + if 'uplink_rssi' in signal_health: + self.uplink_rssi_updated.call(signal_health['uplink_rssi']) + if 'uplink_rate' in signal_health: + self.uplink_rate_updated.call(signal_health['uplink_rate']) + if 'downlink_rate' in signal_health: + self.downlink_rate_updated.call(signal_health['downlink_rate']) + if 'uplink_congestion' in signal_health: + self.uplink_congestion_updated.call(signal_health['uplink_congestion']) + if 'downlink_congestion' in signal_health: + self.downlink_congestion_updated.call(signal_health['downlink_congestion']) def _check_for_initial_packet_cb(self, data): """ diff --git a/cflib/crtp/radiodriver.py b/cflib/crtp/radiodriver.py index ba35a5a01..4ffd04ee3 100644 --- a/cflib/crtp/radiodriver.py +++ b/cflib/crtp/radiodriver.py @@ -528,8 +528,7 @@ def __init__(self, radio, inQueue, outQueue, self._out_queue = outQueue self._sp = False self._link_error_callback = link_error_callback - self._signal_health_callback = signal_health_callback - self._signal_health = SignalHealth() + self._signal_health = SignalHealth(signal_health_callback) self._retry_before_disconnect = _nr_of_retries self.rate_limit = rate_limit @@ -584,12 +583,6 @@ def run(self): break self._link.needs_resending = not self._has_safelink - previous_time_stamp = time.time() - amount_null_packets_up = 0 - amount_packets_up = 0 - amount_null_packets_down = 0 - amount_packets_down = 0 - while (True): if (self._sp): break @@ -611,10 +604,6 @@ def run(self): if ackStatus is None: logger.info('Dongle reported ACK status == None') continue - else: - self._signal_health.update(ackStatus) - if (self._signal_health_callback is not None): - self._signal_health_callback(self._signal_health) # If no copter, retry if ackStatus.ack is False: @@ -626,18 +615,11 @@ def run(self): continue self._retry_before_disconnect = _nr_of_retries - ## Find null packets in the downlink and count them data = ackStatus.data - mask = 0b11110011 - empty_ack_packet = int(data[0]) & mask - - if empty_ack_packet == 0xF3: - amount_null_packets_down += 1 - amount_packets_down += 1 # If there is a copter in range, the packet is analysed and the # next packet to send is prepared - # TODO: THis seems not to work since there is always a byte filled in the data even with null packets + # TODO: This does not seem to work since there is always a byte filled in the data even with null packets if (len(data) > 0): inPacket = CRTPPacket(data[0], list(data[1:])) self._in_queue.put(inPacket) @@ -676,23 +658,8 @@ def run(self): else: # If no packet to send, send a null packet dataOut.append(0xFF) - amount_null_packets_up += 1 - amount_packets_up += 1 - - # Low level stats every second - if time.time() - previous_time_stamp > 1: - rate_up = amount_packets_up / (time.time() - previous_time_stamp) - rate_down = amount_packets_down / (time.time() - previous_time_stamp) - congestion_up = 1.0 - amount_null_packets_up / amount_packets_up - congestion_down = 1.0 - amount_null_packets_down / amount_packets_down - - amount_packets_up = 0 - amount_null_packets_up = 0 - amount_packets_down = 0 - amount_null_packets_down = 0 - previous_time_stamp = time.time() - - self._link_quality_low_level_callback(rate_up, rate_down, congestion_up, congestion_down) + + self._signal_health.update(ackStatus, outPacket) def set_retries_before_disconnect(nr_of_retries): diff --git a/cflib/crtp/signal_health.py b/cflib/crtp/signal_health.py index 057034101..d44354e08 100644 --- a/cflib/crtp/signal_health.py +++ b/cflib/crtp/signal_health.py @@ -10,39 +10,46 @@ class SignalHealth: using exponential moving averages. """ - def __init__(self, alpha=0.1): + def __init__(self, signal_health_callback, alpha=0.1): """ Initialize the SignalHealth class. :param alpha: Weight for the exponential moving average (default 0.1) """ - self.alpha = alpha - self.link_quality = 0 - self.uplink_rssi = 0 + self._alpha = alpha + self._signal_health_callback = signal_health_callback + self._retries = collections.deque() self._retry_sum = 0 - def update(self, ack): + def update(self, ack, packet_out): """ Update the signal health based on the acknowledgment data. :param ack: Acknowledgment object containing retry and RSSI data. """ + self.signal_health = {} + self._update_link_quality(ack) self._update_rssi(ack) + self._update_rate_and_congestion(ack, packet_out) + + if self.signal_health: + self._signal_health_callback(self.signal_health) def _update_link_quality(self, ack): """ - Updates the link quality based on the acknowledgment data. + Updates the link quality based on the number of retries. :param ack: Acknowledgment object with retry data. """ - retry = 10 - ack.retry - self._retries.append(retry) - self._retry_sum += retry - if len(self._retries) > 100: - self._retry_sum -= self._retries.popleft() - self.link_quality = float(self._retry_sum) / len(self._retries) * 10 + if ack: + retry = 10 - ack.retry + self._retries.append(retry) + self._retry_sum += retry + if len(self._retries) > 100: + self._retry_sum -= self._retries.popleft() + self.signal_health['link_quality'] = float(self._retry_sum) / len(self._retries) * 10 def _update_rssi(self, ack): """ @@ -66,6 +73,50 @@ def _update_rssi(self, ack): time_diffs = np.diff(self._rssi_timestamps, prepend=time.time()) weights = np.exp(-time_diffs) weighted_average = np.sum(weights * self._rssi_values) / np.sum(weights) - self.uplink_rssi = weighted_average - else: - self.uplink_rssi = instantaneous_rssi # Return the raw value if not enough data + self.signal_health['uplink_rssi'] = weighted_average + + def _update_rate_and_congestion(self, ack, packet_out): + """ + Updates the packet rate and bandwidth congestion based on the acknowledgment data. + + :param ack: Acknowledgment object with congestion data. + """ + if not hasattr(self, '_previous_time_stamp'): + self._previous_time_stamp = time.time() + if not hasattr(self, '_amount_null_packets_up'): + self._amount_null_packets_up = 0 + if not hasattr(self, '_amount_packets_up'): + self._amount_packets_up = 0 + if not hasattr(self, '_amount_null_packets_down'): + self._amount_null_packets_down = 0 + if not hasattr(self, '_amount_packets_down'): + self._amount_packets_down = 0 + + self._amount_packets_up += 1 # everytime this function is called, a packet is sent + if not packet_out: # if the packet is empty, we send a null packet + self._amount_null_packets_up += 1 + + # Find null packets in the downlink and count them + mask = 0b11110011 + if ack.data: + empty_ack_packet = int(ack.data[0]) & mask + + if empty_ack_packet == 0xF3: + self._amount_null_packets_down += 1 + self._amount_packets_down += 1 + + # rate and congestion stats every N seconds + if time.time() - self._previous_time_stamp > 0.1: + # self._uplink_rate = self._amount_packets_up / (time.time() - self._previous_time_stamp) + self.signal_health['uplink_rate'] = self._amount_packets_up / (time.time() - self._previous_time_stamp) + self.signal_health['downlink_rate'] = self._amount_packets_down / \ + (time.time() - self._previous_time_stamp) + self.signal_health['uplink_congestion'] = 1.0 - self._amount_null_packets_up / self._amount_packets_up + self.signal_health['downlink_congestion'] = 1.0 - \ + self._amount_null_packets_down / self._amount_packets_down + + self._amount_packets_up = 0 + self._amount_null_packets_up = 0 + self._amount_packets_down = 0 + self._amount_null_packets_down = 0 + self._previous_time_stamp = time.time() From c0186bab1f05f522d245c3ef5f544f5cbd87cc71 Mon Sep 17 00:00:00 2001 From: Rik Bouwmeester Date: Fri, 4 Oct 2024 12:30:22 +0200 Subject: [PATCH 11/13] Only try to callback signal health when set --- cflib/crtp/signal_health.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cflib/crtp/signal_health.py b/cflib/crtp/signal_health.py index d44354e08..ed66a6f41 100644 --- a/cflib/crtp/signal_health.py +++ b/cflib/crtp/signal_health.py @@ -34,7 +34,7 @@ def update(self, ack, packet_out): self._update_rssi(ack) self._update_rate_and_congestion(ack, packet_out) - if self.signal_health: + if self.signal_health and self._signal_health_callback: self._signal_health_callback(self.signal_health) def _update_link_quality(self, ack): From 81bfd4a65186a845fbbf449879ecaa1cca00584d Mon Sep 17 00:00:00 2001 From: Rik Bouwmeester Date: Tue, 22 Oct 2024 16:22:04 +0200 Subject: [PATCH 12/13] Refactor SignalHealth to RadioLinkStatistics for clarity Renamed SignalHealth class to RadioLinkStatistics to better reflect its responsibility of handling and processing radio-specific metrics. --- cflib/crazyflie/__init__.py | 30 ++++++++-------- cflib/crtp/__init__.py | 4 +-- cflib/crtp/cflinkcppdriver.py | 20 +++++------ cflib/crtp/crtpdriver.py | 4 +-- ...nal_health.py => radio_link_statistics.py} | 34 ++++++++++--------- cflib/crtp/radiodriver.py | 22 ++++++------ cflib/crtp/usbdriver.py | 17 +++++----- sys_test/swarm_test_rig/test_response_time.py | 4 +-- 8 files changed, 68 insertions(+), 67 deletions(-) rename cflib/crtp/{signal_health.py => radio_link_statistics.py} (75%) diff --git a/cflib/crazyflie/__init__.py b/cflib/crazyflie/__init__.py index 1472f3002..d8c4fa919 100644 --- a/cflib/crazyflie/__init__.py +++ b/cflib/crazyflie/__init__.py @@ -220,20 +220,20 @@ def _link_error_cb(self, errmsg): self.disconnected_link_error.call(self.link_uri, errmsg) self.state = State.DISCONNECTED - def _signal_health_cb(self, signal_health): - """Called from link driver to report signal health""" - if 'link_quality' in signal_health: - self.link_quality_updated.call(signal_health['link_quality']) - if 'uplink_rssi' in signal_health: - self.uplink_rssi_updated.call(signal_health['uplink_rssi']) - if 'uplink_rate' in signal_health: - self.uplink_rate_updated.call(signal_health['uplink_rate']) - if 'downlink_rate' in signal_health: - self.downlink_rate_updated.call(signal_health['downlink_rate']) - if 'uplink_congestion' in signal_health: - self.uplink_congestion_updated.call(signal_health['uplink_congestion']) - if 'downlink_congestion' in signal_health: - self.downlink_congestion_updated.call(signal_health['downlink_congestion']) + def _radio_link_statistics_cb(self, radio_link_statistics): + """Called from link driver to report radio link statistics""" + if 'link_quality' in radio_link_statistics: + self.link_quality_updated.call(radio_link_statistics['link_quality']) + if 'uplink_rssi' in radio_link_statistics: + self.uplink_rssi_updated.call(radio_link_statistics['uplink_rssi']) + if 'uplink_rate' in radio_link_statistics: + self.uplink_rate_updated.call(radio_link_statistics['uplink_rate']) + if 'downlink_rate' in radio_link_statistics: + self.downlink_rate_updated.call(radio_link_statistics['downlink_rate']) + if 'uplink_congestion' in radio_link_statistics: + self.uplink_congestion_updated.call(radio_link_statistics['uplink_congestion']) + if 'downlink_congestion' in radio_link_statistics: + self.downlink_congestion_updated.call(radio_link_statistics['downlink_congestion']) def _check_for_initial_packet_cb(self, data): """ @@ -256,7 +256,7 @@ def open_link(self, link_uri): self.link_uri = link_uri try: self.link = cflib.crtp.get_link_driver( - link_uri, self._signal_health_cb, self._link_error_cb) + link_uri, self._radio_link_statistics_cb, self._link_error_cb) if not self.link: message = 'No driver found or malformed URI: {}' \ diff --git a/cflib/crtp/__init__.py b/cflib/crtp/__init__.py index ddb950b92..3d410554b 100644 --- a/cflib/crtp/__init__.py +++ b/cflib/crtp/__init__.py @@ -89,14 +89,14 @@ def get_interfaces_status(): return status -def get_link_driver(uri, signal_health_callback=None, link_error_callback=None): +def get_link_driver(uri, radio_link_statistics_callback=None, link_error_callback=None): """Return the link driver for the given URI. Returns None if no driver was found for the URI or the URI was not well formatted for the matching driver.""" for driverClass in CLASSES: try: instance = driverClass() - instance.connect(uri, signal_health_callback, link_error_callback) + instance.connect(uri, radio_link_statistics_callback, link_error_callback) return instance except WrongUriType: continue diff --git a/cflib/crtp/cflinkcppdriver.py b/cflib/crtp/cflinkcppdriver.py index fff1ebfc3..9d760a7a7 100644 --- a/cflib/crtp/cflinkcppdriver.py +++ b/cflib/crtp/cflinkcppdriver.py @@ -34,7 +34,7 @@ from .crtpstack import CRTPPacket from cflib.crtp.crtpdriver import CRTPDriver -from cflib.crtp.signal_health import SignalHealth +from cflib.crtp.radio_link_statistics import RadioLinkStatistics __author__ = 'Bitcraze AB' __all__ = ['CfLinkCppDriver'] @@ -55,23 +55,23 @@ def __init__(self): self.needs_resending = False self._connection = None - self._signal_health = SignalHealth() + self._radio_link_statistics = RadioLinkStatistics() - def connect(self, uri, signal_health_callback, link_error_callback): + def connect(self, uri, radio_link_statistics_callback, link_error_callback): """Connect the driver to a specified URI @param uri Uri of the link to open - @param signal_health_callback Callback to report signal health + @param radio_link_statistics_callback Callback to report radio link statistics @param link_error_callback Callback to report errors (will result in disconnection) """ self._connection = cflinkcpp.Connection(uri) self.uri = uri - self._signal_health_callback = signal_health_callback + self._radio_link_statistics_callback = radio_link_statistics_callback self._link_error_callback = link_error_callback - if uri.startswith('radio://') and signal_health_callback is not None: + if uri.startswith('radio://') and radio_link_statistics_callback is not None: self._last_connection_stats = self._connection.statistics self._recompute_link_quality_timer() @@ -183,13 +183,13 @@ def _recompute_link_quality_timer(self): sent_count = stats.sent_count - self._last_connection_stats.sent_count ack_count = stats.ack_count - self._last_connection_stats.ack_count if sent_count > 0: - self._signal_health.link_quality = min(ack_count, sent_count) / sent_count * 100.0 + self._radio_link_statistics.link_quality = min(ack_count, sent_count) / sent_count * 100.0 else: - self._signal_health.link_quality = 1 + self._radio_link_statistics.link_quality = 1 self._last_connection_stats = stats - if self._signal_health_callback is not None: - self._signal_health_callback(self._signal_health) + if self._radio_link_statistics_callback is not None: + self._radio_link_statistics_callback(self._radio_link_statistics) if sent_count > 10 and ack_count == 0 and self._link_error_callback is not None: self._link_error_callback('Too many packets lost') diff --git a/cflib/crtp/crtpdriver.py b/cflib/crtp/crtpdriver.py index d38247731..598b29e3d 100644 --- a/cflib/crtp/crtpdriver.py +++ b/cflib/crtp/crtpdriver.py @@ -42,11 +42,11 @@ def __init__(self): """ self.needs_resending = True - def connect(self, uri, signal_health_callback, link_error_callback): + def connect(self, uri, radio_link_statistics_callback, link_error_callback): """Connect the driver to a specified URI @param uri Uri of the link to open - @param signal_health_callback Callback to report signal health + @param radio_link_statistics_callback Callback to report radio link statistics @param link_error_callback Callback to report errors (will result in disconnection) """ diff --git a/cflib/crtp/signal_health.py b/cflib/crtp/radio_link_statistics.py similarity index 75% rename from cflib/crtp/signal_health.py rename to cflib/crtp/radio_link_statistics.py index ed66a6f41..d2a68e0f6 100644 --- a/cflib/crtp/signal_health.py +++ b/cflib/crtp/radio_link_statistics.py @@ -4,38 +4,38 @@ import numpy as np -class SignalHealth: +class RadioLinkStatistics: """ - Tracks the health of the signal by monitoring link quality and uplink RSSI - using exponential moving averages. + Tracks the health of the signal by monitoring link quality, uplink RSSI, + packet rates, and congestion. """ - def __init__(self, signal_health_callback, alpha=0.1): + def __init__(self, radio_link_statistics_callback, alpha=0.1): """ - Initialize the SignalHealth class. + Initialize the RadioLinkStatistics class. :param alpha: Weight for the exponential moving average (default 0.1) """ self._alpha = alpha - self._signal_health_callback = signal_health_callback + self._radio_link_statistics_callback = radio_link_statistics_callback self._retries = collections.deque() self._retry_sum = 0 def update(self, ack, packet_out): """ - Update the signal health based on the acknowledgment data. + Update the radio link statistics based on the acknowledgment data. :param ack: Acknowledgment object containing retry and RSSI data. """ - self.signal_health = {} + self.radio_link_statistics = {} self._update_link_quality(ack) self._update_rssi(ack) self._update_rate_and_congestion(ack, packet_out) - if self.signal_health and self._signal_health_callback: - self._signal_health_callback(self.signal_health) + if self.radio_link_statistics and self._radio_link_statistics_callback: + self._radio_link_statistics_callback(self.radio_link_statistics) def _update_link_quality(self, ack): """ @@ -49,7 +49,7 @@ def _update_link_quality(self, ack): self._retry_sum += retry if len(self._retries) > 100: self._retry_sum -= self._retries.popleft() - self.signal_health['link_quality'] = float(self._retry_sum) / len(self._retries) * 10 + self.radio_link_statistics['link_quality'] = float(self._retry_sum) / len(self._retries) * 10 def _update_rssi(self, ack): """ @@ -73,7 +73,7 @@ def _update_rssi(self, ack): time_diffs = np.diff(self._rssi_timestamps, prepend=time.time()) weights = np.exp(-time_diffs) weighted_average = np.sum(weights * self._rssi_values) / np.sum(weights) - self.signal_health['uplink_rssi'] = weighted_average + self.radio_link_statistics['uplink_rssi'] = weighted_average def _update_rate_and_congestion(self, ack, packet_out): """ @@ -108,11 +108,13 @@ def _update_rate_and_congestion(self, ack, packet_out): # rate and congestion stats every N seconds if time.time() - self._previous_time_stamp > 0.1: # self._uplink_rate = self._amount_packets_up / (time.time() - self._previous_time_stamp) - self.signal_health['uplink_rate'] = self._amount_packets_up / (time.time() - self._previous_time_stamp) - self.signal_health['downlink_rate'] = self._amount_packets_down / \ + self.radio_link_statistics['uplink_rate'] = self._amount_packets_up / \ (time.time() - self._previous_time_stamp) - self.signal_health['uplink_congestion'] = 1.0 - self._amount_null_packets_up / self._amount_packets_up - self.signal_health['downlink_congestion'] = 1.0 - \ + self.radio_link_statistics['downlink_rate'] = self._amount_packets_down / \ + (time.time() - self._previous_time_stamp) + self.radio_link_statistics['uplink_congestion'] = 1.0 - \ + self._amount_null_packets_up / self._amount_packets_up + self.radio_link_statistics['downlink_congestion'] = 1.0 - \ self._amount_null_packets_down / self._amount_packets_down self._amount_packets_up = 0 diff --git a/cflib/crtp/radiodriver.py b/cflib/crtp/radiodriver.py index 4ffd04ee3..a4d82ee19 100644 --- a/cflib/crtp/radiodriver.py +++ b/cflib/crtp/radiodriver.py @@ -52,7 +52,7 @@ from .crtpstack import CRTPPacket from .exceptions import WrongUriType from cflib.crtp.crtpdriver import CRTPDriver -from cflib.crtp.signal_health import SignalHealth +from cflib.crtp.radio_link_statistics import RadioLinkStatistics from cflib.drivers.crazyradio import Crazyradio @@ -241,19 +241,19 @@ def __init__(self): self._radio = None self.uri = '' self.link_error_callback = None - self.signal_health_callback = None + self.radio_link_statistics_callback = None self.in_queue = None self.out_queue = None self._thread = None self.needs_resending = True - def connect(self, uri, signal_health_callback, link_error_callback): + def connect(self, uri, radio_link_statistics_callback, link_error_callback): """ Connect the link driver to a specified URI of the format: radio:////[250K,1M,2M] - The callback for signal health can be called at any moment from the - driver to report back the signal health. The callback from linkError + The callback for radio link statistics can be called at any moment from the + driver to report back the radio link statistics. The callback from linkError will be called when a error occurs with an error message. """ @@ -283,7 +283,7 @@ def connect(self, uri, signal_health_callback, link_error_callback): self._thread = _RadioDriverThread(self._radio, self.in_queue, self.out_queue, - signal_health_callback, + radio_link_statistics_callback, link_error_callback, self, rate_limit) @@ -381,7 +381,7 @@ def restart(self): self._thread = _RadioDriverThread(self._radio, self.in_queue, self.out_queue, - self.signal_health_callback, + self.radio_link_statistics_callback, self.link_error_callback, self) self._thread.start() @@ -401,7 +401,7 @@ def close(self): # Clear callbacks self.link_error_callback = None - self.signal_health_callback = None + self.radio_link_statistics_callback = None def _scan_radio_channels(self, radio: _SharedRadioInstance, start=0, stop=125): @@ -520,7 +520,7 @@ class _RadioDriverThread(threading.Thread): Crazyradio USB driver. """ def __init__(self, radio, inQueue, outQueue, - signal_health_callback, link_error_callback, link, rate_limit: Optional[int]): + radio_link_statistics_callback, link_error_callback, link, rate_limit: Optional[int]): """ Create the object """ threading.Thread.__init__(self) self._radio = radio @@ -528,7 +528,7 @@ def __init__(self, radio, inQueue, outQueue, self._out_queue = outQueue self._sp = False self._link_error_callback = link_error_callback - self._signal_health = SignalHealth(signal_health_callback) + self._radio_link_statistics = RadioLinkStatistics(radio_link_statistics_callback) self._retry_before_disconnect = _nr_of_retries self.rate_limit = rate_limit @@ -659,7 +659,7 @@ def run(self): # If no packet to send, send a null packet dataOut.append(0xFF) - self._signal_health.update(ackStatus, outPacket) + self._radio_link_statistics.update(ackStatus, outPacket) def set_retries_before_disconnect(nr_of_retries): diff --git a/cflib/crtp/usbdriver.py b/cflib/crtp/usbdriver.py index eb18fed27..349e58d5c 100644 --- a/cflib/crtp/usbdriver.py +++ b/cflib/crtp/usbdriver.py @@ -52,20 +52,19 @@ def __init__(self): self.cfusb = None self.uri = '' self.link_error_callback = None - self.signal_health_callback = None + self.radio_link_statistics_callback = None self.in_queue = None self.out_queue = None self._thread = None self.needs_resending = False - def connect(self, uri, signal_health_callback, link_error_callback): + def connect(self, uri, radio_link_statistics_callback, link_error_callback): """ Connect the link driver to a specified URI of the format: radio:////[250K,1M,2M] - The callback for signal health can be called at any moment from - the driver to report back the signal health. The callback from - linkError will be called when a error occurs with an error message. + The callback for radio link statistics should not be called from the usb driver + The callback from linkError will be called when a error occurs with an error message. """ # check if the URI is a radio URI @@ -99,7 +98,7 @@ def connect(self, uri, signal_health_callback, link_error_callback): # Launch the comm thread self._thread = _UsbReceiveThread(self.cfusb, self.in_queue, - signal_health_callback, + radio_link_statistics_callback, link_error_callback) self._thread.start() @@ -151,7 +150,7 @@ def restart(self): return self._thread = _UsbReceiveThread(self.cfusb, self.in_queue, - self.signal_health_callback, + self.radio_link_statistics_callback, self.link_error_callback) self._thread.start() @@ -207,7 +206,7 @@ class _UsbReceiveThread(threading.Thread): Radio link receiver thread used to read data from the Crazyradio USB driver. """ - def __init__(self, cfusb, inQueue, signal_health_callback, + def __init__(self, cfusb, inQueue, radio_link_statistics_callback, link_error_callback): """ Create the object """ threading.Thread.__init__(self) @@ -215,7 +214,7 @@ def __init__(self, cfusb, inQueue, signal_health_callback, self.in_queue = inQueue self.sp = False self.link_error_callback = link_error_callback - self.signal_health_callback = signal_health_callback + self.radio_link_statistics_callback = radio_link_statistics_callback def stop(self): """ Stop the thread """ diff --git a/sys_test/swarm_test_rig/test_response_time.py b/sys_test/swarm_test_rig/test_response_time.py index 582a9519f..40158239c 100644 --- a/sys_test/swarm_test_rig/test_response_time.py +++ b/sys_test/swarm_test_rig/test_response_time.py @@ -128,14 +128,14 @@ def _is_response_correct_seq_nr(self, response, seq_nr): return False def connect_link(self, uri): - link = cflib.crtp.get_link_driver(uri, self._signal_health_cb, + link = cflib.crtp.get_link_driver(uri, self._radio_link_statistics_cb, self._link_error_cb) self.assertIsNotNone(link) self.links.append(link) return link - def _signal_health_cb(self, signal_health): + def _radio_link_statistics_cb(self, radoi_link_statistics): pass def _link_error_cb(self, errmsg): From 31ff8e44ac5a45f27426547fa2c1c869b644f007 Mon Sep 17 00:00:00 2001 From: Rik Bouwmeester Date: Tue, 12 Nov 2024 12:51:08 +0100 Subject: [PATCH 13/13] Rename latency update callback, reduce ping interval to 0.1 seconds --- cflib/crazyflie/link_statistics.py | 8 ++++---- examples/link_quality/latency.py | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/cflib/crazyflie/link_statistics.py b/cflib/crazyflie/link_statistics.py index 1c536fdac..af23f3b56 100644 --- a/cflib/crazyflie/link_statistics.py +++ b/cflib/crazyflie/link_statistics.py @@ -97,7 +97,7 @@ def __init__(self, crazyflie): self._stop_event = Event() self._ping_thread_instance = None self.latency = 0 - self.latencyUpdated = Caller() + self.latency_updated = Caller() def start(self): """ @@ -123,7 +123,7 @@ def stop(self): self._ping_thread_instance.join() self._ping_thread_instance = None - def _ping_thread(self, interval: float = 1.0) -> None: + def _ping_thread(self, interval: float = 0.1) -> None: """ Background thread method that sends a ping to the Crazyflie at regular intervals. @@ -131,7 +131,7 @@ def _ping_thread(self, interval: float = 1.0) -> None: until the stop event is set. Args: - interval (float): The time (in seconds) to wait between ping requests. Default is 1 second. + interval (float): The time (in seconds) to wait between ping requests. Default is 0.1 seconds. """ while not self._stop_event.is_set(): self.ping() @@ -169,7 +169,7 @@ def _ping_response(self, packet): if received_header != PING_HEADER: return self.latency = self._calculate_p95_latency(received_timestamp) - self.latencyUpdated.call(self.latency) + self.latency_updated.call(self.latency) def _calculate_p95_latency(self, timestamp): """ diff --git a/examples/link_quality/latency.py b/examples/link_quality/latency.py index b73c92b72..21faf8aa0 100644 --- a/examples/link_quality/latency.py +++ b/examples/link_quality/latency.py @@ -45,7 +45,7 @@ def latency_callback(latency: float): # Add a callback to whenever we receive an updated latency estimate # # This could also be a Python lambda, something like: - cf.link_statistics.latency.latencyUpdated.add_callback(latency_callback) + cf.link_statistics.latency.latency_updated.add_callback(latency_callback) # This will connect the Crazyflie with the URI specified above. with SyncCrazyflie(uri, cf=cf) as scf: