Skip to content

Commit

Permalink
Add option to write streaming data in raw binary format
Browse files Browse the repository at this point in the history
  • Loading branch information
polymerizedsage committed Nov 26, 2024
1 parent cb808b7 commit 2d45123
Showing 1 changed file with 56 additions and 4 deletions.
60 changes: 56 additions & 4 deletions synapse/cli/streaming.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
import dataclasses
import json
import queue
import threading
import time
import traceback
from typing import Optional
from operator import itemgetter

from google.protobuf.json_format import Parse

from synapse.api.node_pb2 import NodeType
from synapse.api.status_pb2 import DeviceState
from synapse.api.synapse_pb2 import DeviceConfiguration
import synapse as syn
import synapse.client.channel as channel
import synapse.utils.ndtp_types as ndtp_types


def add_commands(subparsers):
Expand All @@ -22,6 +24,8 @@ def add_commands(subparsers):
type=str,
help="Configuration file",
)
a.add_argument("--num_ch", type=int, help="Number of channels to read from, overrides config")
a.add_argument("--bin", type=bool, help="Output binary format instead of JSON")
a.add_argument("--duration", type=int, help="Duration to read for in seconds")
a.add_argument("--node_id", type=int, help="ID of the StreamOut node to read from")
a.set_defaults(func=read)
Expand Down Expand Up @@ -56,6 +60,18 @@ def read(args):
)
assert stream_out is not None, "No StreamOut node found in config"

if args.num_ch:
ephys = next(
(n for n in config.nodes if n.type == NodeType.kElectricalBroadband), None
)
num_ch = args.num_ch
offset = 0
channels = []
for ch in range(offset, offset + num_ch):
channels.append(channel.Channel(ch, 2*ch, 2*ch + 1))

ephys.channels = channels

if not device.configure(config):
raise ValueError("Failed to configure device")

Expand Down Expand Up @@ -86,7 +102,10 @@ def read(args):
print(f"Streaming data... Ctrl+C to stop")
q = queue.Queue()
stop = threading.Event()
thread = threading.Thread(target=_data_writer, args=(stop, q))
if args.bin:
thread = threading.Thread(target=_binary_writer, args=(stop, q, num_ch))
else:
thread = threading.Thread(target=_data_writer, args=(stop, q))
thread.start()

try:
Expand All @@ -109,6 +128,7 @@ def read(args):
def read_packets(node: syn.StreamOut, q: queue.Queue, duration: Optional[int] = None):
packet_count = 0
seq_number = None
dropped_packets = 0
start = time.time()

print(f"Reading packets for duration {duration} seconds" if duration else "Reading packets...")
Expand All @@ -120,16 +140,48 @@ def read_packets(node: syn.StreamOut, q: queue.Queue, duration: Optional[int] =
packet_count += 1
if seq_number is not None and header.seq_number != seq_number + 1:
print(f"Seq number out of order: {header.seq_number} != {seq_number + 1}")
dropped_packets += header.seq_number - (seq_number + 1)
seq_number = header.seq_number

q.put(data)

if duration and time.time() - start > duration:
if duration and (time.time() - start) > duration:
break

print(f"Recieved {packet_count} packets in {time.time() - start} seconds")
print(f"Recieved {packet_count} packets in {time.time() - start} seconds. Dropped {dropped_packets} packets ({(dropped_packets / packet_count) * 100}%)")


def _binary_writer(stop, q, num_ch):
filename = f"synapse_data_{time.strftime('%Y%m%d-%H%M%S')}.dat"
if filename:
fd = open(filename, "wb")

channel_data = []
while not stop.is_set() or not q.empty():
try:
data: ndtp_types.ElectricalBroadbandData = q.get(True, 1)
except queue.Empty:
continue

try:
for ch_id, samples in data.samples:
channel_data.append([ch_id, samples])
if len(channel_data) == num_ch:
channel_data.sort(key=itemgetter(0))
channel_samples = [ch_data[1] for ch_data in channel_data]
frames = list(zip(*channel_samples))
channel_data = []

for frame in frames:
for sample in frame:
fd.write(int(sample).to_bytes(2, byteorder="little", signed=False))

except Exception as e:
print(f"Error processing data: {e}")
traceback.print_exc()
continue


def _data_writer(stop, q):
filename = f"synapse_data_{time.strftime('%Y%m%d-%H%M%S')}.jsonl"
if filename:
Expand Down

0 comments on commit 2d45123

Please sign in to comment.