diff --git a/synapse/cli/streaming.py b/synapse/cli/streaming.py index fb213d7..69291bb 100644 --- a/synapse/cli/streaming.py +++ b/synapse/cli/streaming.py @@ -1,10 +1,10 @@ -import dataclasses import json import queue import threading import time import traceback from typing import Optional +from operator import itemgetter from google.protobuf.json_format import Parse @@ -12,6 +12,8 @@ from synapse.api.status_pb2 import DeviceState from synapse.api.synapse_pb2 import DeviceConfiguration import synapse as syn +import synapse.client.channel as channel +import synapse.utils.ndtp_types as ndtp_types def add_commands(subparsers): @@ -22,6 +24,8 @@ def add_commands(subparsers): type=str, help="Configuration file", ) + a.add_argument("--num_ch", type=int, help="Number of channels to read from, overrides config") + a.add_argument("--bin", type=bool, help="Output binary format instead of JSON") a.add_argument("--duration", type=int, help="Duration to read for in seconds") a.add_argument("--node_id", type=int, help="ID of the StreamOut node to read from") a.set_defaults(func=read) @@ -56,6 +60,18 @@ def read(args): ) assert stream_out is not None, "No StreamOut node found in config" + if args.num_ch: + ephys = next( + (n for n in config.nodes if n.type == NodeType.kElectricalBroadband), None + ) + num_ch = args.num_ch + offset = 0 + channels = [] + for ch in range(offset, offset + num_ch): + channels.append(channel.Channel(ch, 2*ch, 2*ch + 1)) + + ephys.channels = channels + if not device.configure(config): raise ValueError("Failed to configure device") @@ -86,7 +102,10 @@ def read(args): print(f"Streaming data... Ctrl+C to stop") q = queue.Queue() stop = threading.Event() - thread = threading.Thread(target=_data_writer, args=(stop, q)) + if args.bin: + thread = threading.Thread(target=_binary_writer, args=(stop, q, num_ch)) + else: + thread = threading.Thread(target=_data_writer, args=(stop, q)) thread.start() try: @@ -109,6 +128,7 @@ def read(args): def read_packets(node: syn.StreamOut, q: queue.Queue, duration: Optional[int] = None): packet_count = 0 seq_number = None + dropped_packets = 0 start = time.time() print(f"Reading packets for duration {duration} seconds" if duration else "Reading packets...") @@ -120,16 +140,48 @@ def read_packets(node: syn.StreamOut, q: queue.Queue, duration: Optional[int] = packet_count += 1 if seq_number is not None and header.seq_number != seq_number + 1: print(f"Seq number out of order: {header.seq_number} != {seq_number + 1}") + dropped_packets += header.seq_number - (seq_number + 1) seq_number = header.seq_number q.put(data) - if duration and time.time() - start > duration: + if duration and (time.time() - start) > duration: break - print(f"Recieved {packet_count} packets in {time.time() - start} seconds") + print(f"Recieved {packet_count} packets in {time.time() - start} seconds. Dropped {dropped_packets} packets ({(dropped_packets / packet_count) * 100}%)") +def _binary_writer(stop, q, num_ch): + filename = f"synapse_data_{time.strftime('%Y%m%d-%H%M%S')}.dat" + if filename: + fd = open(filename, "wb") + + channel_data = [] + while not stop.is_set() or not q.empty(): + try: + data: ndtp_types.ElectricalBroadbandData = q.get(True, 1) + except queue.Empty: + continue + + try: + for ch_id, samples in data.samples: + channel_data.append([ch_id, samples]) + if len(channel_data) == num_ch: + channel_data.sort(key=itemgetter(0)) + channel_samples = [ch_data[1] for ch_data in channel_data] + frames = list(zip(*channel_samples)) + channel_data = [] + + for frame in frames: + for sample in frame: + fd.write(int(sample).to_bytes(2, byteorder="little", signed=False)) + + except Exception as e: + print(f"Error processing data: {e}") + traceback.print_exc() + continue + + def _data_writer(stop, q): filename = f"synapse_data_{time.strftime('%Y%m%d-%H%M%S')}.jsonl" if filename: