Skip to content

Commit

Permalink
Added --sound-codec argument and possibility to generate sound code w…
Browse files Browse the repository at this point in the history
…ith FSK or NFE codecs in runtime, #115.
  • Loading branch information
vmdocua committed Dec 3, 2024
1 parent 99fdcae commit edf02e2
Show file tree
Hide file tree
Showing 2 changed files with 153 additions and 27 deletions.
45 changes: 36 additions & 9 deletions tools/reprostim-timesync-stimuli
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,13 @@ from psychopy import visual, core, event, clock

# Enum for the audio libs
class AudioLib(str, Enum):
# PsychoPy SoundDevice audio lib
PSYCHOPY_SOUNDDEVICE = "psychopy_sounddevice"
# PsychoPy SoundPTB audio lib
PSYCHOPY_PTB = "psychopy_ptb"
# sounddevice audio lib
# http://python-sounddevice.readthedocs.io/
SOUNDDEVICE = "sounddevice"
PTB = "ptb"


# Enum for the mode of the script operation
Expand Down Expand Up @@ -133,15 +138,16 @@ def do_init(logfn: str) -> bool:
return True


def do_main(mode: Mode, logfn: str, display: int, mute: bool,
def do_main(mode: Mode, logfn: str, display: int,
sound_codec: str, mute: bool,
ntrials: int, duration: float, interval: float,
keep_soundcode: bool) -> int:
logger.info("main script started")

# late sound init
from soundcode import (beep, list_audio_devices,
save_soundcode, play_sound,
SoundCodeInfo)
SoundCodec, SoundCodeInfo)

#soundcode.logger.setLevel(logging.DEBUG)

Expand Down Expand Up @@ -222,7 +228,10 @@ def do_main(mode: Mode, logfn: str, display: int, mute: bool,
store_soundcode(sound_file, sound_data, logfn)
safe_remove(sound_file)
sound_data = acqNum
sound_file, sound_info = save_soundcode(code_uint16=sound_data)
sound_file, sound_info_ = save_soundcode(
code_uint16=sound_data,
codec=sound_codec)
sound_info = sound_info_
logger.debug(f" {sound_info}")

target_time = t_start + acqNum * interval
Expand All @@ -239,9 +248,19 @@ def do_main(mode: Mode, logfn: str, display: int, mute: bool,
if not mute:
d = play_sound(sound_file, async_=True)
sound_time, sound_time_str = get_times()
rec['sound_time'] = sound_time
rec['sound_time_str'] = sound_time_str
rec['sound_data'] = sound_data
rec['s_time'] = sound_time
rec['s_time_str'] = sound_time_str
rec['s_data'] = sound_data
rec['s_codec'] = sound_codec
rec['s_f0'] = sound_info.f0
rec['s_f1'] = sound_info.f1
rec['s_pre_delay'] = sound_info.pre_delay
rec['s_post_delay'] = sound_info.post_delay
rec['s_duration'] = sound_info.duration
if sound_codec == SoundCodec.NFE:
rec['s_freq'] = sound_info.nfe_freq
rec['s_df'] = sound_info.nfe_df

# NOTE: should we add codec info to the log?
# like f0, f1, sampleRate, bit_duration, duration, etc

Expand Down Expand Up @@ -291,8 +310,14 @@ def do_main(mode: Mode, logfn: str, display: int, mute: bool,
@click.option('-a', '--audio-lib',
type=click.Choice([a.value for a in AudioLib],
case_sensitive=False),
default=AudioLib.SOUNDDEVICE,
default=AudioLib.PSYCHOPY_SOUNDDEVICE,
help='Specify audio library to be used.')
@click.option('-c', '--sound-codec',
type=click.Choice(['fsk', 'nfe'],
case_sensitive=False),
default='fsk',
help='Specify sound codec to produce audio code '
'(default: fsk).')
@click.option('-m', '--mute', is_flag=True, default=False,
help="Disable sound codes generation (default is False).")
@click.option('-t', '--trials', default=300, type=int,
Expand All @@ -314,6 +339,7 @@ def main(ctx, mode: str,
output_prefix: str,
display: int,
audio_lib: str,
sound_codec: str,
mute: bool,
trials: int,
duration: float,
Expand Down Expand Up @@ -346,7 +372,8 @@ def main(ctx, mode: str,
logger.error()
return -1

res = do_main(mode, output, display, mute,
res = do_main(mode, output, display,
sound_codec, mute,
trials, duration, interval,
keep_soundcode)

Expand Down
135 changes: 117 additions & 18 deletions tools/soundcode.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,18 @@ def get_bytes(self) -> bytes:
def get_str(self) -> str:
return self.value.decode("utf-8")

def get_uint(self) -> int:
c = len(self.value)
if c == 2:
return self.get_uint16()
elif c == 4:
return self.get_uint32()
elif c == 8:
return self.get_uint64()
else:
raise ValueError(f"Data length must be 2, 4, or 8 bytes, "
f"but was {c}")

def get_uint16(self) -> int:
if len(self.value) != 2:
raise ValueError(f"Data length for uint16 must be 2 bytes, "
Expand Down Expand Up @@ -164,52 +176,75 @@ def set_uint64(self, i: int):


class SoundCodec(str, Enum):
# Frequency Shift Keying (FSK) where binary data is
# encoded as two different frequencies f0 and f1 with
# a fixed bit duration (baud rate or bit_rate).
FSK = "fsk"

# Numerical Frequency Encoding (NFE) numbers are mapped
# directly to specific frequencies
# can encode only some numeric hash.
NFE = "nfe"


# Class to provide general information about sound code
class SoundCodeInfo:
def __init__(self):
self.codec = None
self.f1 = None
self.f0 = None
self.nfe_df = None
self.sample_rate = None
self.bit_duration = None
self.nfe_duration = None
self.nfe_freq = None
self.bit_count = None
self.volume = None
self.duration = None
self.pre_delay = None
self.post_delay = None

# to string
def __str__(self):
return (f"SoundCodeInfo(codec={self.codec}, "
f"f1={self.f1}, "
f"f0={self.f0}, "
f"nfe_df={self.nfe_df}, "
f"rate={self.sample_rate}, "
f"bit_duration={self.bit_duration}, "
f"bit_count={self.bit_count}, "
f"nfe_freq={self.nfe_freq}, "
f"volume={self.volume}, "
f"duration={self.duration})")
f"duration={self.duration})"
f"pre_delay={self.pre_delay}, "
f"post_delay={self.post_delay}")


# Class to generate/parse QR-like sound codes with FSK modulation
class SoundCodeFsk:
class SoundCodeEngine:
def __init__(self,
f1=1000,
f0=5000,
codec=SoundCodec.FSK,
f0=1000,
f1=5000,
nfe_df=100, # used only in NFE
sample_rate=44100,
bit_duration=0.0070,
bit_duration=0.0070, # used only in FSK
#bit_duration=0.014,
nfe_duration=0.3, # used only in NFE
volume=0.95,
pre_delay=0.1,
#pre_f=1780,
pre_f=0,
post_delay=0.1,
post_f=0 #3571
post_f=0 #3571
):
self.f1 = f1
self.codec = codec
self.f0 = f0
self.f1 = f1
self.nfe_df = nfe_df
self.sample_rate = sample_rate
self.bit_duration = bit_duration
self.nfe_duration = nfe_duration
if volume < 0.0 or volume > 1.0:
raise ValueError("Volume must be between 0.0 and 1.0.")
self.volume = volume
Expand All @@ -218,8 +253,15 @@ def __init__(self,
self.post_delay = post_delay
self.post_f = post_f

def generate_sin(self, freq_hz, duration_sec):
t = np.linspace(0, duration_sec,
int(self.sample_rate * duration_sec),
endpoint=False)
signal = self.volume * np.sin(2 * np.pi * freq_hz * t)
return signal

def generate(self, data) -> (np.array, SoundCodeInfo):

def generate_fsk(self, data) -> (np.array, SoundCodeInfo):
logger.debug(f"audio config : f1={self.f1} Hz, f0={self.f0} Hz, rate={self.sample_rate} Hz, bit duration={self.bit_duration} sec, volume={self.volume}")
t = np.linspace(0, self.bit_duration,
int(self.sample_rate * self.bit_duration),
Expand All @@ -232,17 +274,11 @@ def generate(self, data) -> (np.array, SoundCodeInfo):

# generate pre-signal if any
if self.pre_delay > 0:
t_pre = np.linspace(0, self.pre_delay,
int(self.sample_rate * self.pre_delay),
endpoint=False)
pre_signal = self.volume * np.sin(2 * np.pi * self.pre_f * t_pre)
pre_signal = self.generate_sin(self.pre_f, self.pre_delay)

# generate post-signal if any
if self.post_delay > 0:
t_post = np.linspace(0, self.post_delay,
int(self.sample_rate * self.post_delay),
endpoint=False)
post_signal = self.volume * np.sin(2 * np.pi * self.post_f * t_post)
post_signal = self.generate_sin(self.post_f, self.post_delay)

# generate data signal properly
c: int = 0
Expand Down Expand Up @@ -275,12 +311,72 @@ def generate(self, data) -> (np.array, SoundCodeInfo):
sci.volume = self.volume
sci.duration = (c * self.bit_duration +
self.pre_delay + self.post_delay)
sci.pre_delay = self.pre_delay
sci.post_delay = self.post_delay

logger.debug(f"audio raw bits: count={c}, {sb}")
logger.debug(f"audio duration: {sci.duration:.6f} seconds")
return (fsk_signal, sci)

def play(self, data):

def generate_nfe(self, data) -> (np.array, SoundCodeInfo):
# Create NFE signal
nfe_signal = np.array([])
pre_signal = np.array([])
post_signal = np.array([])

# generate pre-signal if any
if self.pre_delay > 0:
pre_signal = self.generate_sin(self.pre_f, self.pre_delay)

# generate post-signal if any
if self.post_delay > 0:
post_signal = self.generate_sin(self.post_f, self.post_delay)

n = data.get_uint()
c = int((self.f1 - self.f0) / self.nfe_df) + 1
freq = self.f0 + (n % c) * self.nfe_df
logger.debug(f" n={n}, c={c}, freq={freq}")
nfe_signal = self.generate_sin(freq, self.nfe_duration)

# concatenate pre-signal, data signal, and post-signal
if self.pre_delay > 0 or self.post_delay > 0:
nfe_signal = np.concatenate((pre_signal, nfe_signal, post_signal))

# Normalize the signal for 100% volume
if self.volume==1.0:
nfe_signal /= np.max(np.abs(nfe_signal))

sci: SoundCodeInfo = SoundCodeInfo()
sci.codec = SoundCodec.NFE
sci.f1 = self.f1
sci.f0 = self.f0
sci.nfe_df = self.nfe_df
sci.sample_rate = self.sample_rate
sci.nfe_duration = self.nfe_duration
sci.nfe_freq = freq
sci.volume = self.volume
sci.duration = (self.nfe_duration +
self.pre_delay + self.post_delay)
sci.pre_delay = self.pre_delay
sci.post_delay = self.post_delay


logger.debug(f"audio duration: {sci.duration:.6f} seconds")
return nfe_signal, sci


def generate(self, data) -> (np.array, SoundCodeInfo):
if self.codec == SoundCodec.FSK:
return self.generate_fsk(data)
elif self.codec == SoundCodec.NFE:
return self.generate_nfe(data)
else:
raise ValueError(f"Unsupported codec: {self.codec}")


# play sound data with sounddevice
def play_data_sd(self, data):
ts = time.perf_counter()
fsk_signal, sci = self.generate(data)
ts = time.perf_counter() - ts
Expand All @@ -302,6 +398,8 @@ def save(self, data, filename):
# Save the signal to a WAV file
write(filename, self.sample_rate,
(fsk_signal * 32767).astype(np.int16))
return sci


def parse(self, filename):
# Read the WAV file
Expand Down Expand Up @@ -407,6 +505,7 @@ def save_soundcode(fname: str = None,
code_uint64: int = None,
code_str: str = None,
code_bytes: bytes = None,
codec: SoundCodec = SoundCodec.FSK,
engine=None) -> (str, SoundCodeInfo):
logger.debug(f"save_sound(fname={fname}...)")
if not fname:
Expand All @@ -429,7 +528,7 @@ def save_soundcode(fname: str = None,
raise ValueError("No code data provided.")

if not engine:
engine = SoundCodeFsk()
engine = SoundCodeEngine(codec=codec)
sci: SoundCodeInfo = engine.save(data, fname)
logger.debug(f" -> {fname}")
return (fname, sci)

0 comments on commit edf02e2

Please sign in to comment.