diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index ac7576b5c..eab9236e9 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -64,6 +64,7 @@ jobs: python -m unittest test_pybpod_config.py # python -m unittest test_scripts.py python -m unittest test_start_pybpod.py + python -m unittest test_spacers.py python -m unittest test_task.py # python -m unittest discover diff --git a/iblrig/__init__.py b/iblrig/__init__.py index 4d593b4c2..7ddf1ad5a 100644 --- a/iblrig/__init__.py +++ b/iblrig/__init__.py @@ -1,4 +1,4 @@ -__version__ = "7.0.5" +__version__ = "7.1.0" import logging import colorlog diff --git a/iblrig/spacer.py b/iblrig/spacer.py new file mode 100644 index 000000000..e39283c42 --- /dev/null +++ b/iblrig/spacer.py @@ -0,0 +1,117 @@ +""" +The purpose of this module is provide tools to generate and identify spacers. + +Spacers are sequences of up and down pulses with a specific, identifiable pattern. +They are generated with a chirp coding to reduce cross-correlaation sidelobes. +They are used to mark the beginning of a behaviour sequence within a session. + +Usage: + spacer = Spacer() + spacer.add_spacer_states(sma, t, next_state='first_state') + for i in range(ntrials): + sma.add_state( + state_name="first_state", + state_timer=tup, + state_change_conditions={"Tup": f"spacer_low_{i:02d}"}, + output_actions=[("BNC1", 255)], # To FPGA + ) +""" + +import numpy as np + + +class Spacer(object): + """ + Computes spacer up times using a chirp up and down pattern + Returns a list of times for the spacer states + Each time corresponds to an up time of the BNC1 signal + + dt_start: first spacer up time + dt_end: last spacer up time + n_pulses: number of spacer up times, one-sided (i.e. 8 means 16 - 1 spacers times) + tup: duration of the spacer up time + """ + def __init__(self, dt_start=.02, dt_end=.4, n_pulses=8, tup=.05): + self.dt_start = dt_start + self.dt_end = dt_end + self.n_pulses = n_pulses + self.tup = tup + assert np.all(np.diff(self.times) > self.tup), 'Spacers are overlapping' + + def __repr__(self): + return f"Spacer(dt_start={self.dt_start}, dt_end={self.dt_end}, n_pulses={self.n_pulses}, tup={self.tup})" + + @property + def times(self, latency=0): + """ + Computes spacer up times using a chirp up and down pattern + :return: numpy arrays of times + """ + # upsweep + t = np.linspace(self.dt_start, self.dt_end, self.n_pulses) + self.tup + # downsweep + t = np.r_[t, np.flipud(t[1:])] + t = np.cumsum(t) + return t + + def generate_template(self, fs=1000): + """ + Generates a spacer voltage template to cross-correlate with a voltage trace + from a DAQ to detect a voltage trace + :return: + """ + t = self.times + ns = int((t[-1] + self.tup * 10) * fs) + sig = np.zeros(ns, ) + sig[(t * fs).astype(np.int32)] = 1 + sig[((t + self.tup) * fs).astype(np.int32)] = -1 + sig = np.cumsum(sig) + return sig + + def add_spacer_states(self, sma=None, next_state="exit"): + """ + Add spacer states to a state machine + :param sma: pybpodapi.state_machine.StateMachine object + :param next_state: name of the state following the spacer states + :return: + """ + assert next_state is not None + t = self.times + dt = np.diff(t, append=t[-1] + self.tup * 2) + for i, time in enumerate(t): + if sma is None: + print(i, time, dt[i]) + continue + next_loop = f"spacer_high_{i + 1:02d}" if i < len(t) - 1 else next_state + sma.add_state( + state_name=f"spacer_high_{i:02d}", + state_timer=self.tup, + state_change_conditions={"Tup": f"spacer_low_{i:02d}"}, + output_actions=[("BNC1", 255)], # To FPGA + ) + sma.add_state( + state_name=f"spacer_low_{i:02d}", + state_timer=dt[i] - self.tup, + state_change_conditions={"Tup": next_loop}, + output_actions=[], + ) + + def find_spacers(self, signal, threshold=0.9, fs=1000): + """ + Find spacers in a voltage time serie. Assumes that the signal is a digital signal between 0 and 1 + :param signal: + :param threshold: + :param fs: + :return: + """ + template = self.generate_template(fs=fs) + xcor = np.correlate(signal, template, mode="full") / np.sum(template) + idetect = np.where(xcor > threshold)[0] + iidetect = np.cumsum(np.diff(idetect, prepend=0) > 1) + nspacers = iidetect[-1] + tspacer = np.zeros(nspacers) + for i in range(nspacers): + ispacer = idetect[iidetect == i + 1] + imax = np.argmax(xcor[ispacer]) + tspacer[i] = (ispacer[imax] - template.size + 1) / fs + return tspacer diff --git a/release_notes.md b/release_notes.md index 4cd726148..48b06fd68 100644 --- a/release_notes.md +++ b/release_notes.md @@ -1,9 +1,12 @@ # **Release notes** +## **Release Notes 7.1.0** +- spacer utility feature added for chaining multiple tasks + ## **Release Notes 7.0.5** - - minor fix to windows pathing for experiment description gui to grab alyx username - - removal of unnecessary ONE version check in purge_rig_data script - - added bpod BNC1 output to several training tasks +- minor fix to windows pathing for experiment description gui to grab alyx username +- removal of unnecessary ONE version check in purge_rig_data script +- added bpod BNC1 output to several training tasks ## **Release Notes 7.0.4** - modified call to experiment description gui to pass alyx username and subject information diff --git a/tasks/_iblrig_tasks_passiveChoiceWorldIndependent/_iblrig_tasks_passiveChoiceWorldIndependent.py b/tasks/_iblrig_tasks_passiveChoiceWorldIndependent/_iblrig_tasks_passiveChoiceWorldIndependent.py index 41a3b50f1..dafa2dbb1 100644 --- a/tasks/_iblrig_tasks_passiveChoiceWorldIndependent/_iblrig_tasks_passiveChoiceWorldIndependent.py +++ b/tasks/_iblrig_tasks_passiveChoiceWorldIndependent/_iblrig_tasks_passiveChoiceWorldIndependent.py @@ -10,6 +10,7 @@ import iblrig.frame2TTL as frame2TTL import iblrig.misc as misc import iblrig.params as params +import iblrig.spacer import task_settings import user_settings from iblrig.bpod_helper import BpodMessageCreator, bpod_lights @@ -97,7 +98,16 @@ def do_card_sound(card, sound_msg): ) popup("WARNING!", msg) # Locks +# Start task by sending a spacer signal on BNC1 +log.info("Starting task by sending a spacer signal on BNC1") +sma = StateMachine(bpod) +spacer = iblrig.spacer.Spacer() +spacer.add_spacer_states(sma, next_state="exit") +bpod.send_state_machine(sma) +bpod.run_state_machine(sma) # Locks until state machine 'exit' is reached + # Run the passive part i.e. spontaneous activity and RFMapping stim +log.info("Run the passive part ie. spontaneous activity and RFMapping stim") bonsai.start_passive_visual_stim( sph.SESSION_RAW_DATA_FOLDER, display_idx=sph.PARAMS["DISPLAY_IDX"], @@ -107,6 +117,7 @@ def do_card_sound(card, sound_msg): ) # Locks # start Bonsai stim workflow +log.info("Run the visual stimulus mapping") bonsai.start_visual_stim(sph) time.sleep(5) log.info("Starting replay of task stims") diff --git a/test_iblrig/test_spacers.py b/test_iblrig/test_spacers.py new file mode 100644 index 000000000..87d217c78 --- /dev/null +++ b/test_iblrig/test_spacers.py @@ -0,0 +1,34 @@ +import numpy as np +import unittest + +from iblrig.spacer import Spacer + + +class TestSpacer(unittest.TestCase): + + def test_spacer(self): + spacer = Spacer(dt_start=.02, dt_end=.4, n_pulses=8, tup=.05) + np.testing.assert_equal(spacer.times.size, 15) + sig = spacer.generate_template(fs=1000) + ac = np.correlate(sig, sig, 'full') / np.sum(sig**2) + # import matplotlib.pyplot as plt + # plt.plot(ac) + # plt.show() + ac[sig.size-100: sig.size + 100] = 0 # remove the main peak + # the autocorrelation side lobes should be less than 30% + assert np.max(ac) < .3 + + def test_find_spacers(self): + """ + Generates a fake signal with 2 spacers and finds them + :return: + """ + fs = 1000 + spacer = Spacer(dt_start=.02, dt_end=.4, n_pulses=8, tup=.05) + start_times = [4.38, 96.58] + template = spacer.generate_template(fs) + signal = np.zeros(int(start_times[-1] * fs + template.size * 2)) + for start_time in start_times: + signal[int(start_time * fs): int(start_time * fs) + template.size] = template + spacer_times = spacer.find_spacers(signal, fs=fs) + np.testing.assert_allclose(spacer_times, start_times)