Skip to content

Commit

Permalink
Added new annotation labels for bit_duration, high_period, and low_pe…
Browse files Browse the repository at this point in the history
…riod.

Split 4th bit(white component) into its own annotation with options to display Hex Decimal or Percent.
Added option to display first 3 bits(RGB components) in decimal format
Moved several repetitive tasks to their own methods.
Improved the handling of RESET conditions within the decode method, ensuring accurate annotation and timing.
  • Loading branch information
lsellens committed Jul 11, 2024
1 parent 06f09f6 commit 12743b4
Showing 1 changed file with 142 additions and 89 deletions.
231 changes: 142 additions & 89 deletions decoders/rgb_led_clockless/pd.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,12 @@ class SamplerateError(Exception):
class DecoderError(Exception):
pass

# Define annotation constants for easy reference.
(
ANN_BIT, ANN_RESET, ANN_RGB,
ANN_BIT, ANN_RESET, ANN_RGB, ANN_W,
ANN_COMP_R, ANN_COMP_G, ANN_COMP_B, ANN_COMP_W,
) = range(7)
ANN_BIT_DURATION, ANN_HIGH_PERIOD, ANN_LOW_PERIOD
) = range(11)

class Decoder(srd.Decoder):
api_version = 3
Expand All @@ -76,15 +78,21 @@ class Decoder(srd.Decoder):
('bit', 'Bit'),
('reset', 'RESET'),
('rgb', 'RGB'),
('w', 'W'),
('r', 'R'),
('g', 'G'),
('b', 'B'),
('w', 'W'),
('w_comp', 'W'),
('bit_duration', 'Bit Duration'),
('high_period', 'High Period'),
('low_period', 'Low Period'),
)
annotation_rows = (
('bit-timing', 'Bit Timing', (ANN_HIGH_PERIOD, ANN_LOW_PERIOD,)),
('bits', 'Bits', (ANN_BIT, ANN_RESET,)),
('bit-duration', 'Bit Duration', (ANN_BIT_DURATION,)),
('rgb-comps', 'RGB components', (ANN_COMP_R, ANN_COMP_G, ANN_COMP_B, ANN_COMP_W,)),
('rgb-vals', 'RGB values', (ANN_RGB,)),
('rgb-vals', 'RGB values', (ANN_RGB, ANN_W,)),
)
options = (
{'id': 'led_type', 'desc': 'LED Type',
Expand All @@ -96,63 +104,80 @@ class Decoder(srd.Decoder):
{'id': 'is_rgbw', 'desc': 'Is this RGBW?',
'default': 'False', 'values': ('True', 'False')},
{'id': 'textorder', 'desc': 'Components output order (text)',
'default': 'RGB[W]', 'values': ('wire', 'RGB[W]', 'RGB')},
'default': 'RGB[W]', 'values': ('wire', 'RGB[W]')},
{'id': 'rgb_text_format', 'desc': 'RGB Text Format',
'default': 'hex', 'values': ('hex', 'decimal')},
{'id': 'w_text_format', 'desc': 'W Text Format',
'default': 'hex', 'values': ('hex', 'decimal', 'percentage')},
)

def __init__(self):
self.reset()

def reset(self):
"""Reset internal state for a new decoding session."""
self.samplerate = None
self.bits = []
self.bit_start_sample = None
self.inversion_point_sample = None
self.bit_end_sample = None
self.is_processing_bit = False
self.is_looking_for_reset = False

def preprocess_options(self):
"""Process user options and prepare internal settings based on them."""
if not self.samplerate:
raise SamplerateError('Cannot decode without samplerate.')

# Preprocess wire order
# Get the wireorder, textorder, and determine if RGBW mode is used.
wireorder = self.options['wireorder'].lower()

# Determine if RGBW is selected
is_rgbw = self.options['is_rgbw'].lower() == 'true'
self.is_rgbw = self.options['is_rgbw'].lower() == 'true'
textorder = self.options['textorder'].lower()

# Prepare wire format based on RGBW option
self.wireformat = [c for c in wireorder if c in 'rgb']
if is_rgbw:
if self.is_rgbw:
self.wireformat.append('w')

# Calculate the number of bits needed based on the wire format
self.need_bits = len(self.wireformat) * 8

# Handle the output text format
textorder = self.options['textorder'].lower()
if textorder == 'wire':
self.textformat = 'wire'
elif textorder == 'rgb[w]':
self.textformat = '#{r:02x}{g:02x}{b:02x}{wt:s}'
self.textformat = 'wire' if textorder == 'wire' else '#{r:02x}{g:02x}{b:02x}'

# Determine settings based on the LED type.
led_type = self.options['led_type'].lower()
# Constants for bit timing and reset detection
if led_type == 'ws281x':
self.DUTY_CYCLE_THRESHOLD = 0.5 # 50% threshold for distinguishing bit values
self.RESET_CODE_TIMING = round(self.samplerate * 50e-6)
elif led_type == 'sk6812':
self.DUTY_CYCLE_THRESHOLD = 0.375 # 37.5% threshold for distinguishing bit values
self.RESET_CODE_TIMING = round(self.samplerate * 80e-6)
else:
# Default RGB format string
self.textformat = '#{r:02x}{g:02x}{b:02x}'
raise DecoderError(f'Unsupported LED Type: {led_type}')
self.BIT_PERIOD = 1.25e-6 # 1.25 microseconds for WS281x and SK681

def start(self):
"""Initialize decoder output and prepare options."""
self.out_ann = self.register(srd.OUTPUT_ANN)
self.preprocess_options() # Preprocess options when the decoding starts

def metadata(self, key, value):
"""Receive and store metadata such as samplerate."""
if key == srd.SRD_CONF_SAMPLERATE:
self.samplerate = value

def putg(self, ss, es, cls, text):
"""Helper method to output annotated data."""
self.put(ss, es, self.out_ann, [cls, text])

def handle_bits(self):
"""Process and interpret collected bits into RGB(W) values."""
if len(self.bits) < self.need_bits:
return

# Determine the start and end sample numbers for the packet
ss_packet, es_packet = self.bits[0][1], self.bits[-1][2]

# Initialize RGB and W component values
ss_rgb_packet, es_rgb_packet = self.bits[0][1], self.bits[-1][2]
r, g, b, w = 0, 0, 0, None
comps = []

Expand All @@ -163,12 +188,19 @@ def handle_bits(self):
comp_ss, comp_es = comp_bits[0][1], comp_bits[-1][2]
comp_value = bitpack_msb(comp_bits, 0)
comp_text = '{:02x}'.format(comp_value)

# Annotation class selection for the component.
comp_ann = {
'r': ANN_COMP_R, 'g': ANN_COMP_G,
'b': ANN_COMP_B, 'w': ANN_COMP_W,
}.get(c.lower(), None)

if comp_ann is not None:
self.putg(comp_ss, comp_es, comp_ann, [comp_text])

comps.append((comp_ss, comp_es, comp_ann, comp_value, comp_text))

# Assign the value to the appropriate RGBW component.
if c.lower() == 'r':
r = comp_value
elif c.lower() == 'g':
Expand All @@ -178,106 +210,127 @@ def handle_bits(self):
elif c.lower() == 'w':
w = comp_value

# Determine the wt (white component text) for formatting
wt = '' if w is None else '{:02x}'.format(w)

# Format the RGB text for annotation
if self.textformat == 'wire':
rgb_text = '#' + ''.join([c[-1] for c in comps])
rgb_text = '#' + ''.join([comp[4] for comp in comps])
if rgb_text:
self.putg(ss_rgb_packet, es_rgb_packet, ANN_RGB, [rgb_text])
else:
rgb_text = self.textformat.format(r=r, g=g, b=b, w=w, wt=wt)

# Annotate each component and the RGB value
for ss_comp, es_comp, cls_comp, _, text_comp in comps:
self.putg(ss_comp, es_comp, cls_comp, [text_comp])

if rgb_text:
self.putg(ss_packet, es_packet, ANN_RGB, [rgb_text])
# Output the RGB part.
ss_rgb_end = self.bits[23][2] if len(self.bits) >= 24 else es_rgb_packet
ss_w_start = self.bits[24][1] if len(self.bits) > 24 else es_rgb_packet
es_w_end = self.bits[-1][2]

rgb_text_format = self.options['rgb_text_format']
if rgb_text_format == 'hex':
rgb_text = self.textformat.format(r=r, g=g, b=b)
elif rgb_text_format == 'decimal':
rgb_text = 'RGB({},{},{})'.format(r, g, b)

if rgb_text:
self.putg(ss_rgb_packet, ss_rgb_end, ANN_RGB, [rgb_text])

# Output the W component if in RGBW mode.
if self.is_rgbw and w is not None:
w_text_format = self.options['w_text_format']
if w_text_format == 'hex':
w_text = '{:02x}'.format(w)
elif w_text_format == 'decimal':
w_text = str(w)
elif w_text_format == 'percentage':
w_text = f'{w * 100 // 255}%'

self.putg(ss_w_start, es_w_end, ANN_W, [w_text])

# Clear the bits list after processing the packet
self.bits.clear()

def handle_bit(self, ss, es, value, ann_late=False):
def handle_bit(self, bit_start_sample, bit_end_sample, bit_value, ann_late=False):
"""Process a single bit and manage its annotations."""
if not ann_late:
text = ['{:d}'.format(value)]
self.putg(ss, es, ANN_BIT, text)
self.putg(bit_start_sample, bit_end_sample, ANN_BIT, ['{:d}'.format(bit_value)])

self.bits.append((value, ss, es))
self.bits.append((bit_value, bit_start_sample, bit_end_sample))
self.handle_bits()

if ann_late:
text = ['{:d}'.format(value)]
self.putg(ss, es, ANN_BIT, text)
self.putg(bit_start_sample, bit_end_sample, ANN_BIT, ['{:d}'.format(bit_value)])

def annotate_bit_timing(self, bit_start_sample, bit_end_sample, inversion_point_sample):
"""Calculate and annotate timing details for a bit."""
bit_duration_us = self.convert_samples_to_time(bit_end_sample - bit_start_sample, 'us')
high_period_us = self.convert_samples_to_time(inversion_point_sample - bit_start_sample, 'us')
low_period_us = self.convert_samples_to_time(bit_end_sample - inversion_point_sample, 'us')

self.putg(bit_start_sample, bit_end_sample, ANN_BIT_DURATION, [f'{bit_duration_us:.2f} µs'])
self.putg(bit_start_sample, inversion_point_sample, ANN_HIGH_PERIOD, [f'{high_period_us:.2f} µs'])
self.putg(inversion_point_sample, bit_end_sample, ANN_LOW_PERIOD, [f'{low_period_us:.2f} µs'])

def process_bit(self, bit_start_sample, bit_end_sample, inversion_point_sample):
"""Process a bit and update annotations."""
period = bit_end_sample - bit_start_sample
high_time = inversion_point_sample - bit_start_sample
bit_value = 1 if (high_time / period) > self.DUTY_CYCLE_THRESHOLD else 0

self.handle_bit(bit_start_sample, bit_end_sample, bit_value)
self.annotate_bit_timing(bit_start_sample, bit_end_sample, inversion_point_sample)

def decode(self):
led_type = self.options['led_type'].lower()
# Constants for bit timing and reset detection
if led_type == 'ws281x':
DUTY_CYCLE_THRESHOLD = 0.5 # 50% threshold for distinguishing bit values
RESET_CODE_TIMING = round(self.samplerate * 50e-6)
elif led_type == 'sk6812':
DUTY_CYCLE_THRESHOLD = 0.375 # 37.5% threshold for distinguishing bit values
RESET_CODE_TIMING = round(self.samplerate * 80e-6)
else:
raise DecoderError('Unsupported LED Type')
BIT_PERIOD = 1.25e-6 # 1.25 microseconds for WS281x and SK6812
HALF_BIT_PERIOD = int(self.samplerate * (BIT_PERIOD / 2))
"""Main decoding loop to interpret the input signal."""

# Conditions for bit and reset detection
cond_bit_starts = {0: 'r'}
cond_inbit_edge = {0: 'f'}
cond_reset_pulse = {'skip': RESET_CODE_TIMING + 1}
cond_reset_pulse = {'skip': self.RESET_CODE_TIMING + 1}
conds = [cond_bit_starts, cond_inbit_edge, cond_reset_pulse]

ss_bit, inv_bit, es_bit = None, None, None
self.bit_start_sample, self.inversion_point_sample, self.bit_end_sample = None, None, None
pin, = self.wait({0: 'l'})
inv_bit = self.samplenum
check_reset = False
self.inversion_point_sample = self.samplenum
self.is_processing_bit = False
self.is_looking_for_reset = False

while True:
pin, = self.wait(conds)

# Check for RESET condition
if check_reset and self.matched[2]:
es_bit = inv_bit
ss_rst, es_rst = inv_bit, self.samplenum

if ss_bit and inv_bit:
# Decode the last bit value
duty = inv_bit - ss_bit
period = self.samplerate * BIT_PERIOD
thres = period * DUTY_CYCLE_THRESHOLD
bit_value = 1 if duty >= thres else 0
if self.is_looking_for_reset and self.matched[2]:
self.bit_end_sample = self.inversion_point_sample
reset_start_sample, reset_end_sample = self.inversion_point_sample, self.samplenum

if self.bit_start_sample and self.inversion_point_sample:
# Extend the last bit's annotation to the expected end of the bit period
expected_es_bit = ss_bit + int(self.samplerate * BIT_PERIOD)
self.handle_bit(ss_bit, expected_es_bit, bit_value, True)
expected_bit_end_sample = self.bit_start_sample + int(self.samplerate * self.BIT_PERIOD)
self.process_bit(self.bit_start_sample, expected_bit_end_sample, self.inversion_point_sample)

# Update the start and end of the reset to be after the bit period
ss_rst = expected_es_bit
es_rst = expected_es_bit + RESET_CODE_TIMING
reset_start_sample = expected_bit_end_sample
reset_end_sample = expected_bit_end_sample + self.RESET_CODE_TIMING

# Annotate RESET after the extended bit period
if ss_rst and es_rst:
text = ['RESET', 'RST', 'R']
self.putg(ss_rst, es_rst, ANN_RESET, text)
if reset_start_sample and reset_end_sample:
self.putg(reset_start_sample, reset_end_sample, ANN_RESET, ['RESET', 'RST', 'R'])

check_reset = False
self.is_looking_for_reset = False
self.bits.clear()
ss_bit, inv_bit, es_bit = None, None, None
self.bit_start_sample, self.inversion_point_sample, self.bit_end_sample = None, None, None

# Bit value detection logic
if self.matched[0]: # Rising edge starts a bit time
check_reset = False
if ss_bit and inv_bit:
es_bit = self.samplenum
period = es_bit - ss_bit
duty = inv_bit - ss_bit
bit_value = 1 if (duty / period) > DUTY_CYCLE_THRESHOLD else 0
self.handle_bit(ss_bit, es_bit, bit_value)
ss_bit, inv_bit, es_bit = self.samplenum, None, None

if self.matched[1]: # Falling edge ends its high period
check_reset = True
inv_bit = self.samplenum
if self.matched[0]: # Rising edge indicates the start of a bit
self.is_processing_bit = True
self.is_looking_for_reset = False

if self.bit_start_sample and self.inversion_point_sample:
self.bit_end_sample = self.samplenum
self.process_bit(self.bit_start_sample, self.bit_end_sample, self.inversion_point_sample)

self.bit_start_sample, self.inversion_point_sample, self.bit_end_sample = self.samplenum, None, None

if self.matched[1]: # Falling edge indicates the end of high period in bit
self.is_looking_for_reset = True
self.is_processing_bit = False
self.inversion_point_sample = self.samplenum

def convert_samples_to_time(self, samples, unit='us'):
"""Convert sample counts to time units based on the current samplerate."""
factor = 1e6 if unit == 'us' else 1e3
return (samples / self.samplerate) * factor

0 comments on commit 12743b4

Please sign in to comment.