forked from malinoff/aionursery
-
Notifications
You must be signed in to change notification settings - Fork 0
/
aionursery.py
122 lines (101 loc) · 3.37 KB
/
aionursery.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
"""
Manage concurrent asyncio tasks.
"""
import asyncio
import textwrap
import traceback
__all__ = ('Nursery', 'NurseryClosed', 'MultiError')
class Nursery:
"""
Manages concurrent tasks.
"""
def __init__(self):
self._loop = asyncio.get_event_loop()
self._children = set()
self._pending_excs = []
self._parent_task = None
self.closed = False
def start_soon(self, coro) -> asyncio.Task:
"""
Creates a new child task inside this nursery.
Note that there's no guarantee it will ever be executed, for example,
when the parent task immediately cancels this child.
"""
if self.closed:
raise NurseryClosed
task = asyncio.ensure_future(coro, loop=self._loop)
task.add_done_callback(self._child_finished)
self._children.add(task)
return task
def cancel_remaining(self):
"""
Cancel all remaining running tasks.
"""
current_task = asyncio.Task.current_task()
for task in self._children:
if task is current_task:
continue
task.cancel()
def _child_finished(self, task):
self._children.remove(task)
try:
exc = task.exception()
except asyncio.CancelledError:
pass
else:
if exc is not None:
self._add_exc(exc)
def _add_exc(self, exc):
self._pending_excs.append(exc)
self._loop.call_soon(self._parent_task.cancel)
async def __aenter__(self):
if self.closed:
raise NurseryClosed
self._parent_task = asyncio.Task.current_task(self._loop)
return self
async def __aexit__(self, exc_type, exc, _):
if exc_type is asyncio.CancelledError and not self._pending_excs:
# Parent was cancelled, cancel all children
for child in self._children.copy():
child.cancel()
elif exc is not None and exc_type is not asyncio.CancelledError:
self._add_exc(exc)
try:
while self._children:
await asyncio.gather(*self._children, return_exceptions=True)
except asyncio.CancelledError:
pass
self.closed = True
if self._pending_excs:
raise MultiError(self._pending_excs)
def __del__(self):
assert not self._children
class NurseryClosed(Exception):
"""
Raises when somebody tries to use a closed nursery.
"""
class MultiError(Exception):
"""
Gathers multiple exceptions into one, providing a sane __str__.
All raised exceptions are available as ``exceptions`` property.
"""
def __init__(self, exceptions):
super().__init__(exceptions)
self.exceptions = exceptions
def __str__(self):
lines = [super().__str__()]
for idx, exc in enumerate(self.exceptions):
tb_lines = ''.join(traceback.format_exception(
type(exc), exc, exc.__traceback__,
))
lines += [
'Details of embedded exception {}:\n'.format(idx),
textwrap.indent(tb_lines, ' '),
]
return '\n'.join(lines)
def __iter__(self):
return iter(self.exceptions)
def __len__(self):
return len(self.exceptions)
def __getitem__(self, item):
return self.exceptions[item]