forked from danielballan/adaptive-with-delay
-
Notifications
You must be signed in to change notification settings - Fork 0
/
ioc.py
31 lines (24 loc) · 924 Bytes
/
ioc.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
#!/usr/bin/env python3
import random
from caproto.server import pvproperty, PVGroup, ioc_arg_parser, run
import numpy
class RandomWalkIOC(PVGroup):
pos = pvproperty(value=-5.0)
dt = pvproperty(value=3.0)
x = pvproperty(value=0.0)
@x.startup
async def x(self, instance, async_lib):
'Periodically update the value'
while True:
# compute next value
x = numpy.exp(-self.pos.value**2)
# update the ChannelData instance and notify any subscribers
await instance.write(value=x)
# Let the async library wait for the next iteration
await async_lib.library.sleep(self.dt.value)
if __name__ == '__main__':
ioc_options, run_options = ioc_arg_parser(
default_prefix='random_walk:',
desc='Run an IOC with a random-walking value.')
ioc = RandomWalkIOC(**ioc_options)
run(ioc.pvdb, **run_options)