diff --git a/synapse/simulator/nodes/electrical_broadband.py b/synapse/simulator/nodes/electrical_broadband.py index 2938a5f..8226cd9 100644 --- a/synapse/simulator/nodes/electrical_broadband.py +++ b/synapse/simulator/nodes/electrical_broadband.py @@ -1,3 +1,4 @@ +import asyncio import random import time @@ -57,4 +58,4 @@ async def run(self): t0 = now - time.sleep(0.100) + await asyncio.sleep(0.100) diff --git a/synapse/tests/test_ndtp.py b/synapse/tests/test_ndtp.py index d6404d8..bd4afe7 100644 --- a/synapse/tests/test_ndtp.py +++ b/synapse/tests/test_ndtp.py @@ -143,6 +143,77 @@ def test_ndtp_payload_broadband(): payload = NDTPPayloadBroadband(is_signed, bit_width, sample_rate, channels) p = payload.pack() + hexstring = " ".join(f"{i:02x}" for i in p) + assert hexstring == "18 00 00 03 00 00 03 00 00 00 00 03 00 10 02 00 30 00 00 10 00 30 04 00 50 06 00 00 02 00 03 bb 87 d0 3e 80" + + assert p[0] == (bit_width << 1) | (is_signed << 0) + + # number of channels + assert p[1] == 0 + assert p[2] == 0 + assert p[3] == 3 + + # sample rate + assert p[4] == 0 + assert p[5] == 0 + assert p[6] == 3 + + # ch 0 channel_id, 0 (24 bits, 3 bytes) + assert p[7] == 0 + assert p[8] == 0 + assert p[9] == 0 + + # ch 0 num_samples, 3 (16 bits, 2 bytes) + assert p[10] == 0 + assert p[11] == 3 + + # ch 0 channel_data, 1, 2, 3 (12 bits, 1.5 bytes each) + # 0000 0000 0001 0000 0000 0010 0000 0000 0011 .... + assert p[12] == 0 + assert p[13] == 16 + assert p[14] == 2 + assert p[15] == 0 + assert p[16] >= 3 + + # ch 1 channel_id, 1 (24 bits, 3 bytes, starting from 4 bit offset) + # 0011 0000 0000 0000 0000 0000 0001 .... + assert p[16] == 48 + assert p[17] == 0 + assert p[18] == 0 + assert p[19] >= 16 + + # ch 1 num_samples, 3 (16 bits, 2 bytes, starting from 4 bit offset) + # 0001 0000 0000 0000 0011 .... + assert p[19] == 16 + assert p[20] == 0 + assert p[21] >= 48 + + # ch 1 channel_data, 4, 5, 6 (12 bits, 1.5 bytes each) + # 0011 0000 0000 0100 0000 0000 0101 0000 0000 0110 + assert p[21] == 48 + assert p[22] == 4 + assert p[23] == 0 + assert p[24] == 80 + assert p[25] >= 6 + + # ch 2 channel_id, 2 (24 bits, 3 bytes) + # 0000 0000 0000 0000 0000 0010 + assert p[26] == 0 + assert p[27] == 0 + assert p[28] == 2 + + # ch 2 num_samples, 3 (16 bits, 2 bytes) + # 0000 0000 0000 0011 + assert p[29] == 0 + assert p[30] == 3 + + # ch 2 channel_data, 3000, 2000, 1000 (12 bits, 1.5 bytes each) + # 1011 1011 1000 0111 1101 0000 0011 1110 1000 .... + assert p[31] == 187 + assert p[32] == 135 + assert p[33] == 208 + assert p[34] == 62 + assert p[35] >= 128 u = NDTPPayloadBroadband.unpack(p) assert u.bit_width == bit_width @@ -161,7 +232,7 @@ def test_ndtp_payload_broadband(): assert p[0] >> 1 == bit_width assert (p[1] << 16) | (p[2] << 8) | p[3] == 3 - p = p[6:] + p = p[7:] unpacked, offset, p = to_ints(p, bit_width=24, count=1) assert unpacked[0] == 0 @@ -175,6 +246,43 @@ def test_ndtp_payload_broadband(): assert unpacked == [1, 2, 3] assert offset == 36 +def test_ndtp_payload_broadband_large(): + n_samples = 20000 + bit_width = 16 + sample_rate = 100000 + is_signed = False + channels = [ + NDTPPayloadBroadbandChannelData( + channel_id=0, + channel_data=[i for i in range(n_samples)], + ), + NDTPPayloadBroadbandChannelData( + channel_id=1, + channel_data=[i + 1 for i in range(n_samples)], + ), + NDTPPayloadBroadbandChannelData( + channel_id=2, + channel_data=[i + 2 for i in range(n_samples)], + ), + ] + + payload = NDTPPayloadBroadband(is_signed, bit_width, sample_rate, channels) + packed = payload.pack() + + unpacked = NDTPPayloadBroadband.unpack(packed) + assert unpacked.bit_width == bit_width + assert unpacked.is_signed == is_signed + assert len(unpacked.channels) == 3 + + assert unpacked.channels[0].channel_id == 0 + assert list(unpacked.channels[0].channel_data) == [i for i in range(n_samples)] + + assert unpacked.channels[1].channel_id == 1 + assert list(unpacked.channels[1].channel_data) == [i + 1 for i in range(n_samples)] + + assert unpacked.channels[2].channel_id == 2 + assert list(unpacked.channels[2].channel_data) == [i + 2 for i in range(n_samples)] + def test_ndtp_payload_spiketrain(): samples = [0, 1, 2, 3, 2] @@ -199,22 +307,22 @@ def test_ndtp_header(): # Data too smol with pytest.raises(ValueError): NDTPHeader.unpack( - struct.pack("B", NDTP_VERSION) + + struct.pack(">B", DataType.kBroadband) + + struct.pack(">Q", 123) ) -def test_ndtp_message(): +def test_ndtp_message_broadband(): header = NDTPHeader(DataType.kBroadband, timestamp=1234567890, seq_number=42) payload = NDTPPayloadBroadband( bit_width=12, - sample_rate=100, + sample_rate=3, is_signed=False, channels=[ NDTPPayloadBroadbandChannelData( channel_id=c, - channel_data=[c * 100 for _ in range(c + 1)], + channel_data=[s * 1000 for s in range(c + 1)], ) for c in range(3) ], @@ -222,12 +330,54 @@ def test_ndtp_message(): message = NDTPMessage(header, payload) packed = message.pack() + assert message._crc16 == 23793 + + hexstring = " ".join(f"{i:02x}" for i in packed) + assert hexstring == "01 02 00 00 00 00 49 96 02 d2 00 2a 18 00 00 03 00 00 03 00 00 00 00 01 00 00 00 00 10 00 20 00 3e 80 00 00 20 00 30 00 3e 87 d0 5c f1" + unpacked = NDTPMessage.unpack(packed) + assert message._crc16 == 23793 assert unpacked.header == message.header assert isinstance(unpacked.payload, NDTPPayloadBroadband) assert unpacked.payload == message.payload +def test_ndtp_message_broadband_large(): + header = NDTPHeader(DataType.kBroadband, timestamp=1234567890, seq_number=42) + payload = NDTPPayloadBroadband( + bit_width=16, + sample_rate=36000, + is_signed=False, + channels=[ + NDTPPayloadBroadbandChannelData( + channel_id=c, + channel_data=[i for i in range(10000)], + ) + for c in range(20) + ], + ) + message = NDTPMessage(header, payload) + + packed = message.pack() + assert message._crc16 == 45907 + + unpacked = NDTPMessage.unpack(packed) + assert unpacked._crc16 == 45907 + + assert unpacked.header == message.header + assert isinstance(unpacked.payload, NDTPPayloadBroadband) + assert unpacked.payload == message.payload + + u_payload = unpacked.payload + assert u_payload.bit_width == payload.bit_width + assert u_payload.sample_rate == payload.sample_rate + assert u_payload.is_signed == payload.is_signed + assert len(u_payload.channels) == len(payload.channels) + for i, c in enumerate(payload.channels): + assert u_payload.channels[i].channel_id == c.channel_id + assert list(u_payload.channels[i].channel_data) == list(c.channel_data) + +def test_ndtp_message_spiketrain(): header = NDTPHeader(DataType.kSpiketrain, timestamp=1234567890, seq_number=42) payload = NDTPPayloadSpiketrain(spike_counts=[1, 2, 3, 2, 1]) message = NDTPMessage(header, payload) diff --git a/synapse/tests/test_stream_out.py b/synapse/tests/test_stream_out.py index 109b0f8..0c5b8b6 100644 --- a/synapse/tests/test_stream_out.py +++ b/synapse/tests/test_stream_out.py @@ -1,10 +1,11 @@ import numpy as np import pytest -from synapse.api.datatype_pb2 import DataType from synapse.server.nodes.stream_out import StreamOut from synapse.utils.ndtp import NDTPMessage from synapse.utils.ndtp_types import ( + MAX_CH_PAYLOAD_SIZE_BYTES, + chunk_channel_data, ElectricalBroadbandData, SpiketrainData, ) @@ -43,9 +44,9 @@ def test_packing_broadband_data(): # Unsigned sample_data = [ - (1, np.array([1000, 2000, 3000], dtype=np.int16)), - (2, np.array([1234, 1234, 1234, 1234], dtype=np.int16)), - (3, np.array([1000, 2000, 3000, 4000, 3000], dtype=np.int16)), + (1, np.array([1000, 2000, 3000], dtype=np.uint16)), + (2, np.array([1234, 1234, 1234, 1234], dtype=np.uint16)), + (3, np.array([1000, 2000, 3000, 4000, 3000], dtype=np.uint16)), ] bdata = ElectricalBroadbandData( bit_width=12, @@ -60,13 +61,78 @@ def test_packing_broadband_data(): for i, p in enumerate(packed): unpacked = NDTPMessage.unpack(p) - assert unpacked.header.timestamp == bdata.t0 - assert unpacked.payload.bit_width == 12 - assert unpacked.payload.channels[0].channel_id == bdata.samples[i][0] - assert list(unpacked.payload.channels[0].channel_data) == list( - bdata.samples[i][1] - ) +def test_packing_broadband_data(): + node = StreamOut(id=1) + seq = 0 + + n_samples = 10000 + + sample_data = [ + (1, np.array([i for i in range(n_samples)], dtype=np.int16)), + (2, np.array([i for i in range(n_samples)], dtype=np.int16)), + (3, np.array([i for i in range(n_samples)], dtype=np.int16)), + ] + bdata = ElectricalBroadbandData( + bit_width=16, + sample_rate=36000, + t0=1234567890, + samples=sample_data, + is_signed=True + ) + + packed = node._pack(bdata) + seq = 0 + for c in range(3): + ch_data = sample_data[c] + chunks = chunk_channel_data(ch_data[1], MAX_CH_PAYLOAD_SIZE_BYTES) + + for chunk in chunks: + p = packed[seq] + unpacked = NDTPMessage.unpack(p) + assert unpacked.header.timestamp == bdata.t0 + assert unpacked.header.seq_number == seq + + assert unpacked.payload.bit_width == 16 + assert unpacked.payload.sample_rate == bdata.sample_rate + assert unpacked.payload.is_signed == bdata.is_signed + assert unpacked.payload.channels[0].channel_id == ch_data[0] + assert list(unpacked.payload.channels[0].channel_data) == list(chunk) + + seq += 1 + + sample_data = [ + (1, np.array([i for i in range(n_samples)], dtype=np.uint16)), + (2, np.array([i for i in range(n_samples)], dtype=np.uint16)), + (3, np.array([i for i in range(n_samples)], dtype=np.uint16)), + ] + bdata = ElectricalBroadbandData( + bit_width=16, + sample_rate=36000, + t0=1234567890, + samples=sample_data, + ) + + packed = node._pack(bdata) + seq2 = 0 + for c in range(3): + ch_data = sample_data[c] + chunks = chunk_channel_data(ch_data[1], MAX_CH_PAYLOAD_SIZE_BYTES) + + for chunk in chunks: + p = packed[seq2] + unpacked = NDTPMessage.unpack(p) + + assert unpacked.header.timestamp == bdata.t0 + assert unpacked.header.seq_number == seq + seq2 + + assert unpacked.payload.bit_width == 16 + assert unpacked.payload.sample_rate == bdata.sample_rate + assert unpacked.payload.is_signed == bdata.is_signed + assert unpacked.payload.channels[0].channel_id == ch_data[0] + assert list(unpacked.payload.channels[0].channel_data) == list(chunk) + + seq2 += 1 def test_packing_spiketrain_data(): diff --git a/synapse/utils/ndtp.pyx b/synapse/utils/ndtp.pyx index 4a54ce5..4c94650 100644 --- a/synapse/utils/ndtp.pyx +++ b/synapse/utils/ndtp.pyx @@ -14,8 +14,6 @@ from synapse.api.datatype_pb2 import DataType cdef int DATA_TYPE_K_BROADBAND = DataType.kBroadband cdef int DATA_TYPE_K_SPIKETRAIN = DataType.kSpiketrain -cdef object NDTPHeader_STRUCT = struct.Struct(" 0 else 0 + if buffer_length > 0: + if writing_bit_offset > 0: + bit_offset = (buffer_length - 1) * 8 + writing_bit_offset + else: + bit_offset = (buffer_length) * 8 cdef int total_bits_needed = bit_offset + num_bits_to_write cdef int total_bytes_needed = (total_bits_needed + 7) // 8 @@ -161,13 +162,14 @@ def to_ints( cdef int start cdef int value_index = 0 cdef int max_values = count if count > 0 else (data_len * 8) // bit_width + if max_values == 0: + raise ValueError(f"max_values must be > 0 (got {len(data)} data, {count} count, bit width {bit_width})") cdef int[::1] values_array = cython.view.array(shape=(max_values,), itemsize=cython.sizeof(cython.int), format="i") - cdef int bit_width_minus1 = bit_width - 1 - cdef int sign_bit = 1 << bit_width_minus1 - cdef uint8_t byte # Declare byte here, outside the loop + cdef int sign_bit = 1 << (bit_width - 1) + cdef uint8_t byte for byte_index in range(data_len): - byte = data_view[byte_index] # Initialize byte inside the loop + byte = data_view[byte_index] if byteorder == 'little': start = start_bit if byte_index == 0 else 0 @@ -282,22 +284,35 @@ cdef class NDTPPayloadBroadband: # Next three bytes: number of channels (24-bit integer) payload += n_channels.to_bytes(3, byteorder='big', signed=False) - # Next two bytes: sample rate (16-bit integer) - payload += struct.pack(">H", self.sample_rate) + # Next three bytes: sample rate (24-bit integer) + payload += self.sample_rate.to_bytes(3, byteorder='big', signed=False) cdef NDTPPayloadBroadbandChannelData c + bit_offset = 0 for c in self.channels: - # Pack channel_id (3 bytes, 24 bits) - payload += c.channel_id.to_bytes(3, byteorder='big', signed=False) + payload, bit_offset = to_bytes( + values=[c.channel_id], + bit_width=24, + is_signed=False, + existing=payload, + writing_bit_offset=bit_offset, + ) - # Pack number of samples (2 bytes, 16 bits) - payload += struct.pack(">H", len(c.channel_data)) + payload, bit_offset = to_bytes( + values=[len(c.channel_data)], + bit_width=16, + is_signed=False, + existing=payload, + writing_bit_offset=bit_offset, + ) - # Pack channel_data - channel_data_bytes, _ = to_bytes( - c.channel_data, self.bit_width, is_signed=self.is_signed + payload, bit_offset = to_bytes( + values=c.channel_data, + bit_width=self.bit_width, + is_signed=self.is_signed, + existing=payload, + writing_bit_offset=bit_offset, ) - payload += channel_data_bytes return payload @@ -305,52 +320,35 @@ cdef class NDTPPayloadBroadband: def unpack(data): if isinstance(data, bytes): data = bytearray(data) - + + cdef int payload_h_size = 7 cdef int len_data = len(data) - if len_data < 6: + if len_data < payload_h_size: raise ValueError( - f"Invalid broadband data size {len_data}: expected at least 6 bytes" + f"Invalid broadband data size {len_data}: expected at least {payload_h_size} bytes" ) cdef int bit_width = data[0] >> 1 cdef bint is_signed = (data[0] & 1) == 1 cdef int num_channels = int.from_bytes(data[1:4], 'big') - cdef int sample_rate = struct.unpack(">H", data[4:6])[0] - - cdef int pos = 6 # Starting byte position after the header + cdef int sample_rate = int.from_bytes(data[4:7], 'big') cdef list channels = [] cdef int channel_id, num_samples cdef list channel_data cdef NDTPPayloadBroadbandChannelData channel - for _ in range(num_channels): - # Unpack channel_id (3 bytes, big-endian) - if pos + 3 > len(data): - raise ValueError("Incomplete data for channel_id") - channel_id = int.from_bytes(data[pos:pos+3], 'big') - pos += 3 - - # Unpack num_samples (2 bytes, big-endian) - if pos + 2 > len(data): - raise ValueError("Incomplete data for num_samples") - num_samples = struct.unpack(">H", data[pos:pos+2])[0] - pos += 2 - - # Calculate the number of bits and bytes needed for channel data - total_bits = num_samples * bit_width - bytes_needed = (total_bits + 7) // 8 # Round up to the nearest byte - - # Ensure we have enough data - if pos + bytes_needed > len(data): - raise ValueError("Incomplete data for channel_data") - channel_data_bytes = data[pos:pos + bytes_needed] - pos += bytes_needed - - # Unpack channel_data - channel_data, _, _ = to_ints( - channel_data_bytes, bit_width, num_samples, is_signed=is_signed - ) + truncated = data[7:] + bit_offset = 0 + + for c in range(num_channels): + a_channel_id, bit_offset, truncated = to_ints(data=truncated, bit_width=24, count=1, start_bit=bit_offset, is_signed=False) + channel_id = a_channel_id[0] + + a_num_samples, bit_offset, truncated = to_ints(data=truncated, bit_width=16, count=1, start_bit=bit_offset, is_signed=False) + num_samples = a_num_samples[0] + + channel_data, bit_offset, truncated = to_ints(data=truncated, bit_width=bit_width, count=num_samples, start_bit=bit_offset, is_signed=is_signed) channel = NDTPPayloadBroadbandChannelData(channel_id, channel_data) channels.append(channel) @@ -403,7 +401,7 @@ cdef class NDTPPayloadSpiketrain: clamped_counts[i] = min(self.spike_counts[i], max_value) # Pack the number of spikes (4 bytes) - payload += struct.pack("I", spike_counts_len) # Pack clamped spike counts spike_counts_bytes, _ = to_bytes( @@ -423,7 +421,7 @@ cdef class NDTPPayloadSpiketrain: f"Invalid spiketrain data size {len_data}: expected at least 4 bytes" ) - cdef int num_spikes = struct.unpack("I", data[:4])[0] cdef bytearray payload = data[4:] cdef int bits_needed = num_spikes * NDTPPayloadSpiketrain_BIT_WIDTH cdef int bytes_needed = (bits_needed + 7) // 8 @@ -452,7 +450,7 @@ cdef class NDTPHeader: cdef public long long timestamp cdef public int seq_number - STRUCT = struct.Struct("BBQH") # Define as a Python class attribute def __init__(self, int data_type, long long timestamp, int seq_number): self.data_type = data_type @@ -500,6 +498,7 @@ cdef class NDTPHeader: cdef class NDTPMessage: cdef public NDTPHeader header cdef public object payload + cdef public int _crc16 def __init__(self, NDTPHeader header, payload=None): self.header = header @@ -537,8 +536,8 @@ cdef class NDTPMessage: message += header_bytes message += payload_bytes - crc = NDTPMessage.crc16(message) - crc_bytes = struct.pack("H", self._crc16) message += crc_bytes # Appending bytes to bytearray is acceptable @@ -549,7 +548,7 @@ cdef class NDTPMessage: if isinstance(data, bytes): data = bytearray(data) - cdef int header_size = NDTPHeader_STRUCT.size + cdef int header_size = NDTPHeader.STRUCT.size cdef NDTPHeader header cdef int crc16_value cdef object pbytes @@ -557,7 +556,7 @@ cdef class NDTPMessage: cdef object payload = None header = NDTPHeader.unpack(data[:header_size]) - crc16_value = struct.unpack("H", bytes(data[-2:]))[0] pbytes = data[header_size:-2] pdtype = header.data_type @@ -572,4 +571,6 @@ cdef class NDTPMessage: if not NDTPMessage.crc16_verify(data[:-2], crc16_value): raise ValueError(f"CRC16 verification failed (expected {crc16_value})") - return NDTPMessage(header, payload) + msg = NDTPMessage(header, payload) + msg._crc16 = crc16_value + return msg diff --git a/synapse/utils/ndtp_types.py b/synapse/utils/ndtp_types.py index 85641c3..80eeb5d 100644 --- a/synapse/utils/ndtp_types.py +++ b/synapse/utils/ndtp_types.py @@ -1,3 +1,4 @@ +import math from typing import List, Tuple, Union import numpy as np @@ -11,6 +12,15 @@ NDTPPayloadSpiketrain, ) +MAX_CH_PAYLOAD_SIZE_BYTES = 1400 + +def chunk_channel_data(ch_data: List[float], max_payload_size_bytes: int): + n_packets = math.ceil(len(ch_data) / max_payload_size_bytes) + n_pts_per_packet = math.ceil(len(ch_data) / n_packets) + for i in range(n_packets): + start_idx = i * n_pts_per_packet + end_idx = min(start_idx + n_pts_per_packet, len(ch_data)) + yield ch_data[start_idx:end_idx] class ElectricalBroadbandData: __slots__ = ["data_type", "t0", "is_signed", "bit_width", "samples", "sample_rate"] @@ -27,38 +37,49 @@ def pack(self, seq_number: int): packets = [] seq_number_offset = 0 - for ch_samples in self.samples: - packets.append( - NDTPMessage( - header=NDTPHeader( - data_type=DataType.kBroadband, - timestamp=self.t0, - seq_number=seq_number + seq_number_offset, - ), - payload=NDTPPayloadBroadband( - is_signed=self.is_signed, - bit_width=self.bit_width, - sample_rate=self.sample_rate, - channels=[ - NDTPPayloadBroadbandChannelData( - channel_id=ch_samples[0], channel_data=ch_samples[1] - ) - ], - ), - ).pack() - ) - seq_number_offset += 1 + try: + for ch_samples in self.samples: + ch_id = ch_samples[0] + ch_data = ch_samples[1] + if (len(ch_data) == 0): + continue + + for ch_sample_sub in chunk_channel_data(ch_data, MAX_CH_PAYLOAD_SIZE_BYTES): + msg = NDTPMessage( + header=NDTPHeader( + data_type=DataType.kBroadband, + timestamp=self.t0, + seq_number=seq_number + seq_number_offset, + ), + payload=NDTPPayloadBroadband( + is_signed=self.is_signed, + bit_width=self.bit_width, + sample_rate=self.sample_rate, + channels=[ + NDTPPayloadBroadbandChannelData( + channel_id=ch_id, channel_data=ch_sample_sub + ) + ], + ), + ) + packed = msg.pack() + packets.append(packed) + seq_number_offset += 1 + except Exception as e: + print(f"Error packing NDTP message: {e}") return packets @staticmethod def from_ndtp_message(msg: NDTPMessage): + dtype = np.int16 if msg.payload.is_signed else np.uint16 return ElectricalBroadbandData( t0=msg.header.timestamp, bit_width=msg.payload.bit_width, + is_signed=msg.payload.is_signed, sample_rate=msg.payload.sample_rate, samples=[ - (ch.channel_id, np.array(ch.channel_data, dtype=np.int16)) + (ch.channel_id, np.array(ch.channel_data, dtype=dtype)) for ch in msg.payload.channels ], )