-
Notifications
You must be signed in to change notification settings - Fork 22
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #425 from int-brain-lab/spacer
Spacer
- Loading branch information
Showing
6 changed files
with
169 additions
and
26 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,4 +1,4 @@ | ||
__version__ = "7.0.4" | ||
__version__ = "7.0.5" | ||
import logging | ||
|
||
import colorlog | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,117 @@ | ||
""" | ||
The purpose of this module is provide tools to generate and identify spacers. | ||
Spacers are sequences of up and down pulses with a specific, identifiable pattern. | ||
They are generated with a chirp coding to reduce cross-correlaation sidelobes. | ||
They are used to mark the beginning of a behaviour sequence within a session. | ||
Usage: | ||
spacer = Spacer() | ||
spacer.add_spacer_states(sma, t, next_state='first_state') | ||
for i in range(ntrials): | ||
sma.add_state( | ||
state_name="first_state", | ||
state_timer=tup, | ||
state_change_conditions={"Tup": f"spacer_low_{i:02d}"}, | ||
output_actions=[("BNC1", 255)], # To FPGA | ||
) | ||
""" | ||
|
||
import numpy as np | ||
|
||
|
||
class Spacer(object): | ||
""" | ||
Computes spacer up times using a chirp up and down pattern | ||
Returns a list of times for the spacer states | ||
Each time corresponds to an up time of the BNC1 signal | ||
dt_start: first spacer up time | ||
dt_end: last spacer up time | ||
n_pulses: number of spacer up times, one-sided (i.e. 8 means 16 - 1 spacers times) | ||
tup: duration of the spacer up time | ||
""" | ||
def __init__(self, dt_start=.02, dt_end=.4, n_pulses=8, tup=.05): | ||
self.dt_start = dt_start | ||
self.dt_end = dt_end | ||
self.n_pulses = n_pulses | ||
self.tup = tup | ||
assert np.all(np.diff(self.times) > self.tup), 'Spacers are overlapping' | ||
|
||
def __repr__(self): | ||
return f"Spacer(dt_start={self.dt_start}, dt_end={self.dt_end}, n_pulses={self.n_pulses}, tup={self.tup})" | ||
|
||
@property | ||
def times(self, latency=0): | ||
""" | ||
Computes spacer up times using a chirp up and down pattern | ||
:return: numpy arrays of times | ||
""" | ||
# upsweep | ||
t = np.linspace(self.dt_start, self.dt_end, self.n_pulses) + self.tup | ||
# downsweep | ||
t = np.r_[t, np.flipud(t[1:])] | ||
t = np.cumsum(t) | ||
return t | ||
|
||
def generate_template(self, fs=1000): | ||
""" | ||
Generates a spacer voltage template to cross-correlate with a voltage trace | ||
from a DAQ to detect a voltage trace | ||
:return: | ||
""" | ||
t = self.times | ||
ns = int((t[-1] + self.tup * 10) * fs) | ||
sig = np.zeros(ns, ) | ||
sig[(t * fs).astype(np.int32)] = 1 | ||
sig[((t + self.tup) * fs).astype(np.int32)] = -1 | ||
sig = np.cumsum(sig) | ||
return sig | ||
|
||
def add_spacer_states(self, sma=None, next_state="exit"): | ||
""" | ||
Add spacer states to a state machine | ||
:param sma: pybpodapi.state_machine.StateMachine object | ||
:param next_state: name of the state following the spacer states | ||
:return: | ||
""" | ||
assert next_state is not None | ||
t = self.times | ||
dt = np.diff(t, append=t[-1] + self.tup * 2) | ||
for i, time in enumerate(t): | ||
if sma is None: | ||
print(i, time, dt[i]) | ||
continue | ||
next_loop = f"spacer_high_{i + 1:02d}" if i < len(t) - 1 else next_state | ||
sma.add_state( | ||
state_name=f"spacer_high_{i:02d}", | ||
state_timer=self.tup, | ||
state_change_conditions={"Tup": f"spacer_low_{i:02d}"}, | ||
output_actions=[("BNC1", 255)], # To FPGA | ||
) | ||
sma.add_state( | ||
state_name=f"spacer_low_{i:02d}", | ||
state_timer=dt[i] - self.tup, | ||
state_change_conditions={"Tup": next_loop}, | ||
output_actions=[], | ||
) | ||
|
||
def find_spacers(self, signal, threshold=0.9, fs=1000): | ||
""" | ||
Find spacers in a voltage time serie. Assumes that the signal is a digital signal between 0 and 1 | ||
:param signal: | ||
:param threshold: | ||
:param fs: | ||
:return: | ||
""" | ||
template = self.generate_template(fs=fs) | ||
xcor = np.correlate(signal, template, mode="full") / np.sum(template) | ||
idetect = np.where(xcor > threshold)[0] | ||
iidetect = np.cumsum(np.diff(idetect, prepend=0) > 1) | ||
nspacers = iidetect[-1] | ||
tspacer = np.zeros(nspacers) | ||
for i in range(nspacers): | ||
ispacer = idetect[iidetect == i + 1] | ||
imax = np.argmax(xcor[ispacer]) | ||
tspacer[i] = (ispacer[imax] - template.size + 1) / fs | ||
return tspacer |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
import numpy as np | ||
import unittest | ||
|
||
from iblrig.spacer import Spacer | ||
|
||
|
||
class TestSpacer(unittest.TestCase): | ||
|
||
def test_spacer(self): | ||
spacer = Spacer(dt_start=.02, dt_end=.4, n_pulses=8, tup=.05) | ||
np.testing.assert_equal(spacer.times.size, 15) | ||
sig = spacer.generate_template(fs=1000) | ||
ac = np.correlate(sig, sig, 'full') / np.sum(sig**2) | ||
# import matplotlib.pyplot as plt | ||
# plt.plot(ac) | ||
# plt.show() | ||
ac[sig.size-100: sig.size + 100] = 0 # remove the main peak | ||
# the autocorrelation side lobes should be less than 30% | ||
assert np.max(ac) < .3 | ||
|
||
def test_find_spacers(self): | ||
""" | ||
Generates a fake signal with 2 spacers and finds them | ||
:return: | ||
""" | ||
fs = 1000 | ||
spacer = Spacer(dt_start=.02, dt_end=.4, n_pulses=8, tup=.05) | ||
start_times = [4.38, 96.58] | ||
template = spacer.generate_template(fs) | ||
signal = np.zeros(int(start_times[-1] * fs + template.size * 2)) | ||
for start_time in start_times: | ||
signal[int(start_time * fs): int(start_time * fs) + template.size] = template | ||
spacer_times = spacer.find_spacers(signal, fs=fs) | ||
np.testing.assert_allclose(spacer_times, start_times) |