-
Notifications
You must be signed in to change notification settings - Fork 112
/
meta-manager
executable file
·122 lines (85 loc) · 3.59 KB
/
meta-manager
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
#!/usr/bin/env python3
import os
import argparse
import json
import datetime
def Annotation(**kwargs):
    """Build an annotation dict whose keys all carry the ``core:`` prefix.

    Shorthand keywords are translated first (all of them optional):

    * ``width`` (together with ``freq``) becomes ``freq_upper_edge`` and
      ``freq_lower_edge``: the centre frequency plus/minus half the width,
      each rounded with ``round()``.  Both shorthands are consumed.
    * ``count`` becomes ``sample_count``.
    * ``start`` becomes ``sample_start``.

    Every remaining keyword is copied through under its own name.

    Raises:
        KeyError: if ``width`` is given without ``freq``.
    """
    remaining = dict(kwargs)
    fields = {}

    if 'width' in remaining:
        centre = remaining['freq']
        half_width = remaining['width'] / 2
        fields['freq_upper_edge'] = round(centre + half_width)
        fields['freq_lower_edge'] = round(centre - half_width)
        remaining.pop('width')
        remaining.pop('freq')

    # Simple one-to-one renames, applied in this fixed order so the key order
    # of the result stays stable.
    for short, full in (('count', 'sample_count'), ('start', 'sample_start')):
        if short in remaining:
            fields[full] = remaining.pop(short)

    # Whatever is left is passed through unchanged.
    fields.update(remaining)

    return {"core:" + name: value for name, value in fields.items()}
# Command-line interface: general switches, a group of annotation filters,
# and the positional metadata file to operate on.
parser = argparse.ArgumentParser()
parser.add_argument(
    "-v", "--verbose",
    action="store_true",
    help="increase output verbosity",
)
parser.add_argument("-d", "--debug", action="store_true")
parser.add_argument("-w", "--write", help="write new file")
parser.add_argument("-p", "--peaks", help="add peaks from file")

# All filter options take a single integer; register them from a table.
filters = parser.add_argument_group('filters')
for _flag, _metavar, _help in (
    ("--f-min", "freq", "minmum frequency"),
    ("--f-max", "freq", "maximum frequency"),
    ("--t-max", "seconds", "maxiumum time"),
    ("--c-max", "count", "maxiumum number of annotations"),
):
    filters.add_argument(_flag, metavar=_metavar, type=int, help=_help)

parser.add_argument("metafile", help="filename")

args = parser.parse_args()
# Load the metadata file named on the command line.  JSON text is UTF-8, so
# do not depend on the platform's default encoding.
with open(args.metafile, encoding="utf-8") as f:
    data = json.load(f)

# The rest of the script only handles a single capture segment that starts at
# sample 0.  Check this explicitly rather than with `assert`, which is
# silently removed when Python runs with -O.
captures = data["captures"]
if len(captures) != 1:
    raise SystemExit(
        f'{args.metafile}: expected exactly one capture, found {len(captures)}')
if captures[0]["core:sample_start"] != 0:
    raise SystemExit(
        f'{args.metafile}: capture must start at sample 0')

rate = int(data["global"]["core:sample_rate"])
# NOTE(review): `dt` is read but not used anywhere in the visible script.
dt = captures[0]["core:datetime"]
freq = int(captures[0]["core:frequency"])

print(f'{args.metafile}: [{data["global"]["core:datatype"]}] {rate/1e6:.9g}Msps @{freq/1e9:.9g}GHz')

# Remove empty annotations (a file without an "annotations" key is treated
# as having none).
ann = [a for a in data.get("annotations", []) if len(a) > 0]

print(f'in {len(ann)} annotations')
# Optionally append one "Peak" annotation per line of the peaks file.
# Each usable line has exactly three comma-separated fields:
# sample index, FFT bin index, and a third field that is ignored.
if args.peaks is not None:
    bins = 8192  # presumably the FFT size used to produce the peaks file
    width = rate / bins  # frequency span of a single bin
    with open(args.peaks) as f:
        for line in f:
            try:
                sample, fftbin, _ = line.split(",")
                sample = int(sample)
                fftbin = int(fftbin)
            except ValueError:
                # Wrong number of fields or non-integer values: skip the
                # line instead of reusing values from a previous iteration.
                if args.verbose:
                    print(f'skipping malformed peak line: {line.rstrip()!r}')
                continue

            # Offset of the bin from the capture centre frequency.  Use a
            # separate name so `freq` (the centre frequency) is not modified
            # and offsets do not accumulate from one peak to the next.
            peak_freq = freq + rate * (fftbin / bins - 0.5)

            ann.append(Annotation(comment="Peak", description="",
                                  freq=peak_freq, width=width,
                                  start=sample, count=bins))
# Apply the optional command-line filters to the annotation list.

# Frequency band: keep annotations that lie within [f_min, f_max].
# (The comparisons were previously reversed and kept only annotations
# outside the requested band.)
if args.f_max is not None:
    ann = [a for a in ann if a["core:freq_upper_edge"] <= args.f_max]

if args.f_min is not None:
    ann = [a for a in ann if a["core:freq_lower_edge"] >= args.f_min]

# Time limit: --t-max is given in seconds, annotations store sample indices.
if args.t_max is not None:
    ann = [a for a in ann if a["core:sample_start"] < args.t_max*rate]

# Count limit: keep the earliest c_max annotations by start sample.
if args.c_max is not None:
    ann = sorted(ann, key=lambda v: v["core:sample_start"])[:args.c_max]

data["annotations"] = ann
# Frequency and time extent of the remaining annotations.  `default=0` keeps
# min()/max() from raising ValueError when no annotations are left (for
# example after filtering); the summary then reports an all-zero range.
fmin = min((a["core:freq_lower_edge"] for a in data["annotations"]), default=0)
fmax = max((a["core:freq_upper_edge"] for a in data["annotations"]), default=0)

tmin = min((a["core:sample_start"] for a in data["annotations"]), default=0)
tmax = max((a["core:sample_start"] for a in data["annotations"]), default=0)

# Dump every annotation's start sample when --debug is given (this listing
# used to sit behind an unreachable `if False:`).
if args.debug:
    for a in data["annotations"]:
        print(f'{a["core:sample_start"]}')
def s2t(samples):
    """Convert a sample count to time by dividing by the module-level `rate`."""
    seconds = samples / rate
    return seconds
# Summarise what is left after filtering.
print(f'out {len(data["annotations"])} annotations')
print(f' {fmin/1e9:.6f} - {fmax/1e9:.6f} GHz / {s2t(tmin):.3f} - {s2t(tmax):.3f} sec')

# NOTE(review): --write takes a value, but only its truthiness is used here;
# the output always goes back to the input file name.  Confirm whether the
# option was meant to be a plain flag or a target path.
if args.write:
    # Keep the previous contents as <metafile>.bak.  os.replace overwrites an
    # existing backup on every platform, whereas os.rename fails on Windows
    # when the destination already exists.
    os.replace(args.metafile, args.metafile+".bak")
    with open(args.metafile, "w", encoding="utf-8") as f:
        json.dump(data, f, indent=0)