diff --git a/pyproject.toml b/pyproject.toml index b47fc7e..312daca 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -51,3 +51,6 @@ build-backend = "poetry.core.masonry.api" [tool.pytest] addopts = "-n auto" testpaths = ["tests"] + +[tool.pytest.ini_options] +markers = ["raises"] diff --git a/qasync/__init__.py b/qasync/__init__.py index d9dc8aa..3363e97 100644 --- a/qasync/__init__.py +++ b/qasync/__init__.py @@ -373,10 +373,10 @@ def run_forever(self): self.__log_debug("Starting Qt event loop") asyncio.events._set_running_loop(self) rslt = -1 - if hasattr(self.__app, "exec_"): - rslt = self.__app.exec_() - else: + if hasattr(self.__app, "exec"): rslt = self.__app.exec() + else: + rslt = self.__app.exec_() self.__log_debug("Qt event loop ended with result %s", rslt) return rslt finally: diff --git a/qasync/_windows.py b/qasync/_windows.py index e11a027..6150435 100644 --- a/qasync/_windows.py +++ b/qasync/_windows.py @@ -61,7 +61,6 @@ class _IocpProactor(windows_events.IocpProactor): def __init__(self): self.__events = [] super(_IocpProactor, self).__init__() - self._lock = QtCore.QMutex() def select(self, timeout=None): """Override in order to handle events in a threadsafe manner.""" @@ -72,60 +71,36 @@ def select(self, timeout=None): return tmp def close(self): - self._logger.debug("Closing") - super(_IocpProactor, self).close() - - # Wrap all I/O submission methods to acquire the internal lock first; listed - # in the order they appear in the base class source code. - - def recv(self, conn, nbytes, flags=0): - with QtCore.QMutexLocker(self._lock): - return super(_IocpProactor, self).recv(conn, nbytes, flags) - - def recv_into(self, conn, buf, flags=0): - with QtCore.QMutexLocker(self._lock): - return super(_IocpProactor, self).recv_into(conn, buf, flags) - - def recvfrom(self, conn, nbytes, flags=0): - with QtCore.QMutexLocker(self._lock): - return super(_IocpProactor, self).recvfrom(conn, nbytes, flags) - - def recvfrom_into(self, conn, buf, flags=0): - with QtCore.QMutexLocker(self._lock): - return super(_IocpProactor, self).recvfrom_into(conn, buf, flags) - - def sendto(self, conn, buf, flags=0, addr=None): - with QtCore.QMutexLocker(self._lock): - return super(_IocpProactor, self).sendto(conn, buf, flags, addr) - - def send(self, conn, buf, flags=0): - with QtCore.QMutexLocker(self._lock): - return super(_IocpProactor, self).send(conn, buf, flags) - - def accept(self, listener): - with QtCore.QMutexLocker(self._lock): - return super(_IocpProactor, self).accept(listener) - - def connect(self, conn, address): - with QtCore.QMutexLocker(self._lock): - return super(_IocpProactor, self).connect(conn, address) - - def sendfile(self, sock, file, offset, count): - with QtCore.QMutexLocker(self._lock): - return super(_IocpProactor, self).sendfile(sock, file, offset, count) - - def accept_pipe(self, pipe): - with QtCore.QMutexLocker(self._lock): - return super(_IocpProactor, self).accept_pipe(pipe) - - # connect_pipe() does not actually use the delayed completion machinery. - - # This takes care of wait_for_handle() too. - def _wait_for_handle(self, handle, timeout, _is_cancel): - with QtCore.QMutexLocker(self._lock): - return super(_IocpProactor, self)._wait_for_handle( - handle, timeout, _is_cancel - ) + if self._iocp is None: + # already closed + return + + # Cancel remaining registered operations. + for fut, ov, obj, callback in list(self._cache.values()): + if fut.cancelled(): + # Nothing to do with cancelled futures + pass + elif isinstance(fut, windows_events._WaitCancelFuture): + # _WaitCancelFuture must not be cancelled + pass + else: + try: + fut.cancel() + except OSError as exc: + if self._loop is not None: + context = { + "message": "Cancelling a future failed", + "exception": exc, + "future": fut, + } + if fut._source_traceback: + context["source_traceback"] = fut._source_traceback + self._loop.call_exception_handler(context) + + self._results = [] + + _winapi.CloseHandle(self._iocp) + self._iocp = None def _poll(self, timeout=None): """Override in order to handle events in a threadsafe manner.""" @@ -140,30 +115,29 @@ def _poll(self, timeout=None): if ms >= UINT32_MAX: raise ValueError("timeout too big") - with QtCore.QMutexLocker(self._lock): - while True: - # self._logger.debug('Polling IOCP with timeout {} ms in thread {}...'.format( - # ms, threading.get_ident())) - status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms) - if status is None: - break - ms = 0 + while True: + # self._logger.debug('Polling IOCP with timeout {} ms in thread {}...'.format( + # ms, threading.get_ident())) + status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms) + if status is None: + break + ms = 0 - err, transferred, key, address = status - try: - f, ov, obj, callback = self._cache.pop(address) - except KeyError: - # key is either zero, or it is used to return a pipe - # handle which should be closed to avoid a leak. - if key not in (0, _overlapped.INVALID_HANDLE_VALUE): - _winapi.CloseHandle(key) - continue - - if obj in self._stopped_serving: - f.cancel() - # Futures might already be resolved or cancelled - elif not f.done(): - self.__events.append((f, callback, transferred, key, ov)) + err, transferred, key, address = status + try: + f, ov, obj, callback = self._cache.pop(address) + except KeyError: + # key is either zero, or it is used to return a pipe + # handle which should be closed to avoid a leak. + if key not in (0, _overlapped.INVALID_HANDLE_VALUE): + _winapi.CloseHandle(key) + continue + + if obj in self._stopped_serving: + f.cancel() + # Futures might already be resolved or cancelled + elif not f.done(): + self.__events.append((f, callback, transferred, key, ov)) # Remove unregistered futures for ov in self._unregistered: @@ -179,11 +153,9 @@ def __init__(self, proactor, parent): self.__stop = False self.__proactor = proactor self.__sig_events = parent.sig_events - self.__semaphore = QtCore.QSemaphore() def start(self): super().start() - self.__semaphore.acquire() def stop(self): self.__stop = True @@ -192,7 +164,6 @@ def stop(self): def run(self): self._logger.debug("Thread started") - self.__semaphore.release() while not self.__stop: events = self.__proactor.select(0.01) diff --git a/tests/test_qeventloop.py b/tests/test_qeventloop.py index c1de714..7dcbd18 100644 --- a/tests/test_qeventloop.py +++ b/tests/test_qeventloop.py @@ -553,6 +553,14 @@ def test_regression_bug13(loop, sock_pair): c_sock, s_sock = sock_pair client_done, server_done = asyncio.Future(), asyncio.Future() + if os.name == "nt": + # On Windows, `loop.add_reader` and `loop.add_writer` + # are not supported by Python's `asyncio` due to platform limitations. + # Though `qasync` does provide those methods on Windows, + # it doesn't guarantee safety against race conditions like on Unix. + # https://docs.python.org/3/library/asyncio-platforms.html + return + async def server_coro(): s_reader, s_writer = await asyncio.open_connection(sock=s_sock)