Skip to content

Commit

Permalink
Merge pull request #426 from int-brain-lab/feature/7.1.0
Browse files Browse the repository at this point in the history
Feature/7.1.0
  • Loading branch information
micheleangelofabbri authored Nov 8, 2022
2 parents 146f2de + 1de28aa commit 8d7b9c2
Show file tree
Hide file tree
Showing 6 changed files with 170 additions and 4 deletions.
1 change: 1 addition & 0 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ jobs:
python -m unittest test_pybpod_config.py
# python -m unittest test_scripts.py
python -m unittest test_start_pybpod.py
python -m unittest test_spacers.py
python -m unittest test_task.py
# python -m unittest discover
Expand Down
2 changes: 1 addition & 1 deletion iblrig/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__version__ = "7.0.5"
__version__ = "7.1.0"
import logging

import colorlog
Expand Down
117 changes: 117 additions & 0 deletions iblrig/spacer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
"""
The purpose of this module is provide tools to generate and identify spacers.
Spacers are sequences of up and down pulses with a specific, identifiable pattern.
They are generated with a chirp coding to reduce cross-correlaation sidelobes.
They are used to mark the beginning of a behaviour sequence within a session.
Usage:
spacer = Spacer()
spacer.add_spacer_states(sma, t, next_state='first_state')
for i in range(ntrials):
sma.add_state(
state_name="first_state",
state_timer=tup,
state_change_conditions={"Tup": f"spacer_low_{i:02d}"},
output_actions=[("BNC1", 255)], # To FPGA
)
"""

import numpy as np


class Spacer(object):
"""
Computes spacer up times using a chirp up and down pattern
Returns a list of times for the spacer states
Each time corresponds to an up time of the BNC1 signal
dt_start: first spacer up time
dt_end: last spacer up time
n_pulses: number of spacer up times, one-sided (i.e. 8 means 16 - 1 spacers times)
tup: duration of the spacer up time
"""
def __init__(self, dt_start=.02, dt_end=.4, n_pulses=8, tup=.05):
self.dt_start = dt_start
self.dt_end = dt_end
self.n_pulses = n_pulses
self.tup = tup
assert np.all(np.diff(self.times) > self.tup), 'Spacers are overlapping'

def __repr__(self):
return f"Spacer(dt_start={self.dt_start}, dt_end={self.dt_end}, n_pulses={self.n_pulses}, tup={self.tup})"

@property
def times(self, latency=0):
"""
Computes spacer up times using a chirp up and down pattern
:return: numpy arrays of times
"""
# upsweep
t = np.linspace(self.dt_start, self.dt_end, self.n_pulses) + self.tup
# downsweep
t = np.r_[t, np.flipud(t[1:])]
t = np.cumsum(t)
return t

def generate_template(self, fs=1000):
"""
Generates a spacer voltage template to cross-correlate with a voltage trace
from a DAQ to detect a voltage trace
:return:
"""
t = self.times
ns = int((t[-1] + self.tup * 10) * fs)
sig = np.zeros(ns, )
sig[(t * fs).astype(np.int32)] = 1
sig[((t + self.tup) * fs).astype(np.int32)] = -1
sig = np.cumsum(sig)
return sig

def add_spacer_states(self, sma=None, next_state="exit"):
"""
Add spacer states to a state machine
:param sma: pybpodapi.state_machine.StateMachine object
:param next_state: name of the state following the spacer states
:return:
"""
assert next_state is not None
t = self.times
dt = np.diff(t, append=t[-1] + self.tup * 2)
for i, time in enumerate(t):
if sma is None:
print(i, time, dt[i])
continue
next_loop = f"spacer_high_{i + 1:02d}" if i < len(t) - 1 else next_state
sma.add_state(
state_name=f"spacer_high_{i:02d}",
state_timer=self.tup,
state_change_conditions={"Tup": f"spacer_low_{i:02d}"},
output_actions=[("BNC1", 255)], # To FPGA
)
sma.add_state(
state_name=f"spacer_low_{i:02d}",
state_timer=dt[i] - self.tup,
state_change_conditions={"Tup": next_loop},
output_actions=[],
)

def find_spacers(self, signal, threshold=0.9, fs=1000):
"""
Find spacers in a voltage time serie. Assumes that the signal is a digital signal between 0 and 1
:param signal:
:param threshold:
:param fs:
:return:
"""
template = self.generate_template(fs=fs)
xcor = np.correlate(signal, template, mode="full") / np.sum(template)
idetect = np.where(xcor > threshold)[0]
iidetect = np.cumsum(np.diff(idetect, prepend=0) > 1)
nspacers = iidetect[-1]
tspacer = np.zeros(nspacers)
for i in range(nspacers):
ispacer = idetect[iidetect == i + 1]
imax = np.argmax(xcor[ispacer])
tspacer[i] = (ispacer[imax] - template.size + 1) / fs
return tspacer
9 changes: 6 additions & 3 deletions release_notes.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
# **Release notes**

## **Release Notes 7.1.0**
- spacer utility feature added for chaining multiple tasks

## **Release Notes 7.0.5**
- minor fix to windows pathing for experiment description gui to grab alyx username
- removal of unnecessary ONE version check in purge_rig_data script
- added bpod BNC1 output to several training tasks
- minor fix to windows pathing for experiment description gui to grab alyx username
- removal of unnecessary ONE version check in purge_rig_data script
- added bpod BNC1 output to several training tasks

## **Release Notes 7.0.4**
- modified call to experiment description gui to pass alyx username and subject information
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import iblrig.frame2TTL as frame2TTL
import iblrig.misc as misc
import iblrig.params as params
import iblrig.spacer
import task_settings
import user_settings
from iblrig.bpod_helper import BpodMessageCreator, bpod_lights
Expand Down Expand Up @@ -97,7 +98,16 @@ def do_card_sound(card, sound_msg):
)
popup("WARNING!", msg) # Locks

# Start task by sending a spacer signal on BNC1
log.info("Starting task by sending a spacer signal on BNC1")
sma = StateMachine(bpod)
spacer = iblrig.spacer.Spacer()
spacer.add_spacer_states(sma, next_state="exit")
bpod.send_state_machine(sma)
bpod.run_state_machine(sma) # Locks until state machine 'exit' is reached

# Run the passive part i.e. spontaneous activity and RFMapping stim
log.info("Run the passive part ie. spontaneous activity and RFMapping stim")
bonsai.start_passive_visual_stim(
sph.SESSION_RAW_DATA_FOLDER,
display_idx=sph.PARAMS["DISPLAY_IDX"],
Expand All @@ -107,6 +117,7 @@ def do_card_sound(card, sound_msg):
) # Locks

# start Bonsai stim workflow
log.info("Run the visual stimulus mapping")
bonsai.start_visual_stim(sph)
time.sleep(5)
log.info("Starting replay of task stims")
Expand Down
34 changes: 34 additions & 0 deletions test_iblrig/test_spacers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import numpy as np
import unittest

from iblrig.spacer import Spacer


class TestSpacer(unittest.TestCase):

def test_spacer(self):
spacer = Spacer(dt_start=.02, dt_end=.4, n_pulses=8, tup=.05)
np.testing.assert_equal(spacer.times.size, 15)
sig = spacer.generate_template(fs=1000)
ac = np.correlate(sig, sig, 'full') / np.sum(sig**2)
# import matplotlib.pyplot as plt
# plt.plot(ac)
# plt.show()
ac[sig.size-100: sig.size + 100] = 0 # remove the main peak
# the autocorrelation side lobes should be less than 30%
assert np.max(ac) < .3

def test_find_spacers(self):
"""
Generates a fake signal with 2 spacers and finds them
:return:
"""
fs = 1000
spacer = Spacer(dt_start=.02, dt_end=.4, n_pulses=8, tup=.05)
start_times = [4.38, 96.58]
template = spacer.generate_template(fs)
signal = np.zeros(int(start_times[-1] * fs + template.size * 2))
for start_time in start_times:
signal[int(start_time * fs): int(start_time * fs) + template.size] = template
spacer_times = spacer.find_spacers(signal, fs=fs)
np.testing.assert_allclose(spacer_times, start_times)

0 comments on commit 8d7b9c2

Please sign in to comment.