From 65098d86d3e0f97b1a7dd30fefdaf1b7f795a6f3 Mon Sep 17 00:00:00 2001 From: Bjoern Kerler Date: Mon, 7 Oct 2024 00:55:53 +0200 Subject: [PATCH] Add zmq server and json support --- README.md | 3 + python_cli/advertiser.py | 5 +- python_cli/initiator.py | 3 +- python_cli/requirements.txt | 4 + python_cli/reset.py | 3 +- python_cli/scanner.py | 36 ++-- python_cli/sniff_receiver.py | 53 ++++- python_cli/sniffle/packet_decoder.py | 288 +++++++++++++++++++++------ python_cli/sniffle/sniffle_hw.py | 22 +- python_cli/sniffle/sniffle_sdr.py | 12 +- python_cli/sniffle_extcap.py | 3 +- python_cli/uart_test.py | 3 +- python_cli/version_check.py | 3 +- 13 files changed, 338 insertions(+), 100 deletions(-) create mode 100644 python_cli/requirements.txt diff --git a/README.md b/README.md index 632455b..871a86c 100644 --- a/README.md +++ b/README.md @@ -18,6 +18,7 @@ Sniffle has a number of useful features, including: * Easy to extend host-side software written in Python * PCAP export compatible with the Ubertooth * Wireshark compatible plugin +* ZMQ Publishing server ## Prerequisites @@ -244,6 +245,8 @@ options: -d, --decode Decode advertising data -o OUTPUT, --output OUTPUT PCAP output file name + -z, --zmq Enable ZMQ server + --zmqsetting Set ZMQ server ip and port (default:127.0.0.1:4222) ``` The XDS110 debugger on the Launchpad boards creates two serial ports. On diff --git a/python_cli/advertiser.py b/python_cli/advertiser.py index 31622df..f44def5 100755 --- a/python_cli/advertiser.py +++ b/python_cli/advertiser.py @@ -11,13 +11,15 @@ # global variable to access hardware hw = None + def main(): aparse = argparse.ArgumentParser(description="Connection initiator test script for Sniffle BLE5 sniffer") aparse.add_argument("-s", "--serport", default=None, help="Sniffer serial port name") + aparse.add_argument("-b", "--baudrate", default=None, help="Sniffer serial port baudrate") args = aparse.parse_args() global hw - hw = SniffleHW(args.serport) + hw = SniffleHW(args.serport, baudrate=args.baudrate) # set the advertising channel (and return to ad-sniffing mode) hw.cmd_chan_aa_phy(37, BLE_ADV_AA, 0) @@ -69,5 +71,6 @@ def main(): if msg is not None: print(msg, end='\n\n') + if __name__ == "__main__": main() diff --git a/python_cli/initiator.py b/python_cli/initiator.py index a49f258..95fb862 100755 --- a/python_cli/initiator.py +++ b/python_cli/initiator.py @@ -18,6 +18,7 @@ def main(): aparse = argparse.ArgumentParser(description="Connection initiator test script for Sniffle BLE5 sniffer") aparse.add_argument("-s", "--serport", default=None, help="Sniffer serial port name") + aparse.add_argument("-b", "--baudrate", default=None, help="Sniffer serial port baudrate") aparse.add_argument("-c", "--advchan", default=37, choices=[37, 38, 39], type=int, help="Advertising channel to listen on") aparse.add_argument("-m", "--mac", default=None, help="Specify target MAC address") @@ -31,7 +32,7 @@ def main(): args = aparse.parse_args() global hw - hw = SniffleHW(args.serport) + hw = SniffleHW(args.serport, baudrate=args.baudrate) targ_specs = bool(args.mac) + bool(args.irk) + bool(args.string) if targ_specs < 1: diff --git a/python_cli/requirements.txt b/python_cli/requirements.txt new file mode 100644 index 0000000..2d34a82 --- /dev/null +++ b/python_cli/requirements.txt @@ -0,0 +1,4 @@ +pyserial +zmq +numpy +scipy \ No newline at end of file diff --git a/python_cli/reset.py b/python_cli/reset.py index 329b604..8f78c88 100755 --- a/python_cli/reset.py +++ b/python_cli/reset.py @@ -11,9 +11,10 @@ def main(): aparse = argparse.ArgumentParser(description="Firmware reset utility for Sniffle BLE5 sniffer") aparse.add_argument("-s", "--serport", default=None, help="Sniffer serial port name") + aparse.add_argument("-b", "--baudrate", default=None, help="Sniffer serial port baudrate") args = aparse.parse_args() - hw = SniffleHW(args.serport) + hw = SniffleHW(args.serport, baudrate=args.baudrate) # 5 resets seems to work more reliably than fewer print("Sending reset commands...") diff --git a/python_cli/scanner.py b/python_cli/scanner.py index 6604fb9..d242dff 100755 --- a/python_cli/scanner.py +++ b/python_cli/scanner.py @@ -18,11 +18,13 @@ advertisers = {} done_scan = False + def sigint_handler(sig, frame): global done_scan done_scan = True hw.cancel_recv() + class Advertiser: def __init__(self): self.adv = None @@ -42,32 +44,34 @@ def add_hit(self, rssi): self.rssi_min = rssi elif rssi > self.rssi_max: self.rssi_max = rssi - self.rssi_avg = (self.rssi_avg*self.hits + rssi) / (self.hits + 1) + self.rssi_avg = (self.rssi_avg * self.hits + rssi) / (self.hits + 1) self.hits += 1 + def main(): aparse = argparse.ArgumentParser(description="Scanner utility for Sniffle BLE5 sniffer") aparse.add_argument("-s", "--serport", default=None, help="Sniffer serial port name") + aparse.add_argument("-b", "--baudrate", default=None, help="Sniffer serial port baudrate") aparse.add_argument("-c", "--advchan", default=37, choices=[37, 38, 39], type=int, - help="Advertising channel to listen on") + help="Advertising channel to listen on") aparse.add_argument("-r", "--rssi", default=-128, type=int, - help="Filter packets by minimum RSSI") + help="Filter packets by minimum RSSI") aparse.add_argument("-l", "--longrange", action="store_true", - help="Use long range (coded) PHY for primary advertising") + help="Use long range (coded) PHY for primary advertising") aparse.add_argument("-d", "--decode", action="store_true", - help="Decode advertising data") + help="Decode advertising data") aparse.add_argument("-o", "--output", default=None, help="PCAP output file name") args = aparse.parse_args() global hw - hw = make_sniffle_hw(args.serport) + hw = make_sniffle_hw(serport=args.serport, baudrate=args.baudrate) hw.setup_sniffer( - mode=SnifferMode.ACTIVE_SCAN, - chan=args.advchan, - ext_adv=True, - coded_phy=args.longrange, - rssi_min=args.rssi) + mode=SnifferMode.ACTIVE_SCAN, + chan=args.advchan, + ext_adv=True, + coded_phy=args.longrange, + rssi_min=args.rssi) # zero timestamps and flush old packets hw.mark_and_flush() @@ -96,10 +100,10 @@ def main(): print("\n\nScan Results:") for a in sorted(advertisers.keys(), key=lambda k: advertisers[k].rssi_avg, reverse=True): - print("="*80) + print("=" * 80) print("AdvA: %s Avg/Min/Max RSSI: %.1f/%i/%i Hits: %i" % ( - a, advertisers[a].rssi_avg, advertisers[a].rssi_min, advertisers[a].rssi_max, - advertisers[a].hits)) + a, advertisers[a].rssi_avg, advertisers[a].rssi_min, advertisers[a].rssi_max, + advertisers[a].hits)) if advertisers[a].adv: print("\nAdvertisement:") print(advertisers[a].adv.str_header()) @@ -120,7 +124,8 @@ def main(): print(advertisers[a].scan_rsp.hexdump()) else: print("\nScan Response: None") - print("="*80, end="\n\n") + print("=" * 80, end="\n\n") + def handle_packet(dpkt): # Ignore non-advertisements (shouldn't get any) @@ -151,5 +156,6 @@ def handle_packet(dpkt): else: advertisers[adva].adv = dpkt + if __name__ == "__main__": main() diff --git a/python_cli/sniff_receiver.py b/python_cli/sniff_receiver.py index 0aeed42..6f73e71 100755 --- a/python_cli/sniff_receiver.py +++ b/python_cli/sniff_receiver.py @@ -1,17 +1,19 @@ #!/usr/bin/env python3 # Written by Sultan Qasim Khan +# OpenDroneID mods (c) by B. Kerler # Copyright (c) 2018-2024, NCC Group plc # Released as open source under GPLv3 import argparse, sys +import json +import time from binascii import unhexlify -from sniffle.constants import BLE_ADV_AA from sniffle.pcap import PcapBleWriter from sniffle.sniffle_hw import (make_sniffle_hw, PacketMessage, DebugMessage, StateMessage, MeasurementMessage, SnifferMode, PhyMode) from sniffle.packet_decoder import (AdvaMessage, AdvDirectIndMessage, AdvExtIndMessage, - ScanRspMessage, DataMessage, str_mac) + ScanRspMessage, DataMessage, str_mac, AdvIndMessage) from sniffle.errors import UsageError, SourceDone from sniffle.advdata.decoder import decode_adv_data @@ -24,6 +26,7 @@ def main(): aparse = argparse.ArgumentParser(description="Host-side receiver for Sniffle BLE5 sniffer") aparse.add_argument("-s", "--serport", default=None, help="Sniffer serial port name") + aparse.add_argument("-b", "--baudrate", default=None, help="Sniffer serial port baudrate") aparse.add_argument("-c", "--advchan", default=40, choices=[37, 38, 39], type=int, help="Advertising channel to listen on") aparse.add_argument("-p", "--pause", action="store_true", @@ -55,8 +58,41 @@ def main(): aparse.add_argument("-d", "--decode", action="store_true", help="Decode advertising data") aparse.add_argument("-o", "--output", default=None, help="PCAP output file name") + aparse.add_argument("-z", "--zmq", action="store_true", help="Enable zmq") + aparse.add_argument("--zmqsetting", default="127.0.0.1:4222", help="Define zmq server settings") + aparse.add_argument("-v", "--verbose", action="store_true", help="Print messages") args = aparse.parse_args() + if args.zmq: + import zmq + + url = f"tcp://{args.zmqsetting}" + + context = zmq.Context() + socket = context.socket(zmq.XPUB) + socket.setsockopt(zmq.XPUB_VERBOSE, True) + socket.bind(url) + + def zmq_thread(socket): + try: + while True: + event = socket.recv() + # Event is one byte 0=unsub or 1=sub, followed by topic + if event[0] == 1: + log("new subscriber for", event[1:]) + elif event[0] == 0: + log("unsubscribed", event[1:]) + except zmq.error.ContextTerminated: + pass + + def log(*msg): + s = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + print("%s:" % s, *msg, end="\n", file=sys.stderr) + + from threading import Thread + zthread = Thread(target=zmq_thread, args=[socket], daemon=True, name='zmq') + zthread.start() + # Sanity check argument combinations targ_specs = bool(args.mac) + bool(args.irk) + bool(args.string) if args.hop and targ_specs < 1: @@ -70,7 +106,7 @@ def main(): raise UsageError("Don't specify an advertising channel if you want advertising channel hopping!") global hw - hw = make_sniffle_hw(args.serport) + hw = make_sniffle_hw(serport=args.serport, baudrate=args.baudrate) # if a channel was explicitly specified, don't hop hop3 = True if targ_specs else False @@ -137,10 +173,19 @@ def main(): while True: try: msg = hw.recv_and_decode() - print_message(msg, args.quiet, args.decode) + if args.zmq: + smsg = msg.to_dict() + smsg = json.dumps(smsg) + socket.send_string(smsg) + if args.verbose: + print_message(msg, args.quiet, args.decode) + else: + print_message(msg, args.quiet, args.decode) except SourceDone: break except KeyboardInterrupt: + if args.zmq: + socket.close() hw.cancel_recv() sys.stderr.write("\r") break diff --git a/python_cli/sniffle/packet_decoder.py b/python_cli/sniffle/packet_decoder.py index dc26673..d4ea75c 100644 --- a/python_cli/sniffle/packet_decoder.py +++ b/python_cli/sniffle/packet_decoder.py @@ -15,9 +15,11 @@ from .errors import SniffleHWPacketError from .hexdump import hexdump + def str_mac(mac): return ":".join(["%02X" % b for b in reversed(mac)]) + def _str_atype(addr, is_random): # Non-resolvable private address # Resolvable private address @@ -28,12 +30,15 @@ def _str_atype(addr, is_random): atype = addr[5] >> 6 return atypes[atype] + def str_mac2(mac, is_random): return "%s (%s)" % (str_mac(mac), _str_atype(mac, is_random)) + # radio time wraparound period in seconds TS_WRAP_PERIOD = 0x100000000 / 4E6 + class PacketMessage: def __init__(self, raw_msg, dstate: SniffleDecoderState, crc_rev=None): ts, l, event, rssi, chan = unpack("= 37: type_classes = [ - AdvIndMessage, # 0 - AdvDirectIndMessage, # 1 - AdvNonconnIndMessage, # 2 - ScanReqMessage, # 3 - ScanRspMessage, # 4 - ConnectIndMessage, # 5 - AdvScanIndMessage, # 6 - AdvExtIndMessage] # 7 + AdvIndMessage, # 0 + AdvDirectIndMessage, # 1 + AdvNonconnIndMessage, # 2 + ScanReqMessage, # 3 + ScanRspMessage, # 4 + ConnectIndMessage, # 5 + AdvScanIndMessage, # 6 + AdvExtIndMessage] # 7 if pdu_type < len(type_classes): tc = type_classes[pdu_type] else: @@ -228,6 +249,7 @@ def decode(pkt: PacketMessage, dstate=None): return tc(pkt) + class DataMessage(DPacketMessage): def __init__(self, pkt: PacketMessage): super().__init__(pkt) @@ -245,28 +267,72 @@ def str_datatype(self): dtstr += "Data Length: %i" % self.data_length return dtstr + def dict_datatype(self): + return {"LLID": self.pdutype, + "Dir": ("S->M" if self.data_dir else "M->S"), + "NESN": self.NESN, + "SN": self.SN, + "MD": self.MD, + "Data Length": self.data_length} + def str_header(self): return super().str_header() + " Event: %d" % self.event def _str_decode(self): return self.str_datatype() + def to_dict(self): + return self.dict_datatype() + @staticmethod def decode(pkt: PacketMessage, dstate=None): LLID = pkt.body[0] & 0x3 type_classes = [ - DataMessage, # 0 (RFU) - LlDataContMessage, # 1 - LlDataMessage, # 2 - LlControlMessage] # 3 + DataMessage, # 0 (RFU) + LlDataContMessage, # 1 + LlDataMessage, # 2 + LlControlMessage] # 3 return type_classes[LLID](pkt) + class LlDataMessage(DataMessage): pdutype = "LL DATA" + class LlDataContMessage(DataMessage): pdutype = "LL DATA CONT" + +control_opcodes = [ + "LL_CONNECTION_UPDATE_IND", + "LL_CHANNEL_MAP_IND", + "LL_TERMINATE_IND", + "LL_ENC_REQ", + "LL_ENC_RSP", + "LL_START_ENC_REQ", + "LL_START_ENC_RSP", + "LL_UNKNOWN_RSP", + "LL_FEATURE_REQ", + "LL_FEATURE_RSP", + "LL_PAUSE_ENC_REQ", + "LL_PAUSE_ENC_RSP", + "LL_VERSION_IND", + "LL_REJECT_IND", + "LL_SLAVE_FEATURE_REQ", + "LL_CONNECTION_PARAM_REQ", + "LL_CONNECTION_PARAM_RSP", + "LL_REJECT_EXT_IND", + "LL_PING_REQ", + "LL_PING_RSP", + "LL_LENGTH_REQ", + "LL_LENGTH_RSP", + "LL_PHY_REQ", + "LL_PHY_RSP", + "LL_PHY_UPDATE_IND", + "LL_MIN_USED_CHANNELS_IND" +] + + class LlControlMessage(DataMessage): pdutype = "LL CONTROL" @@ -275,44 +341,27 @@ def __init__(self, pkt: PacketMessage): self.opcode = self.body[2] def str_opcode(self): - control_opcodes = [ - "LL_CONNECTION_UPDATE_IND", - "LL_CHANNEL_MAP_IND", - "LL_TERMINATE_IND", - "LL_ENC_REQ", - "LL_ENC_RSP", - "LL_START_ENC_REQ", - "LL_START_ENC_RSP", - "LL_UNKNOWN_RSP", - "LL_FEATURE_REQ", - "LL_FEATURE_RSP", - "LL_PAUSE_ENC_REQ", - "LL_PAUSE_ENC_RSP", - "LL_VERSION_IND", - "LL_REJECT_IND", - "LL_SLAVE_FEATURE_REQ", - "LL_CONNECTION_PARAM_REQ", - "LL_CONNECTION_PARAM_RSP", - "LL_REJECT_EXT_IND", - "LL_PING_REQ", - "LL_PING_RSP", - "LL_LENGTH_REQ", - "LL_LENGTH_RSP", - "LL_PHY_REQ", - "LL_PHY_RSP", - "LL_PHY_UPDATE_IND", - "LL_MIN_USED_CHANNELS_IND" - ] if self.opcode < len(control_opcodes): return "Opcode: %s" % control_opcodes[self.opcode] else: return "Opcode: RFU (0x%02X)" % self.opcode + def dict_opcode(self): + if self.opcode < len(control_opcodes): + return {"Opcode": control_opcodes[self.opcode]} + else: + return {"Opcode RFU": self.opcode} + def _str_decode(self): return "\n".join([ self.str_datatype(), self.str_opcode()]) + def to_dict(self): + return {self.pdutype: self.pkt, "DataType": self.dict_datatype(), + "Opcode": self.dict_opcode()} + + class AdvaMessage(AdvertMessage): def __init__(self, pkt: PacketMessage): super().__init__(pkt) @@ -327,18 +376,30 @@ def _str_decode(self): self.str_adtype(), self.str_adva()]) + def to_dict(self): + res = {self.pdutype: self.pkt} + res["AdvA"] = str_mac2(self.AdvA, self.TxAdd) + if len(self.adv_data) > 0: + res["AdvData"] = self.adv_data.hex() + return res + + class AdvIndMessage(AdvaMessage): pdutype = "ADV_IND" + class AdvNonconnIndMessage(AdvaMessage): pdutype = "ADV_NONCONN_IND" + class ScanRspMessage(AdvaMessage): pdutype = "SCAN_RSP" + class AdvScanIndMessage(AdvaMessage): pdutype = "ADV_SCAN_IND" + class AdvDirectIndMessage(AdvertMessage): pdutype = "ADV_DIRECT_IND" @@ -351,11 +412,20 @@ def __init__(self, pkt: PacketMessage): def str_ata(self): return "AdvA: %s TargetA: %s" % (str_mac2(self.AdvA, self.TxAdd), str_mac2(self.TargetA, self.RxAdd)) + def dict_ata(self): + return {"AdvA": str_mac2(self.AdvA, self.TxAdd), "TargetA": str_mac2(self.TargetA, self.RxAdd), + "AdvData": self.adv_data.hex()} + def _str_decode(self): return "\n".join([ self.str_adtype(), self.str_ata()]) + def to_dict(self): + return {self.pdutype: self.pkt, "adtype": self.dict_adtype(), "ata": self.dict_ata(), + "AdvData": self.adv_data.hex()} + + class ScanReqMessage(AdvertMessage): pdutype = "SCAN_REQ" @@ -367,14 +437,22 @@ def __init__(self, pkt: PacketMessage): def str_asa(self): return "ScanA: %s AdvA: %s" % (str_mac2(self.ScanA, self.TxAdd), str_mac2(self.AdvA, self.RxAdd)) + def dict_asa(self): + return {"ScanA": str_mac2(self.ScanA, self.TxAdd), "AdvA": str_mac2(self.AdvA, self.RxAdd)} + def _str_decode(self): return "\n".join([ self.str_adtype(), self.str_asa()]) + def to_dict(self): + return {self.pdutype: self.pkt, "adtype": self.dict_adtype(), "asa": self.str_asa()} + + class AuxScanReqMessage(ScanReqMessage): pdutype = "AUX_SCAN_REQ" + class ConnectIndMessage(AdvertMessage): pdutype = "CONNECT_IND" @@ -386,19 +464,34 @@ def __init__(self, pkt: PacketMessage): self.CRCInit = self.body[18] | (self.body[19] << 8) | (self.body[20] << 16) self.WinSize = self.body[21] self.WinOffset, self.Interval, self.Latency, self.Timeout = unpack( - "> 5 def str_aia(self): return "InitA: %s AdvA: %s AA: 0x%08X CRCInit: 0x%06X" % ( - str_mac2(self.InitA, self.TxAdd), str_mac2(self.AdvA, self.RxAdd), self.aa_conn, self.CRCInit) + str_mac2(self.InitA, self.TxAdd), str_mac2(self.AdvA, self.RxAdd), self.aa_conn, self.CRCInit) + + def dict_aia(self): + return {"InitA": str_mac2(self.InitA, self.TxAdd), + "AdvA": str_mac2(self.AdvA, self.RxAdd), + "aa": self.aa_conn, + "CRCInit": self.CRCInit} def str_conn_params(self): return "WinSize: %d WinOffset: %d Interval: %d Latency: %d Timeout: %d Hop: %d SCA: %d" % ( - self.WinSize, self.WinOffset, self.Interval, self.Latency, self.Timeout, - self.Hop, self.SCA) + self.WinSize, self.WinOffset, self.Interval, self.Latency, self.Timeout, + self.Hop, self.SCA) + + def dict_conn_params(self): + return {"WinSize": self.WinSize, + "WinOffset": self.WinOffset, + "Interval": self.Interval, + "Latency": self.Latency, + "Timeout": self.Timeout, + "Hop": self.Hop, + "SCA": self.SCA} def str_chm(self): if self.ChM == b'\xFF\xFF\xFF\xFF\x1F': @@ -413,6 +506,19 @@ def str_chm(self): chanstr = "%02X %02X %02X %02X %02X" % tuple(self.ChM) return "Channel Map: %s (%s)" % (chanstr, descstr) + def dict_chm(self): + if self.ChM == b'\xFF\xFF\xFF\xFF\x1F': + descstr = "all channels" + else: + has_chan = lambda chm, i: (chm[i // 8] & (1 << (i & 7))) != 0 + excludes = [] + for i in range(37): + if not has_chan(self.ChM, i): + excludes.append(i) + descstr = "excludes " + ", ".join([str(i) for i in excludes]) + chanstr = "%02X %02X %02X %02X %02X" % tuple(self.ChM) + return {"Channel Map": {"channel": chanstr, "desc": descstr}} + def _str_decode(self): return "\n".join([ self.str_adtype(), @@ -420,9 +526,19 @@ def _str_decode(self): self.str_conn_params(), self.str_chm()]) + def to_dict(self): + return {self.pdutype: self.pkt, "adtype": self.dict_adtype(), "aia": self.dict_aia(), + "conn_params": self.dict_conn_params(), "chm": self.dict_chm()} + + class AuxConnectReqMessage(ConnectIndMessage): pdutype = "AUX_CONNECT_REQ" + +phy_names = ["1M", "2M", "Coded", "Invalid3", "Invalid4", + "Invalid5", "Invalid6", "Invalid7"] + + class AuxPtr: def __init__(self, ptr): self.chan = ptr[0] & 0x3F @@ -432,11 +548,14 @@ def __init__(self, ptr): self.offsetUsec = auxOffset * offsetMult def __str__(self): - phy_names = ["1M", "2M", "Coded", "Invalid3", "Invalid4", - "Invalid5", "Invalid6", "Invalid7"] return "AuxPtr Chan: %d PHY: %s Delay: %d us" % ( self.chan, phy_names[self.phy], self.offsetUsec) + def to_dict(self): + return {"chan": self.chan, "PHY": phy_names[self.phy], + "Delay_us": self.offsetUsec} + + class AdvDataInfo: def __init__(self, adi): self.did = adi[0] + ((adi[1] & 0x0F) << 8) @@ -450,6 +569,10 @@ def __eq__(self, other): return self.did == other.did and self.sid == other.sid return False + def to_dict(self): + return {"did": self.did, "sid": self.sid} + + class AdvExtIndMessage(AdvertMessage): pdutype = "ADV_EXT_IND" @@ -466,7 +589,7 @@ def __init__(self, pkt: PacketMessage): if len(self.body) < 3: raise ValueError("Extended advertisement too short!") - self.AdvMode = self.body[2] >> 6 # Neither, Connectable, Scannable, or RFU + self.AdvMode = self.body[2] >> 6 # Neither, Connectable, Scannable, or RFU hdrBodyLen = self.body[2] & 0x3F if len(self.body) < hdrBodyLen + 1: @@ -476,37 +599,37 @@ def __init__(self, pkt: PacketMessage): hdrPos = 4 if hdrFlags & 0x01: - self.AdvA = self.body[hdrPos:hdrPos+6] + self.AdvA = self.body[hdrPos:hdrPos + 6] hdrPos += 6 if hdrFlags & 0x02: - self.TargetA = self.body[hdrPos:hdrPos+6] + self.TargetA = self.body[hdrPos:hdrPos + 6] hdrPos += 6 if hdrFlags & 0x04: self.CTEInfo = self.body[hdrPos] hdrPos += 1 if hdrFlags & 0x08: - self.AdvDataInfo = AdvDataInfo(self.body[hdrPos:hdrPos+2]) + self.AdvDataInfo = AdvDataInfo(self.body[hdrPos:hdrPos + 2]) hdrPos += 2 if hdrFlags & 0x10: - self.AuxPtr = AuxPtr(self.body[hdrPos:hdrPos+3]) + self.AuxPtr = AuxPtr(self.body[hdrPos:hdrPos + 3]) hdrPos += 3 if hdrFlags & 0x20: # TODO decode this nicely - self.SyncInfo = self.body[hdrPos:hdrPos+18] + self.SyncInfo = self.body[hdrPos:hdrPos + 18] hdrPos += 18 if hdrFlags & 0x40: - self.TxPower = unpack("b", self.body[hdrPos:hdrPos+1])[0] + self.TxPower = unpack("b", self.body[hdrPos:hdrPos + 1])[0] hdrPos += 1 if hdrPos - 3 < hdrBodyLen: ACADLen = hdrBodyLen - (hdrPos - 3) - self.ACAD = self.body[hdrPos:hdrPos+ACADLen] + self.ACAD = self.body[hdrPos:hdrPos + ACADLen] hdrPos += ACADLen self.adv_data = self.body[hdrPos:] def str_aext(self): amodes = ["Non-connectable, non-scannable", - "Connectable", "Scannable", "RFU"] + "Connectable", "Scannable", "RFU"] modemsg = "AdvMode: %s\n" % amodes[self.AdvMode] dispMsgs = [] @@ -533,27 +656,62 @@ def str_aext(self): else: return dmsg + def dict_aext(self): + ret = {} + + amodes = ["Non-connectable, non-scannable", + "Connectable", "Scannable", "RFU"] + ret["AdvMode"] = amodes[self.AdvMode] + if self.AdvA: + ret["AdvA"] = str_mac2(self.AdvA, self.TxAdd) + if self.TargetA: + ret["TargetA"] = str_mac2(self.TargetA, self.RxAdd) + if self.CTEInfo: + ret["CTEInfo"] = self.CTEInfo + if self.AdvDataInfo: + ret["AdvDataInfo"] = self.AdvDataInfo.to_dict() + if self.SyncInfo: + # TODO decode this nicely + ret["SyncInfo"] = self.SyncInfo.hex() + if self.TxPower: + ret["TxPower"] = self.TxPower + if self.ACAD: + ret["ACAD"] = self.ACAD.hex() + if self.AuxPtr: + ret["AuxPtr"] = self.AuxPtr.to_dict() + return ret + def _str_decode(self): return "\n".join([ self.str_adtype(), self.str_aext()]) + def to_dict(self): + return {self.pdutype: self.pkt, "adtype": self.dict_adtype(), "aext": self.dict_aext(), + "AdvData": self.adv_data.hex()} + + def get_adi(pkt: PacketMessage): dpkt = AdvExtIndMessage(pkt) return dpkt.AdvDataInfo + class AuxAdvIndMessage(AdvExtIndMessage): pdutype = "AUX_ADV_IND" + class AuxScanRspMessage(AuxAdvIndMessage): pdutype = "AUX_SCAN_RSP" + class AuxChainIndMessage(AuxAdvIndMessage): pdutype = "AUX_CHAIN_IND" + class AuxConnectRspMessage(AdvExtIndMessage): pdutype = "AUX_CONNECT_RSP" + def update_state(pkt: DPacketMessage, dstate: SniffleDecoderState): if isinstance(pkt, ConnectIndMessage): if pkt.chan < 37 and dstate.last_state != SnifferState.ADVERTISING_EXT: @@ -569,15 +727,15 @@ def update_state(pkt: DPacketMessage, dstate: SniffleDecoderState): dstate.aux_pending_crci = None elif isinstance(pkt, AuxAdvIndMessage) and pkt.AuxPtr: dstate.aux_pending_chain = (pkt.AdvDataInfo, pkt.AuxPtr.chan, - pkt.ts + pkt.AuxPtr.offsetUsec*1E-6 + 0.0005) - elif isinstance(pkt, AuxAdvIndMessage) and pkt.AdvMode == 2: # scannable - overhead_bytes = 8 # 1 byte preamble, 4 byte AA, 3 byte CRC - if pkt.phy == 1: # 2M + pkt.ts + pkt.AuxPtr.offsetUsec * 1E-6 + 0.0005) + elif isinstance(pkt, AuxAdvIndMessage) and pkt.AdvMode == 2: # scannable + overhead_bytes = 8 # 1 byte preamble, 4 byte AA, 3 byte CRC + if pkt.phy == 1: # 2M time_per_byte = 4E-6 - elif pkt.phy == 2: # Coded S=8 + elif pkt.phy == 2: # Coded S=8 overhead_bytes = 10 time_per_byte = 64E-6 - elif pkt.phy == 3: # Coded S=2 + elif pkt.phy == 3: # Coded S=2 overhead_bytes = 27 time_per_byte = 16E-6 else: diff --git a/python_cli/sniffle/sniffle_hw.py b/python_cli/sniffle/sniffle_hw.py index 102b669..d3ab4f8 100644 --- a/python_cli/sniffle/sniffle_hw.py +++ b/python_cli/sniffle/sniffle_hw.py @@ -7,7 +7,7 @@ from struct import pack, unpack from base64 import b64encode, b64decode from binascii import Error as BAError -from time import time +from time import time, sleep from random import randint, randrange from serial.tools.list_ports import comports from traceback import format_exception @@ -80,7 +80,7 @@ def is_cp2102(serport): return True return False -def make_sniffle_hw(serport=None, logger=None, timeout=None): +def make_sniffle_hw(serport=None, baudrate=921600, logger=None, timeout=None): if serport is None: return SniffleHW(serport, logger, timeout) elif serport.startswith('rfnm'): @@ -96,14 +96,17 @@ def make_sniffle_hw(serport=None, logger=None, timeout=None): fname = serport[5:] return SniffleFileSDR(fname, logger=logger) else: - return SniffleHW(serport, logger, timeout) + return SniffleHW(serport, baudrate, logger, timeout) class SniffleHW: max_interval_preload_pairs = 4 api_level = 0 - def __init__(self, serport=None, logger=None, timeout=None): - baud = 2000000 + def __init__(self, serport=None, baudrate=None, logger=None, timeout=None): + if baudrate is None: + baud = 2000000 + else: + baud = int(baudrate) if serport is None: serport = find_xds110_serport() if serport is None: @@ -112,10 +115,11 @@ def __init__(self, serport=None, logger=None, timeout=None): serport = find_catsniffer_v3_serport() if serport is None: raise IOError("Sniffle device not found") - else: + elif baudrate is None: baud = 921600 elif is_cp2102(serport): - baud = 921600 + if baudrate is None: + baud = 921600 self.timeout = timeout self.decoder_state = SniffleDecoderState() @@ -420,10 +424,14 @@ def mark_and_flush(self): marker_data = pack(' 1000: + break def probe_fw_version(self): self.cmd_version() diff --git a/python_cli/sniffle/sniffle_sdr.py b/python_cli/sniffle/sniffle_sdr.py index 158bec3..63ba944 100644 --- a/python_cli/sniffle/sniffle_sdr.py +++ b/python_cli/sniffle/sniffle_sdr.py @@ -394,10 +394,16 @@ def __init__(self, driver='rfnm', mode='single', logger=None): multi_chan = False gain = 10 chan = 37 - + if driver == 'rfnm': - self.sdr = SoapyDevice({'driver': driver}) - + self.sdr = None + results = SoapyDevice.enumerate() + for device in results: + if device["driver"] == driver: + self.sdr = SoapyDevice(device) + break + if self.sdr is None: + assert False, "No SoapySDR found" rates = self.sdr.listSampleRates(SOAPY_SDR_RX, self.sdr_chan) antennas = self.sdr.listAntennas(SOAPY_SDR_RX, self.sdr_chan) self.sdr.setAntenna(SOAPY_SDR_RX, self.sdr_chan, antennas[1]) diff --git a/python_cli/sniffle_extcap.py b/python_cli/sniffle_extcap.py index 1b5ed93..74aabc2 100755 --- a/python_cli/sniffle_extcap.py +++ b/python_cli/sniffle_extcap.py @@ -38,7 +38,7 @@ import traceback from serial.tools.list_ports import comports from sniffle.constants import BLE_ADV_AA -from sniffle.sniffle_hw import make_sniffle_hw, PacketMessage, SnifferMode, PhyMode +from sniffle.sniffle_hw import make_sniffle_hw, PacketMessage, SnifferMode, PhyMode, SniffleHW from sniffle.packet_decoder import (DataMessage, AdvaMessage, AdvDirectIndMessage, ScanRspMessage, AdvExtIndMessage, str_mac) from sniffle.pcap import PcapBleWriter @@ -188,6 +188,7 @@ def loadArgs(self, args=None): help="Ignore encrypted PHY mode changes") argParser.add_argument("--crcerr", action="store_true", help="Capture packets with CRC errors") + argParser.add_argument("--baudrate", default=None, help="Sniffer serial port baudrate") self.args = argParser.parse_args(args=args) diff --git a/python_cli/uart_test.py b/python_cli/uart_test.py index 47ae826..3647f0a 100755 --- a/python_cli/uart_test.py +++ b/python_cli/uart_test.py @@ -10,9 +10,10 @@ def main(): aparse = argparse.ArgumentParser(description="UART echo test for Sniffle BLE5 sniffer") aparse.add_argument("-s", "--serport", default=None, help="Sniffer serial port name") + aparse.add_argument("-b", "--baudrate", default=None, help="Sniffer serial port baudrate") args = aparse.parse_args() - hw = SniffleHW(args.serport, timeout=0.1) + hw = SniffleHW(serport=args.serport, baudrate=args.baudrate, timeout=0.1) # listen in a way that will receive nothing hw.cmd_chan_aa_phy(0, 0xFFFFFFFF, 0) diff --git a/python_cli/version_check.py b/python_cli/version_check.py index 09bf5bb..e22125e 100755 --- a/python_cli/version_check.py +++ b/python_cli/version_check.py @@ -10,9 +10,10 @@ def main(): aparse = argparse.ArgumentParser(description="Sniffle firmware version check utility") aparse.add_argument("-s", "--serport", default=None, help="Sniffer serial port name") + aparse.add_argument("-b", "--baudrate", default=None, help="Sniffer serial port baudrate") args = aparse.parse_args() - hw = SniffleHW(args.serport, timeout=0.1) + hw = SniffleHW(serport=args.serport, baudrate=args.baudrate, timeout=0.1) ver_msg = hw.probe_fw_version() if ver_msg: