From 2b708112ddce9c0e8e9a99819de266ab30f9d538 Mon Sep 17 00:00:00 2001 From: James Ward Date: Sun, 29 Oct 2023 21:38:02 -0400 Subject: [PATCH] chore: add docstring and use `[]` rather than `list()` --- asyncio/queue.py | 68 +++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 64 insertions(+), 4 deletions(-) diff --git a/asyncio/queue.py b/asyncio/queue.py index c179ead..ee45b24 100644 --- a/asyncio/queue.py +++ b/asyncio/queue.py @@ -2,17 +2,32 @@ class QueueEmpty(Exception): + """Raised when Queue.get_nowait() is called on an empty Queue.""" pass class QueueFull(Exception): + """Raised when the Queue.put_nowait() method is called on a full Queue.""" pass class Queue: + """ + A queue, useful for coordinating producer and consumer coroutines. + + If maxsize is less than or equal to zero, the queue size is infinite. If it + is an integer greater than 0, then "await put()" will block when the + queue reaches maxsize, until an item is removed by get(). + + Unlike CPython's asyncio.Queue, this implementation is backed by a list rather + than `collections.deque` because smaller boards may not have the library + implemented. + """ + def __init__(self, maxsize=0): self.maxsize = maxsize - self._queue = list() + + self._queue = [] self._join_counter = 0 self._join_event = event.Event() @@ -37,41 +52,86 @@ def _put(self, val): self._put_event.clear() async def get(self): + """ + Remove and return an item from the queue. + + If queue is empty, wait until an item is available. + """ while self.empty(): await self._put_event.wait() return self._get() def get_nowait(self): + """ + Remove and return an item from the queue. + + If queue is empty, raise QueueEmpty. + """ if self.empty(): raise QueueEmpty() return self._get() async def put(self, val): + """ + Put an item into the queue. + + If the queue is full, waits until a free + slot is available before adding item. + """ while self.full(): await self._get_event.wait() self._put(val) def put_nowait(self, val): + """ + Put an item into the queue. + + If the queue is full, raises QueueFull. + """ if self.full(): raise QueueFull() self._put(val) def qsize(self): + """ + Number of items in this queue. + """ return len(self._queue) def empty(self): + """ + Return True if the queue is empty. + """ return len(self._queue) == 0 def full(self): + """ + Return True if there are maxsize items in the queue. + """ return 0 < self.maxsize <= self.qsize() def task_done(self): - self._join_counter -= 1 + """ + Indicate that a formerly enqueued task is complete. + + If a join() is currently blocking, it will resume when all items have + been processed (meaning that a task_done() call was received for every + item that had been put() into the queue). - if self._join_counter <= 0: + Raises ValueError if called more times than there were items placed in + the queue. + """ + if self._join_counter == 0: # Can't have less than 0 - self._join_counter = 0 + raise ValueError("task_done() called too many times") + + self._join_counter -= 1 + + if self._join_counter == 0: self._join_event.set() async def join(self): + """ + Block until all items in the queue have been gotten and processed. + """ await self._join_event.wait()