diff --git a/.github/workflows/publish_to_pypi.yaml b/.github/workflows/publish_to_pypi.yaml
new file mode 100644
index 0000000..e6f6227
--- /dev/null
+++ b/.github/workflows/publish_to_pypi.yaml
@@ -0,0 +1,36 @@
+# Reference for this action:
+# https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-python#publishing-to-package-registries
+name: Publish to PyPI
+
+on:
+  release:
+    types: [ published ]
+
+permissions:
+  contents: read
+
+jobs:
+  deploy:
+    name: Build and publish Python distributions to PyPI
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v3
+
+      - uses: actions/setup-python@v4
+        with:
+          python-version: '3.x'
+
+      - name: Install dependencies
+        run: |
+          python -m pip install --upgrade pip
+          pip install setuptools wheel
+
+      - name: Build package
+        run: python setup.py sdist bdist_wheel
+
+      - name: Publish package
+        # GitHub recommends pinning 3rd party actions to a commit SHA.
+        uses: pypa/gh-action-pypi-publish@37f50c210e3d2f9450da2cd423303d6a14a6e29f
+        with:
+          user: __token__
+          password: ${{ secrets.PYPI_API_TOKEN }}
diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml
new file mode 100644
index 0000000..aa76fca
--- /dev/null
+++ b/.github/workflows/tests.yaml
@@ -0,0 +1,44 @@
+name: ibl-neuropixel CI
+
+on:
+  push:
+    branches: [main]
+  pull_request:
+    branches: [main]
+
+jobs:
+  tests:
+    name: build (${{ matrix.python-version }}, ${{ matrix.os }})
+    runs-on: ${{ matrix.os }}
+    strategy:
+      matrix:
+        os: ["ubuntu-latest", "windows-latest"]
+        python-version: ["3.10"]
+    steps:
+      - name: Checkout ibl-neuropixel repo
+        uses: actions/checkout@v3
+        with:
+          path: ibl-neuropixel
+
+      - name: Setup Python
+        uses: actions/setup-python@v4
+        with:
+          python-version: ${{ matrix.python-version }}
+
+      - name: flake8
+        run: |
+          pip install flake8 --quiet
+          cd ibl-neuropixel
+          python -m flake8
+
+      - name: ibl-neuropixel requirements
+        shell: bash -l {0}
+        run: |
+          pip install -e ibl-neuropixel
+          pip install -r ibl-neuropixel/requirements.txt
+
+      - name: CPU unit tests
+        shell: bash -l {0}
+        run: |
+          cd ibl-neuropixel/src/tests/unit/cpu
+          python -m unittest discover
diff --git a/release_notes.md b/release_notes.md
index 760a839..3c408e4 100644
--- a/release_notes.md
+++ b/release_notes.md
@@ -1,4 +1,9 @@
-# 0.6.0
+# 0.7.0
+
+## 0.7.0 2023-06-29
+- Add function `spikes_venn3` in new module `neurodsp.spiketrains`
+- Update `iblutil` dependency to 1.7.0 to use `iblutil.numerical.bincount2D`
+
 ## 0.6.2 2023-06-19
 - add option to specify meta-data file for spikeglx.Reader
diff --git a/requirements.txt b/requirements.txt
index 4b6a240..82984a3 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,4 +1,4 @@
-iblutil >= 1.6.0
+iblutil >= 1.7.0
 joblib
 mtscomp
 numpy
diff --git a/setup.py b/setup.py
index a2ab78c..0002ad3 100644
--- a/setup.py
+++ b/setup.py
@@ -8,7 +8,7 @@
 setuptools.setup(
     name="ibl-neuropixel",
-    version="0.6.2",
+    version="0.7.0",
     author="The International Brain Laboratory",
     description="Collection of tools for Neuropixel 1.0 and 2.0 probes data",
     long_description=long_description,
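The `iblutil >= 1.7.0` bump exists so that the new `neurodsp.spiketrains` module (next file) can import `iblutil.numerical.bincount2D`. As a quick orientation, here is a minimal sketch of what that helper computes, assuming the positional signature used later in this diff (`bincount2D(x, y, xbin, ybin, xlim, ylim)`, counts array returned first) and toy spike data made up for illustration:

```python
import numpy as np
from iblutil.numerical import bincount2D

fs = 30000
# hypothetical toy data: spike sample times and the channel of each spike
samples = np.array([10, 12, 500, 30000, 30010])
channels = np.array([3, 3, 100, 200, 201])

# counts[i, j] = number of spikes landing in channel bin i and time bin j,
# mirroring the positional call made inside spikes_venn3 below
counts, t_scale, ch_scale = bincount2D(
    samples, channels, int(0.4 * fs / 1000), 4, [0, 2 * fs], [0, 384]
)
print(counts.shape)  # (n_channel_bins, n_time_bins)
```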
diff --git a/src/neurodsp/spiketrains.py b/src/neurodsp/spiketrains.py
new file mode 100644
index 0000000..344eff5
--- /dev/null
+++ b/src/neurodsp/spiketrains.py
@@ -0,0 +1,112 @@
+import numpy as np
+from iblutil.numerical import bincount2D
+import tqdm
+import logging
+
+_logger = logging.getLogger("ibllib")
+_logger.setLevel("INFO")
+
+
+def spikes_venn3(
+    samples_tuple,
+    channels_tuple,
+    samples_binsize=None,
+    channels_binsize=4,
+    fs=30000,
+    num_channels=384,
+    chunk_size=None,
+):
+    """
+    Given a set of spikes found by different spike sorters over the same snippet,
+    return the Venn diagram counts as a dictionary suitable for the `subsets` arg of
+    `matplotlib_venn.venn3()`. "100" represents the number of spikes found by sorter 1
+    but not the other two, "110" represents the number of spikes found by sorters 1 and 2
+    but not 3, etc. The algorithm works by binning in the time and channel dimensions and
+    counting spikes found by different sorters within the same bins.
+
+    :param samples_tuple: A tuple of 3 sample times of spikes (each a 1D NumPy array).
+    :param channels_tuple: A tuple of 3 channel locations of spikes (each a 1D NumPy array).
+    :param samples_binsize: Size of sample bins in number of samples. Defaults to 0.4 ms.
+    :param channels_binsize: Size of channel bins in number of channels. Defaults to 4.
+    :param fs: Sampling rate (Hz). Defaults to 30000.
+    :param num_channels: Total number of channels where spikes appear. Defaults to 384.
+    :param chunk_size: Chunk size to process spike data (in samples). Defaults to 20 seconds.
+    :return: dict containing Venn diagram spike counts for the spike sorters.
+    """
+    assert len(samples_tuple) == 3, "Must have 3 sets of samples."
+    assert len(channels_tuple) == 3, "Must have 3 sets of channels."
+    assert all(
+        samples_tuple[i].shape == channels_tuple[i].shape for i in range(3)
+    ), "Samples and channels must match for each sorter."
+
+    if not samples_binsize:
+        # set default: 0.4 ms
+        samples_binsize = int(0.4 * fs / 1000)
+
+    if not chunk_size:
+        # set default: 20 s
+        chunk_size = 20 * fs
+
+    # find the timestamp of the last spike detected by any of the sorters
+    # to calibrate chunking
+    max_samples = max([np.max(samples) for samples in samples_tuple])
+    num_chunks = int((max_samples // chunk_size) + 1)
+
+    # each spike falls into one of 7 conditions based on whether it was found
+    # by different sorters
+    cond_names = ["100", "010", "110", "001", "101", "011", "111"]
+    pre_result = np.zeros(7, int)
+    vec = np.array([1, 2, 4])
+
+    _logger.info(f"Running spike venning routine with {num_chunks} chunks.")
+    for ch in tqdm.tqdm(range(num_chunks)):
+        # select spikes within this chunk's time snippet
+        sample_offset = ch * chunk_size
+        spike_indices = [
+            slice(
+                *np.searchsorted(samples, [sample_offset, sample_offset + chunk_size])
+            )
+            for samples in samples_tuple
+        ]
+        # get corresponding spike sample times and channels
+        samples_chunks = [
+            samples[spike_indices[i]].astype(int) - sample_offset
+            for i, samples in enumerate(samples_tuple)
+        ]
+        channels_chunks = [
+            channels[spike_indices[i]].astype(int)
+            for i, channels in enumerate(channels_tuple)
+        ]
+
+        # compute fast 2D bin count for each sorter, resulting in a (3, num_bins)
+        # array where the (i, j) number is the number of spikes found by sorter i
+        # in (linearized) bin j.
+        bin_counts = np.array(
+            [
+                bincount2D(
+                    samples_chunks[i],
+                    channels_chunks[i],
+                    samples_binsize,
+                    channels_binsize,
+                    [0, chunk_size],
+                    [0, num_channels],
+                )[0].flatten()
+                for i in range(3)
+            ]
+        )
+
+        # this process iteratively counts the number of spikes falling into each
+        # of the 7 conditions by separating out which spikes must have been found
+        # by each spike sorter within each bin, and updates the master `pre_result`
+        # count array for this chunk
+        max_per_spike = np.amax(bin_counts, axis=0)
+        overall_max = np.max(max_per_spike)
+
+        for i in range(0, overall_max):
+            ind = max_per_spike - i > 0
+            venn_info = bin_counts[:, ind] >= (max_per_spike - i)[ind]
+            venn_info_int = vec @ venn_info
+            conds, counts = np.unique(venn_info_int, return_counts=True)
+            pre_result[conds - 1] += counts
+
+    return dict(zip(cond_names, pre_result))
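Since the docstring above points to `matplotlib_venn.venn3()`, whose `subsets` argument accepts exactly this "100"/"010"/... keyed dict, a short usage sketch follows. `matplotlib_venn` is not a dependency of this package and must be installed separately, and `samples1..3` / `channels1..3` are hypothetical placeholders for three sorters' outputs:

```python
import matplotlib.pyplot as plt
from matplotlib_venn import venn3

from neurodsp.spiketrains import spikes_venn3

# samples1..3 and channels1..3 are hypothetical 1D arrays of spike sample
# times and channel indices, one pair per spike sorter, loaded elsewhere
venn_counts = spikes_venn3(
    (samples1, samples2, samples3),
    (channels1, channels2, channels3),
    fs=30000,
    num_channels=384,
)
venn3(subsets=venn_counts, set_labels=("sorter 1", "sorter 2", "sorter 3"))
plt.show()
```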
diff --git a/src/tests/unit/cpu/test_dsp.py b/src/tests/unit/cpu/test_dsp.py
index 6a37735..9088ad5 100644
--- a/src/tests/unit/cpu/test_dsp.py
+++ b/src/tests/unit/cpu/test_dsp.py
@@ -8,10 +8,10 @@
 import neurodsp.voltage as voltage
 import neurodsp.cadzow as cadzow
 import neurodsp.smooth as smooth
+import neurodsp.spiketrains as spiketrains
 
 
 class TestSyncTimestamps(unittest.TestCase):
-
     def test_timestamps_lin(self):
         np.random.seed(4132)
         n = 50
@@ -27,16 +27,22 @@ def test_timestamps_lin(self):
         # test missing indices on a
         imiss = np.setxor1d(np.arange(n), [1, 2, 34, 35])
-        _fcn, _drift, _ia, _ib = utils.sync_timestamps(tsa[imiss], tsb, return_indices=True)
+        _fcn, _drift, _ia, _ib = utils.sync_timestamps(
+            tsa[imiss], tsb, return_indices=True
+        )
         assert np.all(np.isclose(_fcn(tsa[imiss[_ia]]), tsb[_ib]))
 
         # test missing indices on b
-        _fcn, _drift, _ia, _ib = utils.sync_timestamps(tsa, tsb[imiss], return_indices=True)
+        _fcn, _drift, _ia, _ib = utils.sync_timestamps(
+            tsa, tsb[imiss], return_indices=True
+        )
         assert np.all(np.isclose(_fcn(tsa[_ia]), tsb[imiss[_ib]]))
 
         # test missing indices on both
         imiss2 = np.setxor1d(np.arange(n), [14, 17])
-        _fcn, _drift, _ia, _ib = utils.sync_timestamps(tsa[imiss], tsb[imiss2], return_indices=True)
+        _fcn, _drift, _ia, _ib = utils.sync_timestamps(
+            tsa[imiss], tsb[imiss2], return_indices=True
+        )
         assert np.all(np.isclose(_fcn(tsa[imiss[_ia]]), tsb[imiss2[_ib]]))
 
 
@@ -45,14 +51,16 @@ class TestParabolicMax(unittest.TestCase):
     maxi = np.array([np.NaN, 0, 3.04166667, 3.04166667, 5, 5])
     ipeak = np.array([np.NaN, 0, 5.166667, 2.166667, 0, 7])
     # input
-    x = np.array([
-        [0, 0, 0, 0, 0, np.NaN, 0, 0],  # some NaNs
-        [0, 0, 0, 0, 0, 0, 0, 0],  # all flat
-        [0, 0, 0, 0, 1, 3, 2, 0],
-        [0, 1, 3, 2, 0, 0, 0, 0],
-        [5, 1, 3, 2, 0, 0, 0, 0],  # test first sample
-        [0, 1, 3, 2, 0, 0, 0, 5],  # test last sample
-    ])
+    x = np.array(
+        [
+            [0, 0, 0, 0, 0, np.NaN, 0, 0],  # some NaNs
+            [0, 0, 0, 0, 0, 0, 0, 0],  # all flat
+            [0, 0, 0, 0, 1, 3, 2, 0],
+            [0, 1, 3, 2, 0, 0, 0, 0],
+            [5, 1, 3, 2, 0, 0, 0, 0],  # test first sample
+            [0, 1, 3, 2, 0, 0, 0, 5],  # test last sample
+        ]
+    )
 
     def test_error_cases(self):
         pass
@@ -71,7 +79,6 @@ def test_1d(self):
 
 
 class TestDspMisc(unittest.TestCase):
-
     def test_dsp_cosine_func(self):
         x = np.linspace(0, 40)
         fcn = utils.fcn_cosine(bounds=[20, 30])
@@ -80,21 +87,31 @@
 
 
 class TestPhaseRegression(unittest.TestCase):
-
     def test_fit_phase1d(self):
         w = np.zeros(500)
         w[1] = 1
-        self.assertTrue(np.isclose(fourier.fit_phase(w, .002), .002))
+        self.assertTrue(np.isclose(fourier.fit_phase(w, 0.002), 0.002))
 
     def test_fit_phase2d(self):
         w = np.zeros((500, 2))
         w[1, 0], w[2, 1] = (1, 1)
-        self.assertTrue(np.all(np.isclose(fourier.fit_phase(w, .002, axis=0), np.array([.002, .004]))))
-        self.assertTrue(np.all(np.isclose(fourier.fit_phase(w.transpose(), .002), np.array([.002, .004]))))
+        self.assertTrue(
+            np.all(
+                np.isclose(
+                    fourier.fit_phase(w, 0.002, axis=0), np.array([0.002, 0.004])
+                )
+            )
+        )
+        self.assertTrue(
+            np.all(
+                np.isclose(
+                    fourier.fit_phase(w.transpose(), 0.002), np.array([0.002, 0.004])
+                )
+            )
+        )
 
 
 class TestShift(unittest.TestCase):
-
     def test_shift_already_fft(self):
         for ns in [500, 501]:
             w = scipy.signal.ricker(ns, 10)
@@ -117,27 +134,43 @@ def test_shift_2d(self):
         ns = 500
         w = scipy.signal.ricker(ns, 10)
         w = np.tile(w, (100, 1)).transpose()
-        self.assertTrue(np.all(np.isclose(fourier.fshift(w, 1, axis=0), np.roll(w, 1, axis=0))))
-        self.assertTrue(np.all(np.isclose(fourier.fshift(w, 1, axis=1), np.roll(w, 1, axis=1))))
+        self.assertTrue(
+            np.all(np.isclose(fourier.fshift(w, 1, axis=0), np.roll(w, 1, axis=0)))
+        )
+        self.assertTrue(
+            np.all(np.isclose(fourier.fshift(w, 1, axis=1), np.roll(w, 1, axis=1)))
+        )
         # # test with individual shifts for each trace
         self.assertTrue(
-            np.all(np.isclose(fourier.fshift(w, np.ones(w.shape[1]), axis=0), np.roll(w, 1, axis=0))))
+            np.all(
+                np.isclose(
+                    fourier.fshift(w, np.ones(w.shape[1]), axis=0),
+                    np.roll(w, 1, axis=0),
+                )
+            )
+        )
         self.assertTrue(
-            np.all(np.isclose(fourier.fshift(w, np.ones(w.shape[0]), axis=1), np.roll(w, 1, axis=1))))
+            np.all(
+                np.isclose(
+                    fourier.fshift(w, np.ones(w.shape[0]), axis=1),
+                    np.roll(w, 1, axis=1),
+                )
+            )
+        )
 
 
 class TestSmooth(unittest.TestCase):
-
     def test_smooth_lp(self):
         np.random.seed(458)
-        a = np.random.rand(500,)
+        a = np.random.rand(
+            500,
+        )
         a_ = smooth.lp(a, [0.1, 0.15])
-        res = fourier.hp(np.pad(a_, 100, mode='edge'), 1, [0.1, 0.15])[100:-100]
+        res = fourier.hp(np.pad(a_, 100, mode="edge"), 1, [0.1, 0.15])[100:-100]
         self.assertTrue((utils.rms(a) / utils.rms(res)) > 500)
 
 
 class TestFFT(unittest.TestCase):
-
     def test_spectral_convolution(self):
         sig = np.random.randn(20, 500)
         w = np.hanning(25)
@@ -145,12 +178,12 @@ def test_spectral_convolution(self):
         s = np.convolve(sig[0, :], w)
         self.assertTrue(np.all(np.isclose(s, c[0, :-1])))
 
-        c = fourier.convolve(sig, w, mode='same')
-        s = np.convolve(sig[0, :], w, mode='same')
+        c = fourier.convolve(sig, w, mode="same")
+        s = np.convolve(sig[0, :], w, mode="same")
         self.assertTrue(np.all(np.isclose(c[0, :], s)))
 
-        c = fourier.convolve(sig, w[:-1], mode='same')
-        s = np.convolve(sig[0, :], w[:-1], mode='same')
+        c = fourier.convolve(sig, w[:-1], mode="same")
+        s = np.convolve(sig[0, :], w[:-1], mode="same")
         self.assertTrue(np.all(np.isclose(c[0, :], s)))
 
     def test_nech_optim(self):
@@ -196,29 +229,41 @@ def test_fexpand(self):
 
     def test_fscale(self):
         # test for an even number of samples
-        res = [0, 100, 200, 300, 400, 500, -400, -300, -200, -100],
+        res = ([0, 100, 200, 300, 400, 500, -400, -300, -200, -100],)
         self.assertTrue(np.all(np.abs(fourier.fscale(10, 0.001) - res) < 1e-6))
         # test for an odd number of samples
-        res = [0, 90.9090909090909, 181.818181818182, 272.727272727273, 363.636363636364,
-               454.545454545455, -454.545454545455, -363.636363636364, -272.727272727273,
-               -181.818181818182, -90.9090909090909],
+        res = (
+            [
+                0,
+                90.9090909090909,
+                181.818181818182,
+                272.727272727273,
+                363.636363636364,
+                454.545454545455,
+                -454.545454545455,
+                -363.636363636364,
+                -272.727272727273,
+                -181.818181818182,
+                -90.9090909090909,
+            ],
+        )
         self.assertTrue(np.all(np.abs(fourier.fscale(11, 0.001) - res) < 1e-6))
 
     def test_filter_lp_hp(self):
         # test 1D time series: subtracting lp filter removes DC
         ts1 = np.random.rand(500)
-        out1 = fourier.lp(ts1, 1, [.1, .2])
+        out1 = fourier.lp(ts1, 1, [0.1, 0.2])
         self.assertTrue(np.mean(ts1 - out1) < 0.001)
         # test 2D case along the last dimension
         ts = np.tile(ts1, (11, 1))
-        out = fourier.lp(ts, 1, [.1, .2])
+        out = fourier.lp(ts, 1, [0.1, 0.2])
         self.assertTrue(np.allclose(out, out1))
         # test 2D case along the first dimension
         ts = np.tile(ts1[:, np.newaxis], (1, 11))
-        out = fourier.lp(ts, 1, [.1, .2], axis=0)
+        out = fourier.lp(ts, 1, [0.1, 0.2], axis=0)
         self.assertTrue(np.allclose(np.transpose(out), out1))
         # test 1D time series: the hp filter is the complement of the lp filter
-        out2 = fourier.hp(ts1, 1, [.1, .2])
+        out2 = fourier.hp(ts1, 1, [0.1, 0.2])
         self.assertTrue(np.allclose(out1, ts1 - out2))
 
     def test_dft(self):
@@ -239,21 +284,28 @@ def test_dft(self):
         _n0, _n1, nt = (10, 11, 30)
         x = np.random.rand(_n0 * _n1, nt)
         X_ = np.fft.fft(np.fft.fft(x.reshape(_n0, _n1, nt), axis=0), axis=1)
-        r, c = [v.flatten() for v in np.meshgrid(np.arange(
-            _n0) / _n0, np.arange(_n1) / _n1, indexing='ij')]
+        r, c = [
+            v.flatten()
+            for v in np.meshgrid(
+                np.arange(_n0) / _n0, np.arange(_n1) / _n1, indexing="ij"
+            )
+        ]
         nk, nl = (_n0, _n1)
         X = fourier.dft2(x, r, c, nk, nl)
         assert np.all(np.isclose(X, X_))
 
 
 class TestWindowGenerator(unittest.TestCase):
-
     def test_window_simple(self):
         wg = utils.WindowGenerator(ns=500, nswin=100, overlap=50)
         sl = list(wg.firstlast)
         self.assertTrue(wg.nwin == len(sl) == 9)
-        self.assertTrue(np.all(np.array([s[0] for s in sl]) == np.arange(0, wg.nwin) * 50))
-        self.assertTrue(np.all(np.array([s[1] for s in sl]) == np.arange(0, wg.nwin) * 50 + 100))
+        self.assertTrue(
+            np.all(np.array([s[0] for s in sl]) == np.arange(0, wg.nwin) * 50)
+        )
+        self.assertTrue(
+            np.all(np.array([s[1] for s in sl]) == np.arange(0, wg.nwin) * 50 + 100)
+        )
 
         wg = utils.WindowGenerator(ns=500, nswin=100, overlap=10)
         sl = list(wg.firstlast)
@@ -271,7 +323,9 @@ def test_nwindows_computation(self):
 
     def test_firstlast_slices(self):
         # test also the indexing versus direct slicing
-        my_sig = np.random.rand(500,)
+        my_sig = np.random.rand(
+            500,
+        )
         wg = utils.WindowGenerator(ns=500, nswin=100, overlap=50)
         # 1) get the window by
         my_rms = np.zeros((wg.nwin,))
@@ -298,7 +352,9 @@ def test_tscale(self):
 
 class TestFrontDetection(unittest.TestCase):
     def test_rises_falls(self):
         # test 1D case with a long pulse and a dirac
-        a = np.zeros(500,)
+        a = np.zeros(
+            500,
+        )
         a[80:120] = 1
         a[200] = 1
         # rising fronts
@@ -317,13 +373,19 @@
         a[1, 280:320] = 1
         a[1, 400] = 1
         # rising fronts
-        self.assertTrue(np.all(utils.rises(a) == np.array([[0, 0, 1, 1], [80, 200, 280, 400]])))
+        self.assertTrue(
+            np.all(utils.rises(a) == np.array([[0, 0, 1, 1], [80, 200, 280, 400]]))
+        )
         # falling fronts
-        self.assertTrue(np.all(utils.falls(a) == np.array([[0, 0, 1, 1], [120, 201, 320, 401]])))
+        self.assertTrue(
+            np.all(utils.falls(a) == np.array([[0, 0, 1, 1], [120, 201, 320, 401]]))
+        )
         # both
         ind, val = utils.fronts(a)
         self.assertTrue(all(ind[0] == np.array([0, 0, 0, 0, 1, 1, 1, 1])))
-        self.assertTrue(all(ind[1] == np.array([80, 120, 200, 201, 280, 320, 400, 401])))
+        self.assertTrue(
+            all(ind[1] == np.array([80, 120, 200, 201, 280, 320, 400, 401]))
+        )
         self.assertTrue(all(val == np.array([1, -1, 1, -1, 1, -1, 1, -1])))
 
     def test_rises_analog(self):
@@ -334,7 +396,6 @@
 
 
 class TestVoltage(unittest.TestCase):
-
     def test_fk(self):
         """
        creates a couple of plane waves and separates them using the velocity HP filter
@@ -348,19 +409,34 @@ def test_fk(self):
         data_v2 = fourier.fshift(data, offset / v2 / sr)
         noise = np.random.randn(ntr, ns) / 60
-        fk = voltage.fk(data_v1 + data_v2 + noise, si=sr, dx=dx, vbounds=[1200, 1500],
-                        ntr_pad=10, ntr_tap=15, lagc=.25)
-        fknoise = voltage.fk(noise, si=sr, dx=dx, vbounds=[1200, 1500],
-                             ntr_pad=10, ntr_tap=15, lagc=.25)
+        fk = voltage.fk(
+            data_v1 + data_v2 + noise,
+            si=sr,
+            dx=dx,
+            vbounds=[1200, 1500],
+            ntr_pad=10,
+            ntr_tap=15,
+            lagc=0.25,
+        )
+        fknoise = voltage.fk(
+            noise, si=sr, dx=dx, vbounds=[1200, 1500], ntr_pad=10, ntr_tap=15, lagc=0.25
+        )
         # at least 90% of the traces should be below 50dB and 98% below 40 dB
-        assert np.mean(20 * np.log10(utils.rms(fk - data_v1 - fknoise)) < -50) > .9
-        assert np.mean(20 * np.log10(utils.rms(fk - data_v1 - fknoise)) < -40) > .98
+        assert np.mean(20 * np.log10(utils.rms(fk - data_v1 - fknoise)) < -50) > 0.9
+        assert np.mean(20 * np.log10(utils.rms(fk - data_v1 - fknoise)) < -40) > 0.98
         # test the K option
         kbands = np.sin(np.arange(ns) / ns * 8 * np.pi) / 10
-        fkk = voltage.fk(data_v1 + data_v2 + kbands, si=sr, dx=dx, vbounds=[1200, 1500],
-                         ntr_pad=40, ntr_tap=15, lagc=.25,
-                         kfilt={'bounds': [0, .01], 'btype': 'hp'})
-        assert np.mean(20 * np.log10(utils.rms(fkk - data_v1)) < -40) > .9
+        fkk = voltage.fk(
+            data_v1 + data_v2 + kbands,
+            si=sr,
+            dx=dx,
+            vbounds=[1200, 1500],
+            ntr_pad=40,
+            ntr_tap=15,
+            lagc=0.25,
+            kfilt={"bounds": [0, 0.01], "btype": "hp"},
+        )
+        assert np.mean(20 * np.log10(utils.rms(fkk - data_v1)) < -40) > 0.9
         # from easyqc.gui import viewseis
         # a = viewseis(data_v1 + data_v2 + kbands, .002, title='input')
         # b = viewseis(fkk, .002, title='output')
@@ -368,24 +444,57 @@
 
 
 class TestCadzow(unittest.TestCase):
-
     def test_trajectory_matrixes(self):
-        assert np.all(cadzow.traj_matrix_indices(4) == np.array([[1, 0], [2, 1], [3, 2]]))
+        assert np.all(
+            cadzow.traj_matrix_indices(4) == np.array([[1, 0], [2, 1], [3, 2]])
+        )
         assert np.all(cadzow.traj_matrix_indices(3) == np.array([[1, 0], [2, 1]]))
 
 
 class TestStack(unittest.TestCase):
-
     def test_simple_stack(self):
         ntr, ns = (24, 400)
         data = np.zeros((ntr, ns), dtype=np.float32)
         word = np.flipud(np.floor(np.arange(ntr) / 3))
         data += word[:, np.newaxis] * 10
         stack, fold = voltage.stack(data, word=word)
-        assert (np.all(fold == 3))
-        assert (np.all(np.squeeze(np.unique(stack, axis=-1)) == np.arange(8) * 10))
+        assert np.all(fold == 3)
+        assert np.all(np.squeeze(np.unique(stack, axis=-1)) == np.arange(8) * 10)
         # test with a header
-        header = {'toto': np.random.randn(ntr)}
+        header = {"toto": np.random.randn(ntr)}
         stack, hstack = voltage.stack(data, word=word, header=header)
-        assert (list(hstack.keys()) == ['toto', 'fold'])
-        assert (np.all(hstack['fold'] == 3))
+        assert list(hstack.keys()) == ["toto", "fold"]
+        assert np.all(hstack["fold"] == 3)
+
+
+class TestSpikeTrains(unittest.TestCase):
+    def test_spikes_venn3(self):
+        rng = np.random.default_rng()
+        # make sure each 'spiketrain' goes up to around 30000 samples
+        samples_tuple = (
+            np.cumsum(rng.poisson(30, 1000)),
+            np.cumsum(rng.poisson(20, 1500)),
+            np.cumsum(rng.poisson(15, 2000)),
+        )
+        # use a reduced number of channels for speed and to increase collision chance
+        channels_tuple = (
+            rng.integers(20, size=(1000,)),
+            rng.integers(20, size=(1500,)),
+            rng.integers(20, size=(2000,)),
+        )
+        # check that spikes have been accounted for exactly once
+        venn_info = spiketrains.spikes_venn3(
+            samples_tuple, channels_tuple, num_channels=20
+        )
+        assert (
+            venn_info["100"] + venn_info["101"] + venn_info["110"] + venn_info["111"]
+            == 1000
+        )
+        assert (
+            venn_info["010"] + venn_info["011"] + venn_info["110"] + venn_info["111"]
+            == 1500
+        )
+        assert (
+            venn_info["001"] + venn_info["101"] + venn_info["011"] + venn_info["111"]
+            == 2000
+        )
diff --git a/src/tests/unit/cpu/test_spikeglx.py b/src/tests/unit/cpu/test_spikeglx.py
index edcdb70..9f37bb6 100644
--- a/src/tests/unit/cpu/test_spikeglx.py
+++ b/src/tests/unit/cpu/test_spikeglx.py
@@ -501,10 +501,11 @@ def testSaveSubset(self):
 
     def test_write_meta_file(self):
         meta = spikeglx.read_meta_data(Path(TEST_PATH).joinpath('sample3A_g0_t0.imec.ap.meta'))
-        with tempfile.NamedTemporaryFile() as file_mdtest:
-            spikeglx.write_meta_data(meta, file_mdtest.name)
-            _meta = spikeglx.read_meta_data(file_mdtest.name)
-            self.assertEqual(meta, _meta)
+        with tempfile.TemporaryDirectory() as tmpdir:
+            temp_meta = Path(tmpdir) / "sample.meta"
+            spikeglx.write_meta_data(meta, temp_meta)
+            _meta = spikeglx.read_meta_data(temp_meta)
+            self.assertEqual(meta, _meta)
 
 
 class TestsBasicReader(unittest.TestCase):
@@ -515,13 +516,14 @@ def test_read_flat_binary_float32(self):
         # here we expect no scaling to V applied and no sync trace as the format is float32
         kwargs = dict(ns=60000, nc=384, fs=30000, dtype=np.float32)
         data = np.random.randn(kwargs['ns'], kwargs['nc']).astype(np.float32)
-        with tempfile.NamedTemporaryFile() as tf:
-            with open(tf.name, mode='w') as fp:
+        with tempfile.TemporaryDirectory() as tmpdir:
+            temp_bin = Path(tmpdir) / "sample.bin"
+            with open(temp_bin, "w") as fp:
                 data.tofile(fp)
-            sr = spikeglx.Reader(tf.name, **kwargs)
-            assert np.all(sr[:, :] == data)
-            assert sr.nsync == 0
-            assert np.all(sr.sample2volts == 1)
+            with spikeglx.Reader(temp_bin, **kwargs) as sr:
+                assert np.all(sr[:, :] == data)
+                assert sr.nsync == 0
+                assert np.all(sr.sample2volts == 1)
 
     def test_read_flat_binary_int16_with_sync(self):
         # here we expect scaling on all channels but the sync channel
@@ -532,18 +534,19 @@ def test_read_flat_binary_int16_with_sync(self):
         data = np.random.randn(kwargs['ns'], kwargs['nc']) / s2v
         data[:, -1] = 1
         data = data.astype(np.int16)
-        with tempfile.NamedTemporaryFile() as tf:
-            with open(tf.name, mode='w') as fp:
+        with tempfile.TemporaryDirectory() as tmpdir:
+            temp_bin = Path(tmpdir) / "sample.bin"
+            with open(temp_bin, "w") as fp:
                 data.tofile(fp)
             # test for both arguments specified and auto-detection of filesize / nchannels for neuropixel
             for kw in (kwargs, {}):
                 with self.subTest(kwargs=kw):
-                    sr = spikeglx.Reader(tf.name, **kw)
-                    print(sr.shape, kw)
-                    assert sr.nsync == 1
-                    np.testing.assert_allclose(
-                        sr[:, :-1], data[:, :-1].astype(np.float32) * neuropixel.S2V_AP, rtol=1e-5)
-                    np.testing.assert_array_equal(sr.sample2volts, s2v)
+                    with spikeglx.Reader(temp_bin, **kw) as sr:
+                        print(sr.shape, kw)
+                        assert sr.nsync == 1
+                        np.testing.assert_allclose(
+                            sr[:, :-1], data[:, :-1].astype(np.float32) * neuropixel.S2V_AP, rtol=1e-5)
+                        np.testing.assert_array_equal(sr.sample2volts, s2v)
 
     def test_read_flat_binary_int16_no_sync(self):
         # here we expect scaling on all channels but the sync channel
@@ -552,18 +555,19 @@ def test_read_flat_binary_int16_no_sync(self):
         s2v = np.ones(384) * neuropixel.S2V_AP
         data = np.random.randn(kwargs['ns'], kwargs['nc']) / s2v
         data = data.astype(np.int16)
-        with tempfile.NamedTemporaryFile() as tf:
-            with open(tf.name, mode='w') as fp:
+        with tempfile.TemporaryDirectory() as tmpdir:
+            temp_bin = Path(tmpdir) / "sample.bin"
+            with open(temp_bin, mode='w') as fp:
                 data.tofile(fp)
             # test for both arguments specified and auto-detection of filesize / nchannels for neuropixel
             for kw in (kwargs, {}):
                 with self.subTest(kwargs=kw):
-                    sr = spikeglx.Reader(tf.name, **kw)
-                    print(sr.shape, kw)
-                    np.testing.assert_allclose(
-                        sr[:, :], data[:, :].astype(np.float32) * neuropixel.S2V_AP, rtol=1e-5)
-                    assert sr.nsync == 0
-                    np.testing.assert_array_equal(sr.sample2volts, s2v)
+                    with spikeglx.Reader(temp_bin, **kw) as sr:
+                        print(sr.shape, kw)
+                        np.testing.assert_allclose(
+                            sr[:, :], data[:, :].astype(np.float32) * neuropixel.S2V_AP, rtol=1e-5)
+                        assert sr.nsync == 0
+                        np.testing.assert_array_equal(sr.sample2volts, s2v)
 
     def test_load_meta_file_only(self):
         # here we load only a meta-file
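One pattern worth calling out in the test changes above: every `tempfile.NamedTemporaryFile` became a `tempfile.TemporaryDirectory`, and `spikeglx.Reader` is now used as a context manager. The likely motivation (an assumption; the diff does not state it) is the new `windows-latest` CI runner added in `tests.yaml`: on Windows an open `NamedTemporaryFile` cannot be reopened by name, and a temporary directory cannot be cleaned up while a memmap still holds a file open. A minimal sketch of the portable pattern:

```python
import tempfile
from pathlib import Path

import numpy as np

with tempfile.TemporaryDirectory() as tmpdir:
    temp_bin = Path(tmpdir) / "sample.bin"
    # write and fully close the file before reading it back; on Windows a
    # still-open NamedTemporaryFile could not be reopened by a second handle
    np.zeros(16, dtype=np.int16).tofile(temp_bin)
    readback = np.fromfile(temp_bin, dtype=np.int16)
    # any memmap or file handle must be released (e.g. via a context manager,
    # as the tests now do with spikeglx.Reader) before the directory is
    # removed, or Windows will refuse the cleanup
assert readback.size == 16
```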