diff --git a/tools/README.md b/tools/README.md
new file mode 100644
index 0000000..244f231
--- /dev/null
+++ b/tools/README.md
@@ -0,0 +1,168 @@
+# ReproStim Tools
+
+## Overview
+
+### A. Install Singularity
+
+#### On Linux (Ubuntu 24.04):
+
+```shell
+wget -O- http://neuro.debian.net/lists/noble.de-m.libre | sudo tee /etc/apt/sources.list.d/neurodebian.sources.list
+sudo apt-key adv --recv-keys --keyserver hkps://keyserver.ubuntu.com 0xA5D32F012649A5A9
+
+sudo apt-get update
+
+sudo apt-get install singularity-container
+```
+
+Verify the installation:
+
+```shell
+$ singularity --version
+  singularity-ce version 4.1.1
+```
+
+### B. Install ReproNim Containers
+
+#### On Linux (Ubuntu 24.04):
+
+Ensure DataLad is installed:
+
+```
+sudo apt-get install datalad
+```
+
+Next, install and download the ReproNim containers:
+
+```
+datalad install https://datasets.datalad.org/repronim/containers
+datalad update
+cd ./containers/images/repronim
+datalad get .
+```
+
+Check that X11 is the default display server on Linux (Ubuntu 24.04);
+psychopy will not work well with Wayland:
+
+```
+echo $XDG_SESSION_TYPE
+```
+
+It should return `x11`. If not, switch to X11:
+
+ - At the login screen, click on your username.
+ - Before entering your password, look for a gear icon or "Options" button at the bottom right corner of the screen.
+ - Click the gear icon, and a menu should appear where you can select either "Ubuntu on Xorg" or "Ubuntu on Wayland."
+ - Choose "Ubuntu on Xorg" to switch to X11.
+ - Enter your password and log in. You should now be running the X11 session.
+
+### C. Run ReproNim TimeSync Script
+
+Make sure the current directory is the Singularity container path
+created in step B above:
+
+```shell
+cd ./containers/images/repronim
+```
+
+Run the script:
+
+```shell
+singularity exec ./repronim-psychopy--2024.1.4.sing ${REPROSTIM_PATH}/tools/reprostim-timesync-stimuli output.log 1
+```
+
+Where `REPROSTIM_PATH` points to a local clone of the https://github.com/ReproNim/reprostim repository.
+
+The last script parameter is the display ID, which is `1` in this case.
+
+### D. Update Singularity Container Locally (Optional)
+
+Optionally, you can update the container locally for development
+and debugging purposes (with an overlay):
+
+```shell
+singularity overlay create \
+    --size 1024 \
+    repronim-psychopy--2024.1.4.overlay
+
+sudo singularity exec \
+    --overlay repronim-psychopy--2024.1.4.overlay \
+    repronim-psychopy--2024.1.4.sing \
+    bash
+```
+
+As an example, install some packages inside the container shell:
+
+```shell
+apt-get update
+apt-get install pulseaudio-utils
+pactl
+exit
+```
+
+And now run the script with the overlay:
+
+```shell
+singularity exec \
+    --cleanenv --contain \
+    -B /run/user/$(id -u)/pulse \
+    -B ${REPROSTIM_PATH} \
+    --env DISPLAY=$DISPLAY \
+    --env PULSE_SERVER=unix:/run/user/$(id -u)/pulse/native \
+    --overlay ./repronim-psychopy--2024.1.4.overlay \
+    ./repronim-psychopy--2024.1.4.sing \
+    ${REPROSTIM_PATH}/tools/reprostim-timesync-stimuli output.log 1
+```
+
+Where `/run/user/321/pulse` is a sample host pulseaudio socket path bound into the container. If you
+run the script without this binding, it will usually report an error like:
+
+```shell
+Failed to create secure directory (/run/user/321/pulse): No such file or directory
+```
+
+NOTE: Make sure `PULSE_SERVER` is specified in the container environment and
+points to the host pulseaudio server,
+e.g.:
+
+```shell
+export PULSE_SERVER=unix:/run/user/321/pulse/native
+```
+
+#### Dev Notes (all steps together on a local PC)
+
+```shell
+cd ~/Projects/Dartmouth/branches/datalad/containers/images/repronim
+export REPROSTIM_PATH=~/Projects/Dartmouth/branches/reprostim
+
+singularity overlay create \
+    --size 1024 \
+    repronim-psychopy--2024.1.4.overlay
+
+sudo singularity exec \
+    --overlay repronim-psychopy--2024.1.4.overlay \
+    repronim-psychopy--2024.1.4.sing \
+    bash
+
+# execute in the container shell
+apt-get update
+apt-get install portaudio19-dev pulseaudio pavucontrol pulseaudio-utils
+pactl
+exit
+
+# make sure all python packages are installed
+sudo singularity exec \
+    --overlay repronim-psychopy--2024.1.4.overlay \
+    repronim-psychopy--2024.1.4.sing \
+    python3 -m pip install pyzbar opencv-python numpy click pydantic sounddevice scipy pydub pyaudio reedsolo psychopy-sounddevice
+
+# and run the script
+rm output.log
+singularity exec \
+    --cleanenv --contain \
+    -B /run/user/$(id -u)/pulse \
+    -B ${REPROSTIM_PATH} \
+    --env DISPLAY=$DISPLAY \
+    --env PULSE_SERVER=unix:/run/user/$(id -u)/pulse/native \
+    --overlay ./repronim-psychopy--2024.1.4.overlay \
+    ./repronim-psychopy--2024.1.4.sing \
+    ${REPROSTIM_PATH}/tools/reprostim-timesync-stimuli output.log 1
+```
+
diff --git a/tools/audio-codes-notes.md b/tools/audio-codes-notes.md
new file mode 100644
index 0000000..9bac0be
--- /dev/null
+++ b/tools/audio-codes-notes.md
@@ -0,0 +1,198 @@
+# Audio Codes Notes
+
+## Installation
+
+Create a Python 3.10 virtual environment and install the requirements:
+
+```
+python3.10 -m venv venv
+source venv/bin/activate
+pip install --upgrade pip
+pip install -r audio-codes-requirements.txt
+```
+
+On macOS:
+```
+brew install portaudio
+pip install pyaudio
+```
+
+On Linux:
+```
+sudo apt-get install portaudio19-dev
+```
+
+## TODO:
+
+  - Review the `psychopy` sound API and possibly use PTB's
+    facilities in PsychoPy for precise audio placement in time:
+    https://www.psychopy.org/download.html
+    https://psychopy.org/api/sound/playback.html
+  - Look at audio watermarking.
+
+## PsychoPy, Sound, PTB
+
+### On macOS:
+
+NOTE: PsychoPy (2024.2.3) requirements currently limit/suggest
+Python version 3.10.
+
+Download and install the standalone package
+"PsychoPy 2024.2.3 modern (py3.10)":
+https://github.com/psychopy/psychopy/releases/download/2024.2.3/StandalonePsychoPy-2024.2.3-macOS-py3.10.dmg
+
+Run PsychoPy and check in the sound settings that `ptb` is
+set as the `Audio Library`.
+
+Make sure Python 3.10 is installed and the venv is explicitly created with it:
+```
+python3.10 -m venv venv
+source venv/bin/activate
+pip install --upgrade pip
+pip install -r audio-codes-requirements.txt
+```
+
+NOTE: the first time PsychoPy is run, it may take a long time to set up audio
+and download additional dependencies.
+
+### On Linux (Ubuntu 22.04):
+
+Create a folder for the `psychopy` installation and initialize a Python 3.10 venv:
+```
+python3.10 -m venv venv
+source venv/bin/activate
+```
+Then fetch a wxPython wheel for your platform from
+https://extras.wxpython.org/wxPython4/extras/linux/gtk3/ and
+download it locally to your machine.
+In this case it was `https://extras.wxpython.org/wxPython4/extras/linux/gtk3/ubuntu-22.04/wxPython-4.2.1-cp310-cp310-linux_x86_64.whl`,
+downloaded to `wxPython/wxPython-4.2.1-cp310-cp310-linux_x86_64.whl`.
+
+Install the sound dependencies (the first command may fail):
+```
+sudo apt-get install libusb-1.0-0-dev portaudio19-dev libasound2-dev
+pip install psychtoolbox
+```
+
+Install wxPython in the `venv`, e.g.:
+```
+pip install wxPython/wxPython-4.2.1-cp310-cp310-linux_x86_64.whl
+```
+
+Then install `psychopy`:
+```
+pip install psychopy
+```
+
+Run psychopy:
+```
+psychopy
+```
+
+NOTE: PsychoPy PTB sound was not tested on Ubuntu 22.04, due to the upgrade to 24.04.
+
+### On Linux (Ubuntu 24.04):
+
+For some reason `python3.10` could not be run directly, so it was installed
+manually together with venv support:
+
+```
+sudo add-apt-repository ppa:deadsnakes/ppa
+sudo apt update
+sudo apt install python3.10
+sudo apt install python3.10-venv
+```
+
+After this `python3.10` still failed to create a `venv`, so the following
+commands were used to create it:
+
+```
+python3.10 -m venv --without-pip venv
+source venv/bin/activate
+curl https://bootstrap.pypa.io/get-pip.py -o get-pip.py
+python get-pip.py
+```
+
+Then installed `wxPython`; compiling it can take up to 1 hour (there are no
+pre-built wheels for Ubuntu 24.04 at the moment):
+
+```
+sudo apt update
+sudo apt install python3-dev python3-pip libgtk-3-dev
+pip install wxPython
+```
+
+Finally installed `psychopy`:
+
+```
+pip install psychopy
+```
+
+Applied an audio permissions fix by adding the current user to the `audio` group:
+
+```
+sudo usermod -a -G audio $USER
+```
+
+and adjusted real-time permissions by creating the file `/etc/security/limits.d/99-realtime.conf`:
+
+```
+sudo vi /etc/security/limits.d/99-realtime.conf
+```
+
+with the following content:
+
+```
+@audio - rtprio 99
+@audio - memlock unlimited
+```
+
+Then rebooted the system.
+
+NOTE: PsychoPy PTB still doesn't work on Ubuntu 24.04; the script produces the
+following error (TBD):
+
+```
+[DEBUG] play sound with psychopy ptb
+Failure: No such entity
+Failure: No such entity
+```
+
+## NeuroDebian/Singularity
+
+On Linux (Ubuntu 22.04):
+
+Was unable to install `singularity-container` from the `neurodebian` repository
+after multiple attempts, so finally upgraded Ubuntu 22.04 to 24.04.
+
+On Linux (Ubuntu 24.04):
+
+```
+wget -O- http://neuro.debian.net/lists/noble.de-m.libre | sudo tee /etc/apt/sources.list.d/neurodebian.sources.list
+sudo apt-key adv --recv-keys --keyserver hkps://keyserver.ubuntu.com 0xA5D32F012649A5A9
+
+sudo apt-get update
+
+sudo apt-get install singularity-container
+```
+
+```
+singularity --version
+  singularity-ce version 4.1.1
+```
+
+## Summary
+
+  - `PyDub` allows you to generate simple tones easily.
+  - FSK modulation can be achieved using `numpy` and
+    `scipy` to create varying frequency tones based on
+    binary input (see the sketch below).
+  - Optionally, `DTMF` encoding can be implemented using
+    predefined frequency pairs, and DTMF tones can be
+    detected by analyzing the audio input.
+
+  - Chirp SDK:
+    Chirp.io / https://www.sonos.com/en/home
+    https://github.com/chirp
+
+  - GNU Radio can be used to encode/modulate/demodulate:
+    https://www.gnuradio.org/
+    It supports Frequency Shift Keying (FSK),
+    Phase Shift Keying (PSK), and Amplitude Modulation (AM).
+
+  - `reedsolo` can be used for ECC (Error Correction Codes).
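+
+As a rough illustration of the FSK idea above, here is a minimal sketch.
+It assumes a 1 kHz / 5 kHz tone pair and a ~7 ms bit duration, similar to the
+defaults in `audio-codes.py`; the bit string and output file name are arbitrary:
+
+```
+import numpy as np
+from scipy.io.wavfile import write
+
+def fsk_signal(bits, f0=1000, f1=5000, rate=44100, bit_dur=0.007):
+    # one sine burst per bit: f1 encodes '1', f0 encodes '0'
+    t = np.linspace(0, bit_dur, int(rate * bit_dur), endpoint=False)
+    tones = [np.sin(2 * np.pi * (f1 if b == "1" else f0) * t) for b in bits]
+    return np.concatenate(tones)
+
+sig = fsk_signal("10110010")
+write("fsk_demo.wav", 44100, (sig * 32767).astype(np.int16))
+```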
+ diff --git a/tools/audio-codes-requirements.txt b/tools/audio-codes-requirements.txt new file mode 100644 index 0000000..28f5f95 --- /dev/null +++ b/tools/audio-codes-requirements.txt @@ -0,0 +1,14 @@ +# draft requirements for audio-codes.py +pyzbar>=0.1.9 +opencv-python>=4.9.0.80 +numpy>=1.26.4 +click>=8.1.7 +pydantic>=2.7.1 +sounddevice>=0.5.1 +scipy>=1.14.1 +pydub>=0.25.1 +pyaudio>=0.2.14 +reedsolo>=1.7.0 +psychopy +psychopy-sounddevice +qrcode diff --git a/tools/audio-codes.py b/tools/audio-codes.py new file mode 100644 index 0000000..bab5a87 --- /dev/null +++ b/tools/audio-codes.py @@ -0,0 +1,428 @@ +import logging +import sys +import time +from datetime import datetime + +# NOTE: Early initialization of audio prefs is required otherwise +# use ~/.psychopy3/userPrefs.cfg [hardware] section, or load directly from +# some customPrefs.cfg file with prefs.loadFromFile API. +from psychopy import prefs + +#prefs.general['audioLib'] = 'sounddevice' +#prefs.hardware['audioDevice'] = 'HDA Intel PCH: ALC892 Digital (hw:0,1)' +#prefs.hardware['audioLib'] = ['PTB'] +prefs.hardware['audioLib'] = ['sounddevice'] +# + +# provide psychopy logs +from psychopy import logging as pl +#pl.console.setLevel(pl.NOTSET) +pl.console.setLevel(pl.DEBUG) + + +import numpy as np +import sounddevice as sd +from scipy.io.wavfile import write +from scipy.io import wavfile +from reedsolo import RSCodec + +from psychopy import core, sound, prefs +from psychtoolbox import audio + + +# init std logger +logger = logging.getLogger(__name__) +handler = logging.StreamHandler(sys.stderr) +formatter = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s') +handler.setFormatter(formatter) +logging.getLogger().addHandler(handler) +logger.setLevel(logging.DEBUG) + + +# audio barcode/qr helper functions + +def bit_enumerator(data): + if isinstance(data, DataMessage): + data = data.encode() + if isinstance(data, str): + # If data is a string, iterate over each character + for char in data: + if char not in ('0', '1'): + raise ValueError("String must only contain '0' and '1'.") + yield int(char) # Yield the bit as an integer + elif isinstance(data, bytes): + # If data is bytes, iterate over each byte + for byte in data: + for i in range(7, -1, -1): # Iterate from MSB to LSB + yield (byte >> i) & 1 # Extract and yield the bit + else: + raise TypeError("Data must be either a string or bytes. Got: " + str(type(data))) + + +# Convert a list of bits to bytes +def bits_to_bytes(detected_bits): + # Check if the length of detected_bits is a multiple of 8 + if len(detected_bits) % 8 != 0: + raise ValueError(f"Detected bits array must be aligned to 8 bits. 
Length={len(detected_bits)}") + + byte_array = bytearray() # Use bytearray for mutable bytes + for i in range(0, len(detected_bits), 8): + # Get the next 8 bits + byte_bits = detected_bits[i:i + 8] + + # Convert the list of bits to a byte (big-endian) + byte_value = 0 + for bit in byte_bits: + byte_value = (byte_value << 1) | bit # Shift left and add the bit + + byte_array.append(byte_value) # Append the byte to the bytearray + + return bytes(byte_array) # Convert bytearray to bytes + + +def crc8(data: bytes, polynomial: int = 0x31, init_value: int = 0x00) -> int: + crc = init_value + for byte in data: + crc ^= byte # XOR byte into the CRC + for _ in range(8): # Process each bit + if crc & 0x80: # If the highest bit is set + crc = (crc << 1) ^ polynomial # Shift left and XOR with polynomial + else: + crc <<= 1 # Just shift left + crc &= 0xFF # Keep CRC to 8 bits + return crc + + +def list_audio_devices(): + logger.debug("list_audio_devices()") + + logger.debug("[psychopy]") + logger.debug(f"audioLib : {prefs.hardware['audioLib']}") + logger.debug(f"audioDevice : {prefs.hardware['audioDevice']}") + + logger.debug("[sounddevice]") + devices = sd.query_devices() # Query all devices + for i, device in enumerate(devices): + logger.debug(f"device [{i}] : {device['name']}") + default_device = sd.default.device # Get the current default input/output devices + logger.debug(f"default in : {default_device[0]}") + logger.debug(f"default out : {default_device[1]}") + + logger.debug("[psytoolbox]") + for i, device in enumerate(audio.get_devices()): + logger.debug(f"device [{i}] : {device}") + + logger.debug("[psychopy.backend_ptb]") + # TODO: investigate why only single out device listed from + # USB capture but defult one is not shown + # logger.debug(sound.backend_ptb.getDevices()) + + + +# Class representing audio data message in big-endian +# format and encoded with Reed-Solomon error correction +# where the message is structured as follows: +# - 1st byte is CRC-8 checksum +# - 2nd byte is the length of the data +# - 3+ and the rest is the data itself +class DataMessage: + def __init__(self): + self.value: bytes = b'' + self.length: int = 0 + self.crc8: int = 0 + self.use_ecc: bool = True + self.rsc = RSCodec(4) + + def decode(self, data: bytes): + if self.use_ecc: + dec, dec_full, errata_pos_all = self.rsc.decode(data) + data = bytes(dec) + + #logger.debug(f"decoded data : {data}") + self.crc8 = data[0] + self.length = data[1] + self.value = data[2:] + n = crc8(self.value) + if self.crc8 != n: + raise ValueError(f"CRC-8 checksum mismatch: {self.crc8} <-> {n}") + + def encode(self) -> bytes: + logger.debug("size info") + logger.debug(f" - data : {len(self.value)} bytes, {self.value}") + b: bytes = bytes([self.crc8, self.length]) + self.value + logger.debug(f" - message : {len(b)} bytes, {b}") + if self.use_ecc: + b = bytes(self.rsc.encode(b)) + logger.debug(f" - ecc : {len(b)} bytes, {b}") + return b + + def get_bytes(self) -> bytes: + return self.value + + def get_str(self) -> str: + return self.value.decode("utf-8") + + def get_uint16(self) -> int: + if len(self.value) != 2: + raise ValueError(f"Data length for uint16 must be 2 bytes, " + f"but was {len(self.value)}") + return int.from_bytes(self.value, 'big') + + def get_uint32(self) -> int: + if len(self.value) != 4: + raise ValueError(f"Data length for uint32 must be 4 bytes, " + f"but was {len(self.value)}") + return int.from_bytes(self.value, 'big') + + def get_uint64(self) -> int: + if len(self.value) != 8: + raise ValueError(f"Data length 
for uint64 must be 8 bytes, " + f"but was {len(self.value)}") + return int.from_bytes(self.value, 'big') + + def set_bytes(self, data: bytes): + self.value = data + self.length = len(data) + self.crc8 = crc8(data) + + def set_str(self, s: str): + self.set_bytes(s.encode("utf-8")) + + def set_uint16(self, i: int): + self.set_bytes(i.to_bytes(2, 'big')) + + def set_uint32(self, i: int): + self.set_bytes(i.to_bytes(4, 'big')) + + def set_uint64(self, i: int): + self.set_bytes(i.to_bytes(8, 'big')) + + +# Class to generate/parse QR-like codes with FSK modulation +class AudioFsk: + def __init__(self, + f1=1000, + f0=5000, + sample_rate=44100, + duration=0.0070, + volume=0.75 + ): + self.f1 = f1 + self.f0 = f0 + self.sample_rate = sample_rate + self.duration = duration + if volume < 0.0 or volume > 1.0: + raise ValueError("Volume must be between 0.0 and 1.0.") + self.volume = volume + + def generate(self, data): + logger.debug(f"audio config : f1={self.f1} Hz, f0={self.f0} Hz, rate={self.sample_rate} Hz, bit duration={self.duration} sec, volume={self.volume}") + t = np.linspace(0, self.duration, + int(self.sample_rate * self.duration), + endpoint=False) + + # Create FSK signal + fsk_signal = np.array([]) + + c: int = 0 + sb: str = '' + for bit in bit_enumerator(data): + c += 1 + sb += str(bit) + if bit == 1: + fsk_signal = np.concatenate((fsk_signal, + self.volume * np.sin(2 * np.pi * self.f1 * t))) + else: + fsk_signal = np.concatenate((fsk_signal, + self.volume * np.sin(2 * np.pi * self.f0 * t))) + + # Normalize the signal for 100% volume + if self.volume==1.0: + fsk_signal /= np.max(np.abs(fsk_signal)) + logger.debug(f"audio raw bits: count={c}, {sb}") + logger.debug(f"audio duration: {c * self.duration:.6f} seconds") + return fsk_signal + + def play(self, data): + ts = time.perf_counter() + fsk_signal = self.generate(data) + ts = time.perf_counter() - ts + logger.debug(f"generate time : {ts:.6f} seconds") + + ts = time.perf_counter() + # Play the FSK signal + sd.play(fsk_signal, samplerate=self.sample_rate) + + # Wait until sound is finished playing + sd.wait() + ts = time.perf_counter() - ts + logger.debug(f"play time : {ts:.6f} seconds") + + + def save(self, data, filename): + fsk_signal = self.generate(data) + + # Save the signal to a WAV file + write(filename, self.sample_rate, + (fsk_signal * 32767).astype(np.int16)) + + def parse(self, filename): + # Read the WAV file + rate, data = wavfile.read(filename) + + # Check if audio is stereo and convert to mono if necessary + if len(data.shape) > 1: + data = data.mean(axis=1) + + # Calculate the number of samples for each bit duration + samples_per_bit = int(self.sample_rate * self.duration) + + # Prepare a list to hold the detected bits + detected_bits = [] + + # Analyze the audio in chunks + for i in range(0, len(data), samples_per_bit): + if i + samples_per_bit > len(data): + break # Avoid out of bounds + + # Extract the current chunk of audio + chunk = data[i:i + samples_per_bit] + + # Perform FFT on the chunk + fourier = np.fft.fft(chunk) + frequencies = np.fft.fftfreq(len(fourier), 1 / rate) + + # Get the magnitudes and filter out positive frequencies + magnitudes = np.abs(fourier) + positive_frequencies = frequencies[:len(frequencies) // 2] + positive_magnitudes = magnitudes[:len(magnitudes) // 2] + + # Find the peak frequency + peak_freq = positive_frequencies[np.argmax(positive_magnitudes)] + + # Determine if the peak frequency corresponds to a '1' or '0' + if abs(peak_freq - self.f1) < 50: # Frequency for '1' + 
detected_bits.append(1) + elif abs(peak_freq - self.f0) < 50: # Frequency for '0' + detected_bits.append(0) + + dbg_bits: str = ''.join([str(bit) for bit in detected_bits]) + logger.debug(f"detected bits : count={len(dbg_bits)}, {dbg_bits}") + return bits_to_bytes(detected_bits) + + +def beep_1(): + logger.debug("beep_1()") + logger.debug("play sound with sounddevice") + # Parameters + duration = 1.0 # seconds + frequency = 440 # Hz + sample_rate = 44100 # samples per second + + logger.debug(f"listen {frequency} Hz tone for {duration} seconds...") + + # Generate the sound wave + t = np.linspace(0, duration, int(sample_rate * duration), endpoint=False) # Time array + wave = 0.95 * np.sin(2 * np.pi * frequency * t) # Generate sine wave + + # Play the sound + sd.play(wave, samplerate=sample_rate) + sd.wait() # Wait until the sound has finished playing + logger.debug("done") + + logger.debug("save sound as beep_001.wav...") + filename = 'beep_001.wav' + write(filename, sample_rate, wave.astype(np.float32)) # Save as float32 + logger.debug("beep_1() done") + + +def beep_2(): + logger.debug("beep_2()") + af: AudioFsk = AudioFsk() + data = '1010111101011010' + '1010111101011010' + data = b'\x0A\xB1\xF5\xAA' + data = b'\x00\x00\x00\x00' + data = b'\xFF\xFF\xFF\xFF' + data = b'\xF1\xF2\xF3\xF4' + data = DataMessage() + n = 0xA001 + n = 1234 + logger.debug(f"encoded uint16: {n}") + data.set_uint16(n) + af.play(data) + logger.debug("save audio as : beep_002.wav...") + af.save(data, 'beep_002.wav') + logger.debug("beep_2() done") + + +def parse_beep_2(): + logger.debug("parse_beep_2()") + af: AudioFsk = AudioFsk() + detected_data = af.parse('beep_002.wav') + af: DataMessage = DataMessage() + af.decode(detected_data) + u16 = af.get_uint16() + ab: bytes = af.get_bytes() + logger.debug(f'detected data : {detected_data}') + logger.debug(f'parsed bytes : count={len(ab)}, {ab}') + logger.debug(f'parsed uint16 : {u16}') + logger.debug("parse_beep_2() done") + +def beep_3(): + logger.debug("beep_3()") + af: AudioFsk = AudioFsk() + data = DataMessage() + s: str = "Hello World! 
"+datetime.now().strftime("%Y-%m-%d %H:%M:%S") + logger.debug(f"encoded str : {s}") + data.set_str(s) + af.play(data) + logger.debug("save audio as : beep_003.wav...") + af.save(data, 'beep_003.wav') + logger.debug("beep_3() done") + +def parse_beep_3(): + logger.debug("parse_beep_3()") + af: AudioFsk = AudioFsk() + detected_data = af.parse('beep_003.wav') + af: DataMessage = DataMessage() + af.decode(detected_data) + ab = af.get_bytes() + s = af.get_str() + logger.debug(f'detected data : {detected_data}') + logger.debug(f'parsed bytes : count={len(ab)}, {ab}') + logger.debug(f'parsed str : {s}') + logger.debug("parse_beep_3() done") + +def beep_4(): + logger.debug("beep_4()") + + logger.debug(f"play sound with psychopy {prefs.hardware['audioLib']}") + #snd = sound.Sound('D', secs=10.0, stereo=True) + snd = sound.Sound('beep_003.wav') + + snd.play() + core.wait(snd.duration) + logger.debug(f'Sound "{snd.sound}" has finished playing.') + + + +def main(): + logger.debug("----------------------------------------------------") + list_audio_devices() + logger.debug("----------------------------------------------------") + beep_1() + logger.debug("----------------------------------------------------") + beep_2() + logger.debug("----------------------------------------------------") + parse_beep_2() + logger.debug("----------------------------------------------------") + beep_3() + logger.debug("----------------------------------------------------") + parse_beep_3() + logger.debug("----------------------------------------------------") + beep_4() + logger.debug("audio-codes.py done") + + +if __name__ == "__main__": + main() diff --git a/tools/reprostim-timesync-stimuli b/tools/reprostim-timesync-stimuli index 3022abb..2a7fcc0 100755 --- a/tools/reprostim-timesync-stimuli +++ b/tools/reprostim-timesync-stimuli @@ -1,43 +1,84 @@ #!/usr/bin/env python3 -# TODO: review https://psychopy.org/api/sound/playback.html and possibly use PTB's -# facilities in PsychoPy for precise audio placement in time. 
-# from time import time, sleep t0 = time() -import glob +import click +from enum import Enum +import logging import sys import os +import shutil import json from datetime import datetime - import qrcode -from psychopy import prefs -prefs.hardware['audioLib'] = ['ptb', 'pyo','pygame'] -from psychopy import sound + + +# setup logging +logger = logging.getLogger(__name__) +handler = logging.StreamHandler(sys.stderr) +formatter = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s') +handler.setFormatter(formatter) +logging.getLogger().addHandler(handler) +logger.setLevel(logging.DEBUG) +logger.info("reprostim-timesync-stimuli script started") + +# setup psychopy logs +from psychopy import logging as pl +#pl.console.setLevel(pl.NOTSET) +pl.console.setLevel(pl.DEBUG) + + from psychopy import visual, core, event, clock +#from psychopy.hardware import keyboard + -import numpy as np +####################################################### +# Constants -# 'interval' -mode = 'event' -interval = 2 +# Enum for the mode of the script operation +class Mode(str, Enum): + # Listen for keyboard events to show codes + EVENT = "event" + # Show codes at regular intervals + INTERVAL = "interval" + # Just play a beep + BEEP = "beep" + # List audio/video devices + DEVICES = "devices" -logfn = sys.argv[1] -# logfn = "../data/{0}/run{1}_logfile.csv".format(acqNum, runNum) -if os.path.exists(logfn): - raise RuntimeError(f"Log file {logfn} already exists") + +####################################################### +# Functions def get_iso_time(t): return datetime.fromtimestamp(t).astimezone().isoformat() + +def get_output_file_name(prefix: str, + start_ts: datetime, + end_ts: datetime=None) -> str: + start_str: str = get_ts_str(start_ts) + end_str: str = get_ts_str(end_ts) if end_ts else "" + return f"{prefix}{start_str}--{end_str}.log" + def get_times(): t = time() return t, get_iso_time(t) -def mkrec(**kwargs): +def get_ts_str(ts: datetime) -> str: + ts_format = "%Y.%m.%d-%H.%M.%S.%f" + return f"{ts.strftime(ts_format)[:-3]}" + + +def log(f, rec): + s = json.dumps(rec).rstrip() + f.write(s + os.linesep) + logger.debug(f"LOG {s}") + + +def mkrec(mode, logfn, interval, **kwargs): t, tstr = get_times() kwargs.update({ "logfn": logfn, @@ -50,106 +91,307 @@ def mkrec(**kwargs): return kwargs -from psychopy import sound, core - -# Function to play a beep at a specific frequency and duration -def play_beep(frequency, duration, volume=1.0): - # Create a sound object with the specified frequency - beep = sound.Sound(frequency, secs=duration, volume=volume) - beep.play() - sound_time = get_times() - core.wait(duration) # Wait for the beep to finish - return sound_time - - -# print(json.dumps(mkrec(blah=123), indent=4)) - -f = open(logfn, "w") - -def log(rec): - f.write(json.dumps(rec).rstrip() + os.linesep) - -win = visual.Window(fullscr=True, screen=int(sys.argv[2])) -win.mouseVisible = False # hides the mouse pointer - -log(mkrec(event="started", start_time=t0, start_time_formatted=get_iso_time(t0))) - -message = visual.TextStim(win, text="""Waiting for scanner trigger.\nInstructions - for Participant...""") -message.draw() - -fixation = visual.TextStim(win, text='+') -reproinMessage = visual.TextStim(win, text="", pos=(0, -.7), - height=.05) - -win.flip() - - -fixation.draw() # Change properties of existing stim -win.flip() - -spd = 0.500 # Stimulus Presentation Duration -soa = 6.000 # Stimulus Onset Asynchrony -ntrials = 300 -iwt = 5 # Initial Wait Time between scanner trigger and first stimulus - -stim_images = 
[] -stim_names = [] -keys = [] # None received/expected - -clk = clock.Clock() -t_start = time() - +def check_keys(max_wait: float=0) -> list[str]: + keys: list[str] + if max_wait > 0: + keys = event.waitKeys(maxWait=max_wait) + else: + keys = event.getKeys() + logger.debug(f"keys={keys}") + return keys if keys else [] + +def safe_remove(file_name: str): + if file_name: + if os.path.isfile(file_name): # Check if it's a file + try: + os.remove(file_name) + logger.debug(f"File {file_name} deleted successfully.") + except Exception as e: + logger.error(f"Failed delete file {file_name}: {e}") + else: + logger.warning(f"File {file_name} does not exist or is not a valid file.") + +def store_soundcode(sound_file: str, sound_data: int, logfn: str): + # if sound_file exits, copy it to {logfn}soundcode_{sound_data}.wav + if os.path.isfile(sound_file): + sfile = f"{os.path.splitext(logfn)[0]}soundcode_{sound_data}.wav" + shutil.copy(sound_file, sfile) + + +####################################################### +# Main script code + +def do_init(logfn: str) -> bool: + if os.path.exists(logfn): + logger.error(f"Log file {logfn} already exists") + return False + return True + + +def do_main(mode: Mode, logfn: str, + display: int, qr_scale: float, + sound_codec: str, mute: bool, + ntrials: int, duration: float, interval: float, + keep_soundcode: bool) -> int: + logger.info("main script started") + + # late sound init + from soundcode import (beep, list_audio_devices, + save_soundcode, play_sound, + SoundCodec, SoundCodeInfo) + + #soundcode.logger.setLevel(logging.DEBUG) + + # print(json.dumps(mkrec(mode, logfn, interval, blah=123), indent=4)) + + if mode == Mode.BEEP: + for i in range(ntrials): + beep(interval*0.5, async_=True) + sleep(interval) + return 0 + + if mode == Mode.DEVICES: + list_audio_devices() + return 0 + + sound_data: int = 0 + sound_file: str = None + sound_info: SoundCodeInfo = None + f = open(logfn, "w") + + win = visual.Window(fullscr=True, screen=display) + win.mouseVisible = False # hides the mouse pointer + + log(f, mkrec(mode, logfn, interval, + event="started", start_time=t0, start_time_formatted=get_iso_time(t0))) + + message = visual.TextStim(win, text="""Waiting for scanner trigger.\nInstructions + for Participant...""") + message.draw() + + fixation = visual.TextStim(win, text='+') + reproinMessage = visual.TextStim(win, text="", pos=(0, -.7), + height=.05) -for acqNum in range(ntrials): + win.flip() - rec = mkrec( - event="trigger", - acqNum=acqNum - ) - if mode == 'event': - print("Waiting for an event") - keys = event.waitKeys(maxWait=120) # keyList=['5']) - elif mode == 'interval': - target_time = t_start + acqNum * interval - to_wait = target_time - time() - # sleep some part of it if long enough - if to_wait >= .2: - sleep(to_wait * 0.7) - # busy loop without sleep to not miss it - while time() < target_time: - pass - else: - raise ValueError(mode) - - freq = 2000 + (100*acqNum) - beep = sound.Sound(freq, secs=0.5, volume=0.8, sampleRate=44100, stereo=True) - beep.play() - rec['sound_time'] = get_times() - rec['sound_freq'] = freq - rec['keys'] = keys - tkeys, tkeys_str = get_times() - rec["keys_time"] = tkeys - rec["keys_time_str"] = tkeys_str - qr = visual.ImageStim(win, - qrcode.make(json.dumps(rec)), - pos=(0, 0) - ) - qr.size = qr.size *1 - qr.draw() + fixation.draw() # Change properties of existing stim win.flip() - tflip, tflip_str = get_times() - rec['time_flip'] = tflip - rec['time_flip_formatted'] = tflip_str - core.wait(0.5) - fixation.draw() - win.flip() - 
toff, toff_str = get_times() - rec['prior_time_off'] = toff - rec['prior_time_off_str'] = toff_str - log(rec) - if 'q' in keys: - break - -f.close() + + spd = 0.500 # Stimulus Presentation Duration + soa = 6.000 # Stimulus Onset Asynchrony + iwt = 5 # Initial Wait Time between scanner trigger and first stimulus + + stim_images = [] + stim_names = [] + keys = [] # None received/expected + + #kb = keyboard.Keyboard() + + clk = clock.Clock() + t_start = time() + + logger.debug(f"warming time: {(t_start-t0):.6f} sec") + logger.info(f"starting loop with {ntrials} trials...") + + + for acqNum in range(ntrials): + logger.debug(f"trial {acqNum}") + + rec = mkrec( + mode, + logfn, + interval, + event="trigger", + acqNum=acqNum + ) + + if mode == Mode.EVENT: + print("Waiting for an event") + keys = check_keys(120) # keyList=['5']) + elif mode == Mode.INTERVAL: + #keys = kb.getKeys(waitRelease=False) + keys = check_keys() + # prepare sound code file if any + if not mute: + if keep_soundcode and sound_file: + store_soundcode(sound_file, sound_data, logfn) + safe_remove(sound_file) + sound_data = acqNum + sound_file, sound_info_ = save_soundcode( + code_uint16=sound_data, + codec=sound_codec) + sound_info = sound_info_ + logger.debug(f" {sound_info}") + + target_time = t_start + acqNum * interval + to_wait = target_time - time() + # sleep some part of it if long enough + if to_wait >= .2: + sleep(to_wait * 0.7) + # busy loop without sleep to not miss it + while time() < target_time: + sleep(0) # pass CPU to other threads + else: + raise ValueError(mode) + + if not mute: + d = play_sound(sound_file, async_=True) + sound_time, sound_time_str = get_times() + rec['s_time'] = sound_time + rec['s_time_str'] = sound_time_str + rec['s_data'] = sound_data + rec['s_codec'] = sound_codec + rec['s_f0'] = sound_info.f0 + rec['s_f1'] = sound_info.f1 + rec['s_pre_delay'] = sound_info.pre_delay + rec['s_post_delay'] = sound_info.post_delay + rec['s_duration'] = sound_info.duration + if sound_codec == SoundCodec.NFE: + rec['s_freq'] = sound_info.nfe_freq + rec['s_df'] = sound_info.nfe_df + + # NOTE: should we add codec info to the log? 
+ # like f0, f1, sampleRate, bit_duration, duration, etc + + rec['keys'] = keys + tkeys, tkeys_str = get_times() + rec["keys_time"] = tkeys + rec["keys_time_str"] = tkeys_str + qr = visual.ImageStim(win, + qrcode.make(json.dumps(rec)), + pos=(0, 0) + ) + qr.size = qr.size * qr_scale + qr.draw() + win.flip() + tflip, tflip_str = get_times() + rec['time_flip'] = tflip + rec['time_flip_formatted'] = tflip_str + core.wait(0.5) + fixation.draw() + win.flip() + toff, toff_str = get_times() + rec['prior_time_off'] = toff + rec['prior_time_off_str'] = toff_str + log(f, rec) + if 'q' in keys or 'escape' in keys: + break + + f.close() + + if keep_soundcode and sound_file: + store_soundcode(sound_file, sound_data, logfn) + # cleanup temporary sound file if any + safe_remove(sound_file) + logger.info("main script finished") + return 0 + + +@click.command(help='PsychoPy reprostim-timesync-stimuli script.') +@click.option('-m', '--mode', type=click.Choice([mode.value for mode in Mode], + case_sensitive=False), + default=Mode.EVENT, + help='Mode of operation: event, interval, or beep.') +@click.option('-o', '--output-prefix', default="output_", type=str, + help='Output log file name prefix.') +@click.option('-d', '--display', default=1, type=int, + help='Display number as an integer (default: 1).') +@click.option('-s', '--qr-scale', default=0.8, type=float, + help='Specify QR code scale factor in range 0..1. ' + 'Use 1.0 to fit full height (default: 0.8).') +@click.option('-a', '--audio-lib', + type=click.Choice(['psychopy_sounddevice', + 'psychopy_ptb', + 'sounddevice'], + case_sensitive=True), + default='psychopy_sounddevice', + help='Specify audio library to be used ' + '(default: psychopy_sounddevice).') +@click.option('-c', '--sound-codec', + type=click.Choice(['FSK', 'NFE'], + case_sensitive=True), + default='FSK', + help='Specify sound codec to produce audio code ' + '(default: FSK).') +@click.option('-m', '--mute', is_flag=True, default=False, + help="Disable sound codes generation (default is False).") +@click.option('-t', '--trials', default=300, type=int, + help='Specifies number of trials.') +@click.option('-d', '--duration', default=2, type=float, + help='Specifies script duration in seconds.') +@click.option('-i', '--interval', default=2, type=float, + help='Specifies interval value (default: 2.0).') +@click.option('-k', '--keep-soundcode', is_flag=True, default=False, + help="Store soundcode as separate .wav files " + "for debug purposes (default is False).") +@click.option('-l', '--log-level', default='DEBUG', + type=click.Choice(['DEBUG', 'INFO', + 'WARNING', 'ERROR', + 'CRITICAL']), + help='Set the logging level') +@click.pass_context +def main(ctx, mode: str, + output_prefix: str, + display: int, + qr_scale: float, + audio_lib: str, + sound_codec: str, + mute: bool, + trials: int, + duration: float, + interval: float, + keep_soundcode: bool, + log_level): + logger.setLevel(log_level) + # psychopy has similar logging levels like + # default logging module + #pl.console.setLevel(log_level) + start_ts: datetime = datetime.now() + logger.debug("reprostim-timesync-stimuli script started") + logger.debug(f" Started on : {start_ts}") + logger.debug(f" mode : {mode}") + logger.debug(f" prefix : {output_prefix}") + logger.debug(f" display : {display}") + logger.debug(f" audio_lib: {audio_lib}") + logger.debug(f" mute : {mute}") + logger.debug(f" duration : {duration}") + logger.debug(f" interval : {interval}") + + output: str = get_output_file_name(output_prefix, start_ts) + logger.debug(f" 
output : {output}") + + # setup environment variables + os.environ['REPROSTIM_AUDIO_LIB'] = audio_lib + os.environ['REPROSTIM_LOG_LEVEL'] = log_level + + if not do_init(output): + logger.error() + return -1 + + res = do_main(mode, output, + display, qr_scale, + sound_codec, mute, + trials, duration, interval, + keep_soundcode) + + end_ts: datetime = datetime.now() + logger.debug(f" Finished on: {end_ts}") + + # rename log file if any + output2: str = get_output_file_name(output_prefix, start_ts, end_ts) + if os.path.exists(output): + os.rename(output, output2) + logger.info(f"Output log renamed: {output} -> {output2}") + + logger.info(f"reprostim-timesync-stimuli script finished: {res}") + return res + + + +if __name__ == '__main__': + code = main() + logger.info(f"Exit on : {datetime.now()}") + logger.info(f"Exit code : {code}") + sys.exit(code) diff --git a/tools/soundcode.py b/tools/soundcode.py new file mode 100644 index 0000000..e601639 --- /dev/null +++ b/tools/soundcode.py @@ -0,0 +1,589 @@ +# TODO: Move to reprostim.soundcode module + +import logging +import os +import time +from datetime import datetime +import tempfile +from enum import Enum + +import numpy as np +import sounddevice as sd +from scipy.io.wavfile import write, read +from scipy.io import wavfile +from reedsolo import RSCodec + + +# setup logging +logger = logging.getLogger(__name__) +logger.setLevel(os.environ.get('REPROSTIM_LOG_LEVEL', 'INFO')) + +###################################### +# Setup psychopy audio library + +# Enum for the audio libs +class AudioLib(str, Enum): + # PsychoPy SoundDevice audio lib + PSYCHOPY_SOUNDDEVICE = "psychopy_sounddevice" + # PsychoPy SoundPTB audio lib + PSYCHOPY_PTB = "psychopy_ptb" + # sounddevice audio lib + # http://python-sounddevice.readthedocs.io/ + SOUNDDEVICE = "sounddevice" + +_audio_lib = os.environ.get('REPROSTIM_AUDIO_LIB', AudioLib.PSYCHOPY_SOUNDDEVICE) + +from psychopy import prefs +prefs.hardware['audioLib'] = ['sounddevice'] +if _audio_lib == AudioLib.PSYCHOPY_SOUNDDEVICE: + logger.debug("Set psychopy audio library: sounddevice") + prefs.hardware['audioLib'] = ['sounddevice'] +elif _audio_lib == AudioLib.PSYCHOPY_PTB: + logger.debug("Set psychopy audio library: ptb") + prefs.hardware['audioLib'] = ['ptb'] + +#logger.info("Using psychopy audio library: %s", prefs.hardware['audioLib']) +from psychopy import core, sound +from psychtoolbox import audio + + +###################################### +# Sound code/qr helper functions + +def bit_enumerator(data): + if isinstance(data, DataMessage): + data = data.encode() + if isinstance(data, str): + # If data is a string, iterate over each character + for char in data: + if char not in ('0', '1'): + raise ValueError("String must only contain '0' and '1'.") + yield int(char) # Yield the bit as an integer + elif isinstance(data, bytes): + # If data is bytes, iterate over each byte + for byte in data: + for i in range(7, -1, -1): # Iterate from MSB to LSB + yield (byte >> i) & 1 # Extract and yield the bit + else: + raise TypeError("Data must be either a string or bytes. Got: " + str(type(data))) + + +# Convert a list of bits to bytes +def bits_to_bytes(detected_bits): + # Check if the length of detected_bits is a multiple of 8 + if len(detected_bits) % 8 != 0: + raise ValueError(f"Detected bits array must be aligned to 8 bits. 
Length={len(detected_bits)}") + + byte_array = bytearray() # Use bytearray for mutable bytes + for i in range(0, len(detected_bits), 8): + # Get the next 8 bits + byte_bits = detected_bits[i:i + 8] + + # Convert the list of bits to a byte (big-endian) + byte_value = 0 + for bit in byte_bits: + byte_value = (byte_value << 1) | bit # Shift left and add the bit + + byte_array.append(byte_value) # Append the byte to the bytearray + + return bytes(byte_array) # Convert bytearray to bytes + + +def crc8(data: bytes, polynomial: int = 0x31, init_value: int = 0x00) -> int: + crc = init_value + for byte in data: + crc ^= byte # XOR byte into the CRC + for _ in range(8): # Process each bit + if crc & 0x80: # If the highest bit is set + crc = (crc << 1) ^ polynomial # Shift left and XOR with polynomial + else: + crc <<= 1 # Just shift left + crc &= 0xFF # Keep CRC to 8 bits + return crc + +###################################### +# Constants + +class SoundCodec(str, Enum): + # Frequency Shift Keying (FSK) where binary data is + # encoded as two different frequencies f0 and f1 with + # a fixed bit duration (baud rate or bit_rate). + FSK = "FSK" + + # Numerical Frequency Encoding (NFE) numbers are mapped + # directly to specific frequencies + # can encode only some numeric hash. + NFE = "NFE" + +###################################### +# Classes + +# Class representing audio data message in big-endian +# format and encoded with Reed-Solomon error correction +# where the message is structured as follows: +# - 1st byte is CRC-8 checksum +# - 2nd byte is the length of the data +# - 3+ and the rest is the data itself +class DataMessage: + def __init__(self): + self.value: bytes = b'' + self.length: int = 0 + self.crc8: int = 0 + self.use_ecc: bool = True + self.rsc = RSCodec(4) + + def decode(self, data: bytes): + if self.use_ecc: + dec, dec_full, errata_pos_all = self.rsc.decode(data) + data = bytes(dec) + + #logger.debug(f"decoded data : {data}") + self.crc8 = data[0] + self.length = data[1] + self.value = data[2:] + n = crc8(self.value) + if self.crc8 != n: + raise ValueError(f"CRC-8 checksum mismatch: {self.crc8} <-> {n}") + + def encode(self) -> bytes: + logger.debug("size info") + logger.debug(f" - data : {len(self.value)} bytes, {self.value}") + b: bytes = bytes([self.crc8, self.length]) + self.value + logger.debug(f" - message : {len(b)} bytes, {b}") + if self.use_ecc: + b = bytes(self.rsc.encode(b)) + logger.debug(f" - ecc : {len(b)} bytes, {b}") + return b + + def get_bytes(self) -> bytes: + return self.value + + def get_str(self) -> str: + return self.value.decode("utf-8") + + def get_uint(self) -> int: + c = len(self.value) + if c == 2: + return self.get_uint16() + elif c == 4: + return self.get_uint32() + elif c == 8: + return self.get_uint64() + else: + raise ValueError(f"Data length must be 2, 4, or 8 bytes, " + f"but was {c}") + + def get_uint16(self) -> int: + if len(self.value) != 2: + raise ValueError(f"Data length for uint16 must be 2 bytes, " + f"but was {len(self.value)}") + return int.from_bytes(self.value, 'big') + + def get_uint32(self) -> int: + if len(self.value) != 4: + raise ValueError(f"Data length for uint32 must be 4 bytes, " + f"but was {len(self.value)}") + return int.from_bytes(self.value, 'big') + + def get_uint64(self) -> int: + if len(self.value) != 8: + raise ValueError(f"Data length for uint64 must be 8 bytes, " + f"but was {len(self.value)}") + return int.from_bytes(self.value, 'big') + + def set_bytes(self, data: bytes): + self.value = data + self.length = len(data) + 
self.crc8 = crc8(data) + + def set_str(self, s: str): + self.set_bytes(s.encode("utf-8")) + + def set_uint16(self, i: int): + self.set_bytes(i.to_bytes(2, 'big')) + + def set_uint32(self, i: int): + self.set_bytes(i.to_bytes(4, 'big')) + + def set_uint64(self, i: int): + self.set_bytes(i.to_bytes(8, 'big')) + + +# Class to provide general information about sound code +class SoundCodeInfo: + def __init__(self): + self.codec = None + self.f1 = None + self.f0 = None + self.nfe_df = None + self.sample_rate = None + self.bit_duration = None + self.nfe_duration = None + self.nfe_freq = None + self.bit_count = None + self.volume = None + self.duration = None + self.pre_delay = None + self.post_delay = None + + # to string + def __str__(self): + return (f"SoundCodeInfo(codec={self.codec}, " + f"f1={self.f1}, " + f"f0={self.f0}, " + f"nfe_df={self.nfe_df}, " + f"rate={self.sample_rate}, " + f"bit_duration={self.bit_duration}, " + f"bit_count={self.bit_count}, " + f"nfe_freq={self.nfe_freq}, " + f"volume={self.volume}, " + f"duration={self.duration})" + f"pre_delay={self.pre_delay}, " + f"post_delay={self.post_delay}") + + +# Class to generate/parse QR-like sound codes with FSK modulation +class SoundCodeEngine: + def __init__(self, + codec=SoundCodec.FSK, + f0=1000, + f1=5000, + nfe_df=100, # used only in NFE + sample_rate=44100, + bit_duration=0.0070, # used only in FSK + nfe_duration=0.3, # used only in NFE + volume=0.80, + pre_delay=0.1, + pre_f=0, #1780 + post_delay=0.1, + post_f=0 #3571 + ): + self.codec = codec + self.f0 = f0 + self.f1 = f1 + self.nfe_df = nfe_df + self.sample_rate = sample_rate + self.bit_duration = bit_duration + self.nfe_duration = nfe_duration + if volume < 0.0 or volume > 1.0: + raise ValueError("Volume must be between 0.0 and 1.0.") + self.volume = volume + self.pre_delay = pre_delay + self.pre_f = pre_f + self.post_delay = post_delay + self.post_f = post_f + + def generate_sin(self, freq_hz, duration_sec): + t = np.linspace(0, duration_sec, + int(self.sample_rate * duration_sec), + endpoint=False) + signal = self.volume * np.sin(2 * np.pi * freq_hz * t) + return signal + + + def generate_fsk(self, data) -> (np.array, SoundCodeInfo): + logger.debug(f"audio config : f1={self.f1} Hz, f0={self.f0} Hz, rate={self.sample_rate} Hz, bit duration={self.bit_duration} sec, volume={self.volume}") + t = np.linspace(0, self.bit_duration, + int(self.sample_rate * self.bit_duration), + endpoint=False) + + # Create FSK signal + fsk_signal = np.array([]) + pre_signal = np.array([]) + post_signal = np.array([]) + + # generate pre-signal if any + if self.pre_delay > 0: + pre_signal = self.generate_sin(self.pre_f, self.pre_delay) + + # generate post-signal if any + if self.post_delay > 0: + post_signal = self.generate_sin(self.post_f, self.post_delay) + + # generate data signal properly + c: int = 0 + sb: str = '' + for bit in bit_enumerator(data): + c += 1 + sb += str(bit) + if bit == 1: + fsk_signal = np.concatenate((fsk_signal, + self.volume * np.sin(2 * np.pi * self.f1 * t))) + else: + fsk_signal = np.concatenate((fsk_signal, + self.volume * np.sin(2 * np.pi * self.f0 * t))) + + # concatenate pre-signal, data signal, and post-signal + if self.pre_delay > 0 or self.post_delay > 0: + fsk_signal = np.concatenate((pre_signal, fsk_signal, post_signal)) + + # Normalize the signal for 100% volume + if self.volume==1.0: + fsk_signal /= np.max(np.abs(fsk_signal)) + + sci: SoundCodeInfo = SoundCodeInfo() + sci.codec = SoundCodec.FSK + sci.f1 = self.f1 + sci.f0 = self.f0 + sci.sample_rate = 
self.sample_rate + sci.bit_duration = self.bit_duration + sci.bit_count = c + sci.volume = self.volume + sci.duration = (c * self.bit_duration + + self.pre_delay + self.post_delay) + sci.pre_delay = self.pre_delay + sci.post_delay = self.post_delay + + logger.debug(f"audio raw bits: count={c}, {sb}") + logger.debug(f"audio duration: {sci.duration:.6f} seconds") + return (fsk_signal, sci) + + + def generate_nfe(self, data) -> (np.array, SoundCodeInfo): + # Create NFE signal + nfe_signal = np.array([]) + pre_signal = np.array([]) + post_signal = np.array([]) + + # generate pre-signal if any + if self.pre_delay > 0: + pre_signal = self.generate_sin(self.pre_f, self.pre_delay) + + # generate post-signal if any + if self.post_delay > 0: + post_signal = self.generate_sin(self.post_f, self.post_delay) + + n = data.get_uint() + c = int((self.f1 - self.f0) / self.nfe_df) + 1 + freq = self.f0 + (n % c) * self.nfe_df + logger.debug(f" n={n}, c={c}, freq={freq}") + nfe_signal = self.generate_sin(freq, self.nfe_duration) + + # concatenate pre-signal, data signal, and post-signal + if self.pre_delay > 0 or self.post_delay > 0: + nfe_signal = np.concatenate((pre_signal, nfe_signal, post_signal)) + + # Normalize the signal for 100% volume + if self.volume==1.0: + nfe_signal /= np.max(np.abs(nfe_signal)) + + sci: SoundCodeInfo = SoundCodeInfo() + sci.codec = SoundCodec.NFE + sci.f1 = self.f1 + sci.f0 = self.f0 + sci.nfe_df = self.nfe_df + sci.sample_rate = self.sample_rate + sci.nfe_duration = self.nfe_duration + sci.nfe_freq = freq + sci.volume = self.volume + sci.duration = (self.nfe_duration + + self.pre_delay + self.post_delay) + sci.pre_delay = self.pre_delay + sci.post_delay = self.post_delay + + + logger.debug(f"audio duration: {sci.duration:.6f} seconds") + return nfe_signal, sci + + + def generate(self, data) -> (np.array, SoundCodeInfo): + if self.codec == SoundCodec.FSK: + return self.generate_fsk(data) + elif self.codec == SoundCodec.NFE: + return self.generate_nfe(data) + else: + raise ValueError(f"Unsupported codec: {self.codec}") + + + # play sound data with sounddevice + def play_data_sd(self, data): + ts = time.perf_counter() + fsk_signal, sci = self.generate(data) + ts = time.perf_counter() - ts + logger.debug(f"generate time : {ts:.6f} seconds") + + ts = time.perf_counter() + # Play the FSK signal + sd.play(fsk_signal, samplerate=self.sample_rate) + + # Wait until sound is finished playing + sd.wait() + ts = time.perf_counter() - ts + logger.debug(f"play time : {ts:.6f} seconds") + + + def save(self, data, filename): + fsk_signal, sci = self.generate(data) + + # Save the signal to a WAV file + write(filename, self.sample_rate, + (fsk_signal * 32767).astype(np.int16)) + return sci + + + def parse(self, filename): + # Read the WAV file + rate, data = wavfile.read(filename) + + # Check if audio is stereo and convert to mono if necessary + if len(data.shape) > 1: + data = data.mean(axis=1) + + # Calculate the number of samples for each bit duration + samples_per_bit = int(self.sample_rate * self.bit_duration) + + # Prepare a list to hold the detected bits + detected_bits = [] + + # Analyze the audio in chunks + for i in range(0, len(data), samples_per_bit): + if i + samples_per_bit > len(data): + break # Avoid out of bounds + + # Extract the current chunk of audio + chunk = data[i:i + samples_per_bit] + + # Perform FFT on the chunk + fourier = np.fft.fft(chunk) + frequencies = np.fft.fftfreq(len(fourier), 1 / rate) + + # Get the magnitudes and filter out positive frequencies + magnitudes = 
np.abs(fourier) + positive_frequencies = frequencies[:len(frequencies) // 2] + positive_magnitudes = magnitudes[:len(magnitudes) // 2] + + # Find the peak frequency + peak_freq = positive_frequencies[np.argmax(positive_magnitudes)] + + # Determine if the peak frequency corresponds to a '1' or '0' + if abs(peak_freq - self.f1) < 50: # Frequency for '1' + detected_bits.append(1) + elif abs(peak_freq - self.f0) < 50: # Frequency for '0' + detected_bits.append(0) + + dbg_bits: str = ''.join([str(bit) for bit in detected_bits]) + logger.debug(f"detected bits : count={len(dbg_bits)}, {dbg_bits}") + return bits_to_bytes(detected_bits) + +###################################### +# Public functions + +def beep(duration: float = 2.0, async_: bool = False): + logger.debug(f"beep(duration={duration})") + play_sound('A', duration, async_) + + +def list_audio_devices(): + logger.debug("list_audio_devices()") + + logger.debug("[psychopy]") + logger.debug(f"audioLib : {prefs.hardware['audioLib']}") + logger.debug(f"audioDevice : {prefs.hardware['audioDevice']}") + + logger.debug("[sounddevice]") + devices = sd.query_devices() # Query all devices + for i, device in enumerate(devices): + logger.debug(f"device [{i}] : {device['name']}") + default_device = sd.default.device # Get the current default input/output devices + logger.debug(f"default in : {default_device[0]}") + logger.debug(f"default out : {default_device[1]}") + + logger.debug("[psytoolbox]") + for i, device in enumerate(audio.get_devices()): + logger.debug(f"device [{i}] : {device}") + + logger.debug("[psychopy.backend_ptb]") + # TODO: investigate why only single out device listed from + # USB capture but defult one is not shown + # logger.debug(sound.backend_ptb.getDevices()) + + + +def _play_sound_psychopy(name: str, + duration: float = None, + volume: float = 0.8, + sample_rate: int = 44100, + async_: bool = False): + logger.debug(f"_play_sound_psychopy(name={name}, duration={duration}, async_={async_})") + snd = None + if duration: + snd = sound.Sound(name, secs=duration, sampleRate=sample_rate, + stereo=True, volume=volume) + else: + snd = sound.Sound(name, stereo=True, sampleRate=sample_rate, + volume=volume) + logger.debug(f"Play sound '{snd.sound}' with psychopy {prefs.hardware['audioLib']}") + snd.play() + logger.debug(f" sampleRate={snd.sampleRate}, duration={snd.duration}, volume={snd.volume}") + if not async_: + logger.debug("Waiting for sound to finish playing...") + core.wait(snd.duration) + logger.debug(f"Sound '{snd.sound}' has finished playing.") + + +def _play_sound_sd(name: str, + duration: float = None, + volume: float = 0.8, + sample_rate: int = 44100, + async_: bool = False): + logger.debug(f"_play_sound_sd(name={name}, duration={duration}, async_={async_})") + data = name + + if os.path.exists(name): + rate, signal = read(name) + logger.debug(f"Read sound file: {name}, rate={rate}") + # Convert from int16 to float32 + #signal = signal.astype(np.float32) / 32767 + data = signal + + sd.play(data, samplerate=sample_rate) + + +def play_sound(name: str, + duration: float = None, + volume: float = 0.8, + sample_rate: int = 44100, + async_: bool = False): + logger.debug(f"play_sound(name={name}, duration={duration}, async_={async_})") + if (_audio_lib == AudioLib.PSYCHOPY_SOUNDDEVICE or + _audio_lib == AudioLib.PSYCHOPY_PTB): + _play_sound_psychopy(name, duration, volume, sample_rate, async_) + elif _audio_lib == AudioLib.SOUNDDEVICE: + _play_sound_sd(name, duration, volume, sample_rate, async_) + else: + raise 
ValueError(f"Unsupported audio library: {_audio_lib}") + + +def save_soundcode(fname: str = None, + code_uint16: int = None, + code_uint32: int = None, + code_uint64: int = None, + code_str: str = None, + code_bytes: bytes = None, + codec: SoundCodec = SoundCodec.FSK, + engine=None) -> (str, SoundCodeInfo): + logger.debug(f"save_sound(fname={fname}...)") + if not fname: + fname = tempfile.mktemp( + prefix=f"soundcode_{datetime.now().strftime('%Y%m%d_%H%M%S%f')}_", + suffix=".wav") + + data = DataMessage() + if not code_uint16 is None: + data.set_uint16(code_uint16) + elif not code_uint32 is None: + data.set_uint32(code_uint32) + elif not code_uint64 is None: + data.set_uint64(code_uint64) + elif code_str: + data.set_str(code_str) + elif code_bytes: + data.set_bytes(code_bytes) + else: + raise ValueError("No code data provided.") + + if not engine: + engine = SoundCodeEngine(codec=codec) + sci: SoundCodeInfo = engine.save(data, fname) + logger.debug(f" -> {fname}") + return (fname, sci)
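+
+
+# Minimal usage sketch (illustrative only, not wired into the tools' flow):
+# generate a FSK-encoded wav file for a small integer code and play it back
+# with the audio library selected via REPROSTIM_AUDIO_LIB.
+if __name__ == "__main__":
+    demo_file, demo_info = save_soundcode(code_uint16=1234,
+                                          codec=SoundCodec.FSK)
+    logger.debug(f"demo soundcode file: {demo_file}, info: {demo_info}")
+    play_sound(demo_file)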