Skip to content

Commit

Permalink
Add zmq server and json support
Browse files Browse the repository at this point in the history
  • Loading branch information
bkerler committed Oct 20, 2024
1 parent 84b1ec0 commit 65098d8
Show file tree
Hide file tree
Showing 13 changed files with 338 additions and 100 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ Sniffle has a number of useful features, including:
* Easy to extend host-side software written in Python
* PCAP export compatible with the Ubertooth
* Wireshark compatible plugin
* ZMQ Publishing server

## Prerequisites

Expand Down Expand Up @@ -244,6 +245,8 @@ options:
-d, --decode Decode advertising data
-o OUTPUT, --output OUTPUT
PCAP output file name
-z, --zmq Enable ZMQ server
--zmqsetting Set ZMQ server ip and port (default:127.0.0.1:4222)
```

The XDS110 debugger on the Launchpad boards creates two serial ports. On
Expand Down
5 changes: 4 additions & 1 deletion python_cli/advertiser.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@
# global variable to access hardware
hw = None


def main():
aparse = argparse.ArgumentParser(description="Connection initiator test script for Sniffle BLE5 sniffer")
aparse.add_argument("-s", "--serport", default=None, help="Sniffer serial port name")
aparse.add_argument("-b", "--baudrate", default=None, help="Sniffer serial port baudrate")
args = aparse.parse_args()

global hw
hw = SniffleHW(args.serport)
hw = SniffleHW(args.serport, baudrate=args.baudrate)

# set the advertising channel (and return to ad-sniffing mode)
hw.cmd_chan_aa_phy(37, BLE_ADV_AA, 0)
Expand Down Expand Up @@ -69,5 +71,6 @@ def main():
if msg is not None:
print(msg, end='\n\n')


if __name__ == "__main__":
main()
3 changes: 2 additions & 1 deletion python_cli/initiator.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
def main():
aparse = argparse.ArgumentParser(description="Connection initiator test script for Sniffle BLE5 sniffer")
aparse.add_argument("-s", "--serport", default=None, help="Sniffer serial port name")
aparse.add_argument("-b", "--baudrate", default=None, help="Sniffer serial port baudrate")
aparse.add_argument("-c", "--advchan", default=37, choices=[37, 38, 39], type=int,
help="Advertising channel to listen on")
aparse.add_argument("-m", "--mac", default=None, help="Specify target MAC address")
Expand All @@ -31,7 +32,7 @@ def main():
args = aparse.parse_args()

global hw
hw = SniffleHW(args.serport)
hw = SniffleHW(args.serport, baudrate=args.baudrate)

targ_specs = bool(args.mac) + bool(args.irk) + bool(args.string)
if targ_specs < 1:
Expand Down
4 changes: 4 additions & 0 deletions python_cli/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
pyserial
zmq
numpy
scipy
3 changes: 2 additions & 1 deletion python_cli/reset.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@
def main():
aparse = argparse.ArgumentParser(description="Firmware reset utility for Sniffle BLE5 sniffer")
aparse.add_argument("-s", "--serport", default=None, help="Sniffer serial port name")
aparse.add_argument("-b", "--baudrate", default=None, help="Sniffer serial port baudrate")
args = aparse.parse_args()

hw = SniffleHW(args.serport)
hw = SniffleHW(args.serport, baudrate=args.baudrate)

# 5 resets seems to work more reliably than fewer
print("Sending reset commands...")
Expand Down
36 changes: 21 additions & 15 deletions python_cli/scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@
advertisers = {}
done_scan = False


def sigint_handler(sig, frame):
global done_scan
done_scan = True
hw.cancel_recv()


class Advertiser:
def __init__(self):
self.adv = None
Expand All @@ -42,32 +44,34 @@ def add_hit(self, rssi):
self.rssi_min = rssi
elif rssi > self.rssi_max:
self.rssi_max = rssi
self.rssi_avg = (self.rssi_avg*self.hits + rssi) / (self.hits + 1)
self.rssi_avg = (self.rssi_avg * self.hits + rssi) / (self.hits + 1)
self.hits += 1


def main():
aparse = argparse.ArgumentParser(description="Scanner utility for Sniffle BLE5 sniffer")
aparse.add_argument("-s", "--serport", default=None, help="Sniffer serial port name")
aparse.add_argument("-b", "--baudrate", default=None, help="Sniffer serial port baudrate")
aparse.add_argument("-c", "--advchan", default=37, choices=[37, 38, 39], type=int,
help="Advertising channel to listen on")
help="Advertising channel to listen on")
aparse.add_argument("-r", "--rssi", default=-128, type=int,
help="Filter packets by minimum RSSI")
help="Filter packets by minimum RSSI")
aparse.add_argument("-l", "--longrange", action="store_true",
help="Use long range (coded) PHY for primary advertising")
help="Use long range (coded) PHY for primary advertising")
aparse.add_argument("-d", "--decode", action="store_true",
help="Decode advertising data")
help="Decode advertising data")
aparse.add_argument("-o", "--output", default=None, help="PCAP output file name")
args = aparse.parse_args()

global hw
hw = make_sniffle_hw(args.serport)
hw = make_sniffle_hw(serport=args.serport, baudrate=args.baudrate)

hw.setup_sniffer(
mode=SnifferMode.ACTIVE_SCAN,
chan=args.advchan,
ext_adv=True,
coded_phy=args.longrange,
rssi_min=args.rssi)
mode=SnifferMode.ACTIVE_SCAN,
chan=args.advchan,
ext_adv=True,
coded_phy=args.longrange,
rssi_min=args.rssi)

# zero timestamps and flush old packets
hw.mark_and_flush()
Expand Down Expand Up @@ -96,10 +100,10 @@ def main():

print("\n\nScan Results:")
for a in sorted(advertisers.keys(), key=lambda k: advertisers[k].rssi_avg, reverse=True):
print("="*80)
print("=" * 80)
print("AdvA: %s Avg/Min/Max RSSI: %.1f/%i/%i Hits: %i" % (
a, advertisers[a].rssi_avg, advertisers[a].rssi_min, advertisers[a].rssi_max,
advertisers[a].hits))
a, advertisers[a].rssi_avg, advertisers[a].rssi_min, advertisers[a].rssi_max,
advertisers[a].hits))
if advertisers[a].adv:
print("\nAdvertisement:")
print(advertisers[a].adv.str_header())
Expand All @@ -120,7 +124,8 @@ def main():
print(advertisers[a].scan_rsp.hexdump())
else:
print("\nScan Response: None")
print("="*80, end="\n\n")
print("=" * 80, end="\n\n")


def handle_packet(dpkt):
# Ignore non-advertisements (shouldn't get any)
Expand Down Expand Up @@ -151,5 +156,6 @@ def handle_packet(dpkt):
else:
advertisers[adva].adv = dpkt


if __name__ == "__main__":
main()
53 changes: 49 additions & 4 deletions python_cli/sniff_receiver.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
#!/usr/bin/env python3

# Written by Sultan Qasim Khan
# OpenDroneID mods (c) by B. Kerler
# Copyright (c) 2018-2024, NCC Group plc
# Released as open source under GPLv3

import argparse, sys
import json
import time
from binascii import unhexlify
from sniffle.constants import BLE_ADV_AA
from sniffle.pcap import PcapBleWriter
from sniffle.sniffle_hw import (make_sniffle_hw, PacketMessage, DebugMessage, StateMessage,
MeasurementMessage, SnifferMode, PhyMode)
from sniffle.packet_decoder import (AdvaMessage, AdvDirectIndMessage, AdvExtIndMessage,
ScanRspMessage, DataMessage, str_mac)
ScanRspMessage, DataMessage, str_mac, AdvIndMessage)
from sniffle.errors import UsageError, SourceDone
from sniffle.advdata.decoder import decode_adv_data

Expand All @@ -24,6 +26,7 @@
def main():
aparse = argparse.ArgumentParser(description="Host-side receiver for Sniffle BLE5 sniffer")
aparse.add_argument("-s", "--serport", default=None, help="Sniffer serial port name")
aparse.add_argument("-b", "--baudrate", default=None, help="Sniffer serial port baudrate")
aparse.add_argument("-c", "--advchan", default=40, choices=[37, 38, 39], type=int,
help="Advertising channel to listen on")
aparse.add_argument("-p", "--pause", action="store_true",
Expand Down Expand Up @@ -55,8 +58,41 @@ def main():
aparse.add_argument("-d", "--decode", action="store_true",
help="Decode advertising data")
aparse.add_argument("-o", "--output", default=None, help="PCAP output file name")
aparse.add_argument("-z", "--zmq", action="store_true", help="Enable zmq")
aparse.add_argument("--zmqsetting", default="127.0.0.1:4222", help="Define zmq server settings")
aparse.add_argument("-v", "--verbose", action="store_true", help="Print messages")
args = aparse.parse_args()

if args.zmq:
import zmq

url = f"tcp://{args.zmqsetting}"

context = zmq.Context()
socket = context.socket(zmq.XPUB)
socket.setsockopt(zmq.XPUB_VERBOSE, True)
socket.bind(url)

def zmq_thread(socket):
try:
while True:
event = socket.recv()
# Event is one byte 0=unsub or 1=sub, followed by topic
if event[0] == 1:
log("new subscriber for", event[1:])
elif event[0] == 0:
log("unsubscribed", event[1:])
except zmq.error.ContextTerminated:
pass

def log(*msg):
s = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
print("%s:" % s, *msg, end="\n", file=sys.stderr)

from threading import Thread
zthread = Thread(target=zmq_thread, args=[socket], daemon=True, name='zmq')
zthread.start()

# Sanity check argument combinations
targ_specs = bool(args.mac) + bool(args.irk) + bool(args.string)
if args.hop and targ_specs < 1:
Expand All @@ -70,7 +106,7 @@ def main():
raise UsageError("Don't specify an advertising channel if you want advertising channel hopping!")

global hw
hw = make_sniffle_hw(args.serport)
hw = make_sniffle_hw(serport=args.serport, baudrate=args.baudrate)

# if a channel was explicitly specified, don't hop
hop3 = True if targ_specs else False
Expand Down Expand Up @@ -137,10 +173,19 @@ def main():
while True:
try:
msg = hw.recv_and_decode()
print_message(msg, args.quiet, args.decode)
if args.zmq:
smsg = msg.to_dict()
smsg = json.dumps(smsg)
socket.send_string(smsg)
if args.verbose:
print_message(msg, args.quiet, args.decode)
else:
print_message(msg, args.quiet, args.decode)
except SourceDone:
break
except KeyboardInterrupt:
if args.zmq:
socket.close()
hw.cancel_recv()
sys.stderr.write("\r")
break
Expand Down
Loading

0 comments on commit 65098d8

Please sign in to comment.