-
Notifications
You must be signed in to change notification settings - Fork 0
/
sim.py
103 lines (90 loc) · 3.34 KB
/
sim.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
import logging
import numpy as np
from artiq import *
import miqro
logger = logging.getLogger(__name__)
class MiqroSim:
"""MIQRO simulator."""
def __init__(self, events):
self.events = events
self.tau = 4 * ns
# state
self.profile = np.zeros((16, 32), "i4,u2,u2")
self.window = np.zeros((1 << 10, 2), "i2")
self.cfg = np.zeros(16, "u1")
def get_rf(self):
ev = np.array(self.events, "i8,u4,u4").ravel()
ev.sort(order=["f0"])
addr = 0
for t, a, d in ev:
if a == miqro.PHASER_ADDR_MIQRO_MEM_ADDR | 0x80:
addr = d
elif a == miqro.PHASER_ADDR_MIQRO_MEM_DATA | 0x80:
self._handle_mem(addr, d)
addr += 1
elif 0 <= a - 0x100 < 3:
self._handle_cfg(a - 0x100, d)
else:
raise ValueError("addr", a)
if a - 0x100 == 0:
yield self._handle_trigger(t, d)
def _handle_mem(self, addr, data):
channel = addr >> 15
if channel != 0:
raise NotImplementedError
if addr & miqro.PHASER_MIQRO_SEL_PROFILE:
mem = self.profile
else:
mem = self.window
mem = mem.view("u4").ravel()
mem[addr & 0x3FFF] = data
def _handle_cfg(self, a, d):
split = [0, 4, 10, 16]
for osc in range(*split[a : a + 2]):
idx = (osc - split[a]) * 5
if a == 0:
idx += 10
profile = (d >> idx) & 0x1F
self.cfg[osc] = profile
def _handle_trigger(self, t, d):
logger.info(f"trigger t={t}, {t*rtio_period:g} s")
window = self._get_window(d & 0x3FF)
ts = int(t * (rtio_period / self.tau))
sig = self._get_sum(ts, window.shape[0]) * window / (1 << 31)
return ts, sig
def _get_sum(self, ts, n):
sum = np.zeros(n, "c8")
for osc in range(16):
f, a, p = self.profile[osc, self.cfg[osc]]
if a == 0:
continue
logging.info(
f"osc {osc}: profile {self.cfg[osc]} ({f / (1 << 32) / self.tau / MHz:g} MHz, {a / (1 << 16):g} @{p / (1 << 16):g} turn)"
)
p1 = (f * (ts - 1) + (p << 16)).astype(np.int32)
p1 = (
np.cumsum(np.lib.stride_tricks.as_strided(np.array(f), (n,), (0,))) + p1
)
sum += a * np.exp(2j * np.pi / (1 << 32) * p1)
return sum
def _get_window(self, start):
header = self.window[start].view("u4").ravel()[0]
length = header & 0x3FF
rate = ((header >> 10) & 0x3FF) + 1
shift = (header >> 22) & 0x3F
order = (header >> 28) & 0x3
head = (header >> 30) & 1
tail = (header >> 31) & 1
if not (head and tail):
raise NotImplementedError
duration = (length + order) * rate - order
logger.info(
f"window start={start:#x} len={length:#x} order={order} rate={rate}: {duration * self.tau / us:g} µs duration"
)
window = self.window[start + 1 : start + length + 1]
window = np.repeat(window[:, 0] + 1j * window[:, 1], rate)
rect = np.ones(rate)
for _ in range(order):
window = np.convolve(window, rect)
assert window.shape[0] == duration
return window / (1 << shift)