-
Notifications
You must be signed in to change notification settings - Fork 0
/
bounded_iterator.py
40 lines (27 loc) · 1.01 KB
/
bounded_iterator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
from threading import BoundedSemaphore
from typing import Iterable, Iterator, TypeVar
T = TypeVar("T")
class BoundedIterator(Iterator):
"""Limits the number of values to yield until yielded values are
acknowledged.
"""
def __init__(self, bound, it: Iterable[T]):
self._it = iter(it)
self._sem = BoundedSemaphore(bound)
def __iter__(self) -> Iterator[T]:
return self
def __next__(self) -> T:
return self.next()
def next(self, timeout=None) -> T:
"""Returns the next value from the iterable.
This method is not thread-safe.
:raises TimeoutError: if timeout is given and no value is acknowledged in the mean time.
"""
if not self._sem.acquire(timeout=timeout):
raise TimeoutError("Too many values un-acknowledged.")
return next(self._it)
def processed(self):
"""Acknowledges one value allowing another one to be yielded.
This method is thread-safe.
"""
self._sem.release()