forked from Amanbhandula/SmokeDetector
-
Notifications
You must be signed in to change notification settings - Fork 0
/
tasks.py
45 lines (32 loc) · 1.03 KB
/
tasks.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import asyncio
import threading
class Tasks:
loop = asyncio.new_event_loop()
@classmethod
def _run(cls):
asyncio.set_event_loop(cls.loop)
try:
cls.loop.run_forever()
finally:
cls.loop.close()
@classmethod
def do(cls, func, *args, **kwargs):
handle = cls.loop.call_soon(lambda: func(*args, **kwargs))
cls.loop._write_to_self()
return handle
@classmethod
def later(cls, func, *args, after=None, **kwargs):
handle = cls.loop.call_later(after, lambda: func(*args, **kwargs))
cls.loop._write_to_self()
return handle
@classmethod
def periodic(cls, func, *args, interval=None, **kwargs):
@asyncio.coroutine
def f():
while True:
yield from asyncio.sleep(interval)
func(*args, **kwargs)
handle = cls.loop.create_task(f())
cls.loop._write_to_self()
return handle
threading.Thread(name="tasks", target=Tasks._run, daemon=True).start()