Skip to content

Commit

Permalink
chore: support signed channel data
Browse files Browse the repository at this point in the history
  • Loading branch information
antoniaelsen committed Sep 24, 2024
1 parent 921de75 commit 0bd995c
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 15 deletions.
2 changes: 1 addition & 1 deletion synapse-api
20 changes: 19 additions & 1 deletion synapse/tests/test_ndtp.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,23 @@ def test_to_bytes():
assert to_bytes([1, 2, 3, 2, 1], bit_width=2) == (bytearray(b'\x6E\x40'), 2)

assert to_bytes([7, 5, 3, 1], bit_width=12) == (bytearray(b'\x00\x70\x05\x00\x30\x01'), 0)

assert to_bytes([-7, -5, -3, -1], bit_width=12, signed=True) == (bytearray(b'\xFF\x9F\xFB\xFF\xDF\xFF'), 0)

assert to_bytes(
[7, 5, 3],
bit_width=12,
existing=bytearray(b'\x01\x00'),
writing_bit_offset=4
) == (bytearray(b'\x01\x00\x07\x00\x50\x03'), 0)

assert to_bytes(
[-7, -5, -3],
bit_width=12,
existing=bytearray(b'\x01\x00'),
writing_bit_offset=4,
signed=True
) == (bytearray(b'\x01\x0F\xF9\xFF\xBF\xFD'), 0)

assert to_bytes([7, 5, 3], bit_width=12) == (bytearray(b'\x00p\x05\x000'), 4)

Expand Down Expand Up @@ -70,6 +80,10 @@ def test_to_ints():
assert res == [7, 5, 3]
assert offset == 36 + 4

res, offset, _ = to_ints(b'\xFF\xF9\xFF\xBF\xFD', 12, 3, 4, signed=True)
assert res == [-7, -5, -3]
assert offset == 36 + 4

arry = bytearray(b'\x6E\x40')
res, offset, arry = to_ints(arry, 2, 1)
assert res == [1]
Expand Down Expand Up @@ -102,6 +116,8 @@ def test_to_ints():

def test_ndtp_payload_broadband():
bit_width = 12
sample_rate = 3,
signed = False,
channels = [
NDTPPayloadBroadband.ChannelData(
channel_id=0,
Expand All @@ -117,7 +133,7 @@ def test_ndtp_payload_broadband():
)
]

payload = NDTPPayloadBroadband(bit_width, channels)
payload = NDTPPayloadBroadband(signed, bit_width, sample_rate, channels)
p = payload.pack()

u = NDTPPayloadBroadband.unpack(p)
Expand Down Expand Up @@ -212,6 +228,8 @@ def test_ndtp_message():
header = NDTPHeader(DataType.kBroadband, timestamp=1234567890, seq_number=42)
payload = NDTPPayloadBroadband(
bit_width=12,
sample_rate=100,
signed=False,
channels=[
NDTPPayloadBroadband.ChannelData(
channel_id=c,
Expand Down
1 change: 1 addition & 0 deletions synapse/tests/test_stream_out.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ def test_packing_broadband_data():
bdata = ElectricalBroadbandData(
bit_width = 12,
t0 = 1234567890,
sample_rate=3,
channels = [
ElectricalBroadbandData.ChannelData(
channel_id=0,
Expand Down
51 changes: 38 additions & 13 deletions synapse/utils/ndtp.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,12 @@ def bytes_to_binary(data: bytes, start_bit: int) -> str:
Can append to an existing byte array, and will correctly handle the case where
the end of the existing array is not byte aligned (and may contain a partial byte at the end).
"""


def to_bytes(
values: List[int],
bit_width: int,
existing: bytearray = None,
writing_bit_offset: int = 0,
signed: bool = False,
) -> Tuple[bytearray, int]:
if bit_width <= 0:
raise ValueError("bit width must be > 0")
Expand All @@ -60,8 +59,20 @@ def to_bytes(
bits_in_current_byte = writing_bit_offset

for value in values:
if value < 0 or value >= (1 << bit_width):
raise ValueError(f"value {value} doesn't fit in {bit_width} bits")
if signed:
min_value = -(1 << (bit_width - 1))
max_value = (1 << (bit_width - 1)) - 1
if value < min_value or value > max_value:
raise ValueError(f"signed value {value} doesn't fit in {bit_width} bits")
# Convert to two's complement representation
if value < 0:
value = (1 << bit_width) + value
else:
if value < 0:
raise ValueError("unsigned packing specified, but value is negative")

if value >= (1 << bit_width):
raise ValueError(f"unsigned value {value} doesn't fit in {bit_width} bits")

remaining_bits = bit_width
while remaining_bits > 0:
Expand Down Expand Up @@ -94,13 +105,16 @@ def to_bytes(
"""
Parses a list of integers from a byte array, using the specified bit width for each value.
A 'count' parameter can be used if the number of values and their bit widths do not pack neatly into whole bytes.
A 'count' parameter can be used if the expected number of values does not pack neatly into whole byte, with their given bit_width.
"""


import math

def to_ints(
data: bytes, bit_width: int, count: int = 0, start_bit: int = 0
) -> Tuple[List[int], int]:
data: bytes, bit_width: int, count: int = 0, start_bit: int = 0,
signed: bool = False
) -> Tuple[List[int], int, bytes]:
if bit_width <= 0:
raise ValueError("bit width must be > 0")

Expand All @@ -109,7 +123,7 @@ def to_ints(

data = data[truncate_bytes:]

if len(data) < bit_width * count / 8:
if count > 0 and len(data) < math.ceil(bit_width * count / 8):
raise ValueError(
f"insufficient data for {count} x {bit_width} bit values (expected {math.ceil(bit_width * count / 8)} bytes, given {len(data)} bytes)"
)
Expand All @@ -129,7 +143,11 @@ def to_ints(
total_bits_read += 1

if bits_in_current_value == bit_width:
values.append(current_value & mask)
if signed and (current_value & (1 << (bit_width - 1))):
current_value = current_value - (1 << bit_width)
else:
current_value = current_value & mask
values.append(current_value)
current_value = 0
bits_in_current_value = 0

Expand All @@ -139,7 +157,11 @@ def to_ints(

if bits_in_current_value > 0:
if bits_in_current_value == bit_width:
values.append(current_value & mask)
if signed and (current_value & (1 << (bit_width - 1))):
current_value = current_value - (1 << bit_width)
else:
current_value = current_value & mask
values.append(current_value)
elif count == 0:
raise ValueError(
f"{bits_in_current_value} bits left over, not enough to form a complete value of bit width {bit_width}"
Expand All @@ -159,6 +181,7 @@ class ChannelData:
channel_id: int # 24-bit
channel_data: List[int] # bit_width * num_samples bits

signed: bool
bit_width: int
sample_rate: int
channels: List[ChannelData]
Expand All @@ -179,7 +202,7 @@ def pack(self):
n_channels & 0xFF,
)

payload += struct.pack("<H", self.sample_rate & 0xFFFF)
payload += struct.pack("<H", self.sample_rate)

# packed data is not byte aligned, we do not add padding, so we need to pack the data manually
b_offset = 0
Expand All @@ -204,8 +227,9 @@ def unpack(data: bytes):
bit_width = struct.unpack("<B", data[0:1])[0] >> 1
signed = (struct.unpack("<B", data[0:1])[0] & 1) == 1
num_channels = (data[1] << 16) | (data[2] << 8) | data[3]
sample_rate = struct.unpack("<H", data[4:6])

payload = data[4:]
payload = data[6:]
end_bit = 0

channels = []
Expand All @@ -225,10 +249,11 @@ def unpack(data: bytes):
NDTPPayloadBroadband.ChannelData(
channel_id=channel_id,
channel_data=channel_data,
sample_rate=sample_rate
)
)

return NDTPPayloadBroadband(bit_width, channels)
return NDTPPayloadBroadband(signed, bit_width, sample_rate, channels)


@dataclass
Expand Down

0 comments on commit 0bd995c

Please sign in to comment.