diff --git a/tools/reprostim-timesync-stimuli b/tools/reprostim-timesync-stimuli index c2e8413..d339552 100755 --- a/tools/reprostim-timesync-stimuli +++ b/tools/reprostim-timesync-stimuli @@ -37,8 +37,13 @@ from psychopy import visual, core, event, clock # Enum for the audio libs class AudioLib(str, Enum): + # PsychoPy SoundDevice audio lib + PSYCHOPY_SOUNDDEVICE = "psychopy_sounddevice" + # PsychoPy SoundPTB audio lib + PSYCHOPY_PTB = "psychopy_ptb" + # sounddevice audio lib + # http://python-sounddevice.readthedocs.io/ SOUNDDEVICE = "sounddevice" - PTB = "ptb" # Enum for the mode of the script operation @@ -133,7 +138,8 @@ def do_init(logfn: str) -> bool: return True -def do_main(mode: Mode, logfn: str, display: int, mute: bool, +def do_main(mode: Mode, logfn: str, display: int, + sound_codec: str, mute: bool, ntrials: int, duration: float, interval: float, keep_soundcode: bool) -> int: logger.info("main script started") @@ -141,7 +147,7 @@ def do_main(mode: Mode, logfn: str, display: int, mute: bool, # late sound init from soundcode import (beep, list_audio_devices, save_soundcode, play_sound, - SoundCodeInfo) + SoundCodec, SoundCodeInfo) #soundcode.logger.setLevel(logging.DEBUG) @@ -222,7 +228,10 @@ def do_main(mode: Mode, logfn: str, display: int, mute: bool, store_soundcode(sound_file, sound_data, logfn) safe_remove(sound_file) sound_data = acqNum - sound_file, sound_info = save_soundcode(code_uint16=sound_data) + sound_file, sound_info_ = save_soundcode( + code_uint16=sound_data, + codec=sound_codec) + sound_info = sound_info_ logger.debug(f" {sound_info}") target_time = t_start + acqNum * interval @@ -239,9 +248,19 @@ def do_main(mode: Mode, logfn: str, display: int, mute: bool, if not mute: d = play_sound(sound_file, async_=True) sound_time, sound_time_str = get_times() - rec['sound_time'] = sound_time - rec['sound_time_str'] = sound_time_str - rec['sound_data'] = sound_data + rec['s_time'] = sound_time + rec['s_time_str'] = sound_time_str + rec['s_data'] = sound_data + rec['s_codec'] = sound_codec + rec['s_f0'] = sound_info.f0 + rec['s_f1'] = sound_info.f1 + rec['s_pre_delay'] = sound_info.pre_delay + rec['s_post_delay'] = sound_info.post_delay + rec['s_duration'] = sound_info.duration + if sound_codec == SoundCodec.NFE: + rec['s_freq'] = sound_info.nfe_freq + rec['s_df'] = sound_info.nfe_df + # NOTE: should we add codec info to the log? # like f0, f1, sampleRate, bit_duration, duration, etc @@ -291,8 +310,14 @@ def do_main(mode: Mode, logfn: str, display: int, mute: bool, @click.option('-a', '--audio-lib', type=click.Choice([a.value for a in AudioLib], case_sensitive=False), - default=AudioLib.SOUNDDEVICE, + default=AudioLib.PSYCHOPY_SOUNDDEVICE, help='Specify audio library to be used.') +@click.option('-c', '--sound-codec', + type=click.Choice(['fsk', 'nfe'], + case_sensitive=False), + default='fsk', + help='Specify sound codec to produce audio code ' + '(default: fsk).') @click.option('-m', '--mute', is_flag=True, default=False, help="Disable sound codes generation (default is False).") @click.option('-t', '--trials', default=300, type=int, @@ -314,6 +339,7 @@ def main(ctx, mode: str, output_prefix: str, display: int, audio_lib: str, + sound_codec: str, mute: bool, trials: int, duration: float, @@ -346,7 +372,8 @@ def main(ctx, mode: str, logger.error() return -1 - res = do_main(mode, output, display, mute, + res = do_main(mode, output, display, + sound_codec, mute, trials, duration, interval, keep_soundcode) diff --git a/tools/soundcode.py b/tools/soundcode.py index 015ea82..d596548 100644 --- a/tools/soundcode.py +++ b/tools/soundcode.py @@ -127,6 +127,18 @@ def get_bytes(self) -> bytes: def get_str(self) -> str: return self.value.decode("utf-8") + def get_uint(self) -> int: + c = len(self.value) + if c == 2: + return self.get_uint16() + elif c == 4: + return self.get_uint32() + elif c == 8: + return self.get_uint64() + else: + raise ValueError(f"Data length must be 2, 4, or 8 bytes, " + f"but was {c}") + def get_uint16(self) -> int: if len(self.value) != 2: raise ValueError(f"Data length for uint16 must be 2 bytes, " @@ -164,8 +176,16 @@ def set_uint64(self, i: int): class SoundCodec(str, Enum): + # Frequency Shift Keying (FSK) where binary data is + # encoded as two different frequencies f0 and f1 with + # a fixed bit duration (baud rate or bit_rate). FSK = "fsk" + # Numerical Frequency Encoding (NFE) numbers are mapped + # directly to specific frequencies + # can encode only some numeric hash. + NFE = "nfe" + # Class to provide general information about sound code class SoundCodeInfo: @@ -173,43 +193,58 @@ def __init__(self): self.codec = None self.f1 = None self.f0 = None + self.nfe_df = None self.sample_rate = None self.bit_duration = None + self.nfe_duration = None + self.nfe_freq = None self.bit_count = None self.volume = None self.duration = None + self.pre_delay = None + self.post_delay = None # to string def __str__(self): return (f"SoundCodeInfo(codec={self.codec}, " f"f1={self.f1}, " f"f0={self.f0}, " + f"nfe_df={self.nfe_df}, " f"rate={self.sample_rate}, " f"bit_duration={self.bit_duration}, " f"bit_count={self.bit_count}, " + f"nfe_freq={self.nfe_freq}, " f"volume={self.volume}, " - f"duration={self.duration})") + f"duration={self.duration})" + f"pre_delay={self.pre_delay}, " + f"post_delay={self.post_delay}") # Class to generate/parse QR-like sound codes with FSK modulation -class SoundCodeFsk: +class SoundCodeEngine: def __init__(self, - f1=1000, - f0=5000, + codec=SoundCodec.FSK, + f0=1000, + f1=5000, + nfe_df=100, # used only in NFE sample_rate=44100, - bit_duration=0.0070, + bit_duration=0.0070, # used only in FSK #bit_duration=0.014, + nfe_duration=0.3, # used only in NFE volume=0.95, pre_delay=0.1, #pre_f=1780, pre_f=0, post_delay=0.1, - post_f=0 #3571 + post_f=0 #3571 ): - self.f1 = f1 + self.codec = codec self.f0 = f0 + self.f1 = f1 + self.nfe_df = nfe_df self.sample_rate = sample_rate self.bit_duration = bit_duration + self.nfe_duration = nfe_duration if volume < 0.0 or volume > 1.0: raise ValueError("Volume must be between 0.0 and 1.0.") self.volume = volume @@ -218,8 +253,15 @@ def __init__(self, self.post_delay = post_delay self.post_f = post_f + def generate_sin(self, freq_hz, duration_sec): + t = np.linspace(0, duration_sec, + int(self.sample_rate * duration_sec), + endpoint=False) + signal = self.volume * np.sin(2 * np.pi * freq_hz * t) + return signal - def generate(self, data) -> (np.array, SoundCodeInfo): + + def generate_fsk(self, data) -> (np.array, SoundCodeInfo): logger.debug(f"audio config : f1={self.f1} Hz, f0={self.f0} Hz, rate={self.sample_rate} Hz, bit duration={self.bit_duration} sec, volume={self.volume}") t = np.linspace(0, self.bit_duration, int(self.sample_rate * self.bit_duration), @@ -232,17 +274,11 @@ def generate(self, data) -> (np.array, SoundCodeInfo): # generate pre-signal if any if self.pre_delay > 0: - t_pre = np.linspace(0, self.pre_delay, - int(self.sample_rate * self.pre_delay), - endpoint=False) - pre_signal = self.volume * np.sin(2 * np.pi * self.pre_f * t_pre) + pre_signal = self.generate_sin(self.pre_f, self.pre_delay) # generate post-signal if any if self.post_delay > 0: - t_post = np.linspace(0, self.post_delay, - int(self.sample_rate * self.post_delay), - endpoint=False) - post_signal = self.volume * np.sin(2 * np.pi * self.post_f * t_post) + post_signal = self.generate_sin(self.post_f, self.post_delay) # generate data signal properly c: int = 0 @@ -275,12 +311,72 @@ def generate(self, data) -> (np.array, SoundCodeInfo): sci.volume = self.volume sci.duration = (c * self.bit_duration + self.pre_delay + self.post_delay) + sci.pre_delay = self.pre_delay + sci.post_delay = self.post_delay logger.debug(f"audio raw bits: count={c}, {sb}") logger.debug(f"audio duration: {sci.duration:.6f} seconds") return (fsk_signal, sci) - def play(self, data): + + def generate_nfe(self, data) -> (np.array, SoundCodeInfo): + # Create NFE signal + nfe_signal = np.array([]) + pre_signal = np.array([]) + post_signal = np.array([]) + + # generate pre-signal if any + if self.pre_delay > 0: + pre_signal = self.generate_sin(self.pre_f, self.pre_delay) + + # generate post-signal if any + if self.post_delay > 0: + post_signal = self.generate_sin(self.post_f, self.post_delay) + + n = data.get_uint() + c = int((self.f1 - self.f0) / self.nfe_df) + 1 + freq = self.f0 + (n % c) * self.nfe_df + logger.debug(f" n={n}, c={c}, freq={freq}") + nfe_signal = self.generate_sin(freq, self.nfe_duration) + + # concatenate pre-signal, data signal, and post-signal + if self.pre_delay > 0 or self.post_delay > 0: + nfe_signal = np.concatenate((pre_signal, nfe_signal, post_signal)) + + # Normalize the signal for 100% volume + if self.volume==1.0: + nfe_signal /= np.max(np.abs(nfe_signal)) + + sci: SoundCodeInfo = SoundCodeInfo() + sci.codec = SoundCodec.NFE + sci.f1 = self.f1 + sci.f0 = self.f0 + sci.nfe_df = self.nfe_df + sci.sample_rate = self.sample_rate + sci.nfe_duration = self.nfe_duration + sci.nfe_freq = freq + sci.volume = self.volume + sci.duration = (self.nfe_duration + + self.pre_delay + self.post_delay) + sci.pre_delay = self.pre_delay + sci.post_delay = self.post_delay + + + logger.debug(f"audio duration: {sci.duration:.6f} seconds") + return nfe_signal, sci + + + def generate(self, data) -> (np.array, SoundCodeInfo): + if self.codec == SoundCodec.FSK: + return self.generate_fsk(data) + elif self.codec == SoundCodec.NFE: + return self.generate_nfe(data) + else: + raise ValueError(f"Unsupported codec: {self.codec}") + + + # play sound data with sounddevice + def play_data_sd(self, data): ts = time.perf_counter() fsk_signal, sci = self.generate(data) ts = time.perf_counter() - ts @@ -302,6 +398,8 @@ def save(self, data, filename): # Save the signal to a WAV file write(filename, self.sample_rate, (fsk_signal * 32767).astype(np.int16)) + return sci + def parse(self, filename): # Read the WAV file @@ -407,6 +505,7 @@ def save_soundcode(fname: str = None, code_uint64: int = None, code_str: str = None, code_bytes: bytes = None, + codec: SoundCodec = SoundCodec.FSK, engine=None) -> (str, SoundCodeInfo): logger.debug(f"save_sound(fname={fname}...)") if not fname: @@ -429,7 +528,7 @@ def save_soundcode(fname: str = None, raise ValueError("No code data provided.") if not engine: - engine = SoundCodeFsk() + engine = SoundCodeEngine(codec=codec) sci: SoundCodeInfo = engine.save(data, fname) logger.debug(f" -> {fname}") return (fname, sci)