testx.py (forked from schufo/plla-tisvs)
"""
This file is a modified version of https://github.com/sigsep/open-unmix-pytorch/blob/master/test.py
"""
import torch
import numpy as np
import argparse
import soundfile as sf
import norbert
import json
from pathlib import Path
import scipy.signal
import resampy
import model
import utils
import warnings
import tqdm
from contextlib import redirect_stderr
import io
import model_utls

def load_model(target, model_name='umxhq', device='cpu'):
    """
    target model path can be either <target>.pth, or <target>-sha256.pth
    (as used on torchhub)
    """
    model_path = Path(model_name).expanduser()
    if not model_path.exists():
        # model path does not exist, use hubconf model
        try:
            # disable progress bar
            err = io.StringIO()
            with redirect_stderr(err):
                return torch.hub.load(
                    'sigsep/open-unmix-pytorch',
                    model_name.split('/')[1],
                    target=target,
                    device=device,
                    pretrained=True
                )
            print(err.getvalue())
        except AttributeError:
            raise NameError('Model does not exist on torchhub')
        # otherwise, assume model_name is a path to a local model directory
    else:
        # load model from disk
        with open(Path(model_path, target + '.json'), 'r') as stream:
            results = json.load(stream)
        target_model_path = next(Path(model_path).glob("%s*.pth" % target))
        state = torch.load(
            target_model_path,
            map_location=device
        )
        architecture = results['args']['architecture']
        model_class = model_utls.ModelLoader.get_model(architecture)
        unmix = model_class.from_config(results['args'])
        unmix.load_state_dict(state)
        unmix.stft.center = True
        unmix.eval()
        unmix.to(device)
        return unmix
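
# Illustrative example (not part of the original file): loading a locally trained
# model from a checkpoint directory. The target name 'vocals' is an assumption for
# demonstration; any directory containing <target>.json and <target>*.pth files
# (such as the 'trained_models/umx' path used further below) works the same way.
#
#   unmix_vocals = load_model(target='vocals',
#                             model_name='trained_models/umx',
#                             device='cpu')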

def istft(X, rate=44100, n_fft=4096, n_hopsize=1024):
    t, audio = scipy.signal.istft(
        X / (n_fft / 2),
        rate,
        nperseg=n_fft,
        noverlap=n_fft - n_hopsize,
        boundary=True
    )
    return audio

def separate(
    inputs,
    targets,
    model_name='umxhq',
    niter=1, softmask=False, alpha=1.0,
    residual_model=False, device='cpu', args=None):
    """
    Perform the separation on the audio input

    Parameters
    ----------
    inputs: tuple of mixture and side info:
        (np.ndarray [shape=(nb_samples, nb_channels, nb_timesteps)], torch.tensor)
        mixture audio
        (comment by Kilian: it looks like the expected np.ndarray shape is actually
        (nb_timesteps, nb_channels); the torch tensor audio_torch then gets the
        shape (nb_samples, nb_channels, nb_timesteps))

    targets: list of str
        a list of the separation targets.
        Note that for each target a separate model is expected
        to be loaded.

    model_name: str
        name of torchhub model or path to model folder, defaults to `umxhq`

    niter: int
        Number of EM steps for refining initial estimates in a
        post-processing stage, defaults to 1.

    softmask: boolean
        if activated, then the initial estimates for the sources will
        be obtained through a ratio mask of the mixture STFT, and not
        by using the default behavior of reconstructing waveforms
        by using the mixture phase, defaults to False

    alpha: float
        changes the exponent to use for building ratio masks, defaults to 1.0

    residual_model: boolean
        computes a residual target, for custom separation scenarios
        when not all targets are available, defaults to False

    device: str
        set torch device. Defaults to `cpu`.

    Returns
    -------
    estimates: `dict` [`str`, `np.ndarray`]
        dictionary of all estimates as produced by the separation model.
    """
    mix = inputs[0]
    text = inputs[1].to(device)

    # convert numpy audio to torch
    audio_torch = torch.tensor(mix.T[None, ...]).float().to(device)

    model_names = [model_name]
    if model_name == 'trained_models/umx':
        for i in range(len(targets) - 1):
            model_names.append('trained_models/umx')

    source_names = []
    V = []
    for j, target in enumerate(tqdm.tqdm(targets)):
        unmix_target = load_model(
            target=target,
            model_name=model_names[j],
            device=device
        )
        if args is not None:
            if args.optimal_path_attention:
                unmix_target.optimal_path_alphas = True

        with torch.no_grad():
            if args.alignment_from:
                attention_weights = inputs[2].to(device)
                Vj = unmix_target((audio_torch, text, attention_weights)).cpu().detach().numpy()
            else:
                Vj = unmix_target((audio_torch, text)).cpu().detach().numpy()

        if softmask:
            # only exponentiate the model if we use softmask
            Vj = Vj**alpha
        # output is nb_frames, nb_samples, nb_channels, nb_bins
        V.append(Vj[:, 0, ...])  # remove sample dim
        source_names += [target]

    V = np.transpose(np.array(V), (1, 3, 2, 0))

    X = unmix_target.stft(audio_torch).detach().cpu().numpy()
    # convert to complex numpy type
    X = X[..., 0] + X[..., 1]*1j
    X = X[0].transpose(2, 1, 0)

    if residual_model or len(targets) == 1:
        V = norbert.residual_model(V, X, alpha if softmask else 1)
        source_names += (['residual'] if len(targets) > 1
                         else ['accompaniment'])

    Y = norbert.wiener(V, X.astype(np.complex128), niter,
                       use_softmask=softmask)

    estimates = {}
    for j, name in enumerate(source_names):
        audio_hat = istft(
            Y[..., j].T,
            n_fft=unmix_target.stft.n_fft,
            n_hopsize=unmix_target.stft.n_hop
        )
        estimates[name] = audio_hat.T
    return estimates
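
# Usage note (illustrative, not part of the original file): `separate` expects the
# mixture as a (nb_timesteps, nb_channels) numpy array together with the text/side-info
# tensor consumed by the informed models, and returns a dict mapping each target name
# (plus 'residual' or 'accompaniment') to a waveform in the same
# (nb_timesteps, nb_channels) orientation as the input mixture. A hypothetical call,
# assuming `mix`, `text`, and parsed `args` already exist:
#
#   estimates = separate((mix, text), targets=['vocals'],
#                        model_name='trained_models/umx',
#                        niter=args.niter, softmask=args.softmask,
#                        alpha=args.alpha, residual_model=args.residual_model,
#                        device='cpu', args=args)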

def inference_args(parser, remaining_args):
    inf_parser = argparse.ArgumentParser(
        description=__doc__,
        parents=[parser],
        add_help=True,
        formatter_class=argparse.RawDescriptionHelpFormatter
    )

    inf_parser.add_argument(
        '--softmask',
        dest='softmask',
        action='store_true',
        help=('if enabled, will initialize separation with softmask. '
              'otherwise, will use mixture phase with spectrogram')
    )

    inf_parser.add_argument(
        '--niter',
        type=int,
        default=1,
        help='number of iterations for refining results.'
    )

    inf_parser.add_argument(
        '--alpha',
        type=float,
        default=1.0,
        help='exponent in case of softmask separation'
    )

    inf_parser.add_argument(
        '--samplerate',
        type=int,
        default=44100,
        help='model samplerate'
    )

    inf_parser.add_argument(
        '--residual-model',
        action='store_true',
        help='create a model for the residual'
    )
    return inf_parser.parse_args()
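
# Minimal command-line sketch (an illustration added here, not part of the original
# testx.py): a hypothetical wiring of inference_args() and separate(). The flags
# --input, --side-info, --targets, and --model, as well as the default file paths,
# are placeholders; --alignment-from and --optimal-path-attention only mirror the
# attributes that separate() reads from `args`.
if __name__ == '__main__':
    parser = argparse.ArgumentParser(add_help=False)
    parser.add_argument('--input', type=str, default='mixture.wav',
                        help='path to the mixture wav file (placeholder)')
    parser.add_argument('--side-info', type=str, default='side_info.pt',
                        help='path to a torch-saved side-info tensor (placeholder)')
    parser.add_argument('--targets', nargs='+', default=['vocals'])
    parser.add_argument('--model', type=str, default='trained_models/umx')
    parser.add_argument('--alignment-from', type=str, default='')
    parser.add_argument('--optimal-path-attention', action='store_true')
    args = inference_args(parser, [])

    # load the mixture as (nb_timesteps, nb_channels) and resample if necessary
    audio, rate = sf.read(args.input, always_2d=True)
    if rate != args.samplerate:
        audio = resampy.resample(audio, rate, args.samplerate, axis=0)

    # side info (e.g. text) tensor as expected by the informed models
    text = torch.load(args.side_info)

    estimates = separate((audio, text),
                         targets=args.targets,
                         model_name=args.model,
                         niter=args.niter,
                         softmask=args.softmask,
                         alpha=args.alpha,
                         residual_model=args.residual_model,
                         device='cpu',
                         args=args)

    # write one wav file per estimated source
    for name, waveform in estimates.items():
        sf.write('%s.wav' % name, waveform, args.samplerate)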