Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix the slow performance on Windows #101

Merged
merged 12 commits into from
Nov 16, 2023
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,6 @@ build-backend = "poetry.core.masonry.api"
[tool.pytest]
addopts = "-n auto"
testpaths = ["tests"]

[tool.pytest.ini_options]
markers = ["raises"]
6 changes: 3 additions & 3 deletions qasync/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -373,10 +373,10 @@ def run_forever(self):
self.__log_debug("Starting Qt event loop")
asyncio.events._set_running_loop(self)
rslt = -1
if hasattr(self.__app, "exec_"):
rslt = self.__app.exec_()
else:
if hasattr(self.__app, "exec"):
rslt = self.__app.exec()
else:
rslt = self.__app.exec_()
self.__log_debug("Qt event loop ended with result %s", rslt)
return rslt
finally:
Expand Down
133 changes: 52 additions & 81 deletions qasync/_windows.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ class _IocpProactor(windows_events.IocpProactor):
def __init__(self):
self.__events = []
super(_IocpProactor, self).__init__()
self._lock = QtCore.QMutex()

def select(self, timeout=None):
"""Override in order to handle events in a threadsafe manner."""
Expand All @@ -72,60 +71,36 @@ def select(self, timeout=None):
return tmp

def close(self):
self._logger.debug("Closing")
super(_IocpProactor, self).close()

# Wrap all I/O submission methods to acquire the internal lock first; listed
# in the order they appear in the base class source code.

def recv(self, conn, nbytes, flags=0):
with QtCore.QMutexLocker(self._lock):
return super(_IocpProactor, self).recv(conn, nbytes, flags)

def recv_into(self, conn, buf, flags=0):
with QtCore.QMutexLocker(self._lock):
return super(_IocpProactor, self).recv_into(conn, buf, flags)

def recvfrom(self, conn, nbytes, flags=0):
with QtCore.QMutexLocker(self._lock):
return super(_IocpProactor, self).recvfrom(conn, nbytes, flags)

def recvfrom_into(self, conn, buf, flags=0):
with QtCore.QMutexLocker(self._lock):
return super(_IocpProactor, self).recvfrom_into(conn, buf, flags)

def sendto(self, conn, buf, flags=0, addr=None):
with QtCore.QMutexLocker(self._lock):
return super(_IocpProactor, self).sendto(conn, buf, flags, addr)

def send(self, conn, buf, flags=0):
with QtCore.QMutexLocker(self._lock):
return super(_IocpProactor, self).send(conn, buf, flags)

def accept(self, listener):
with QtCore.QMutexLocker(self._lock):
return super(_IocpProactor, self).accept(listener)

def connect(self, conn, address):
with QtCore.QMutexLocker(self._lock):
return super(_IocpProactor, self).connect(conn, address)

def sendfile(self, sock, file, offset, count):
with QtCore.QMutexLocker(self._lock):
return super(_IocpProactor, self).sendfile(sock, file, offset, count)

def accept_pipe(self, pipe):
with QtCore.QMutexLocker(self._lock):
return super(_IocpProactor, self).accept_pipe(pipe)

# connect_pipe() does not actually use the delayed completion machinery.

# This takes care of wait_for_handle() too.
def _wait_for_handle(self, handle, timeout, _is_cancel):
with QtCore.QMutexLocker(self._lock):
return super(_IocpProactor, self)._wait_for_handle(
handle, timeout, _is_cancel
)
if self._iocp is None:
# already closed
return

# Cancel remaining registered operations.
for fut, ov, obj, callback in list(self._cache.values()):
if fut.cancelled():
# Nothing to do with cancelled futures
pass
elif isinstance(fut, windows_events._WaitCancelFuture):
# _WaitCancelFuture must not be cancelled
pass
else:
try:
fut.cancel()
except OSError as exc:
if self._loop is not None:
context = {
"message": "Cancelling a future failed",
"exception": exc,
"future": fut,
}
if fut._source_traceback:
context["source_traceback"] = fut._source_traceback
self._loop.call_exception_handler(context)

self._results = []

_winapi.CloseHandle(self._iocp)
self._iocp = None

def _poll(self, timeout=None):
"""Override in order to handle events in a threadsafe manner."""
Expand All @@ -140,30 +115,29 @@ def _poll(self, timeout=None):
if ms >= UINT32_MAX:
raise ValueError("timeout too big")

with QtCore.QMutexLocker(self._lock):
while True:
# self._logger.debug('Polling IOCP with timeout {} ms in thread {}...'.format(
# ms, threading.get_ident()))
status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms)
if status is None:
break
ms = 0
while True:
# self._logger.debug('Polling IOCP with timeout {} ms in thread {}...'.format(
# ms, threading.get_ident()))
status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms)
if status is None:
break
ms = 0

err, transferred, key, address = status
try:
f, ov, obj, callback = self._cache.pop(address)
except KeyError:
# key is either zero, or it is used to return a pipe
# handle which should be closed to avoid a leak.
if key not in (0, _overlapped.INVALID_HANDLE_VALUE):
_winapi.CloseHandle(key)
continue

if obj in self._stopped_serving:
f.cancel()
# Futures might already be resolved or cancelled
elif not f.done():
self.__events.append((f, callback, transferred, key, ov))
err, transferred, key, address = status
try:
f, ov, obj, callback = self._cache.pop(address)
except KeyError:
# key is either zero, or it is used to return a pipe
# handle which should be closed to avoid a leak.
if key not in (0, _overlapped.INVALID_HANDLE_VALUE):
_winapi.CloseHandle(key)
continue

if obj in self._stopped_serving:
f.cancel()
# Futures might already be resolved or cancelled
elif not f.done():
self.__events.append((f, callback, transferred, key, ov))

# Remove unregistered futures
for ov in self._unregistered:
Expand All @@ -179,11 +153,9 @@ def __init__(self, proactor, parent):
self.__stop = False
self.__proactor = proactor
self.__sig_events = parent.sig_events
self.__semaphore = QtCore.QSemaphore()

def start(self):
super().start()
self.__semaphore.acquire()

def stop(self):
self.__stop = True
Expand All @@ -192,7 +164,6 @@ def stop(self):

def run(self):
self._logger.debug("Thread started")
self.__semaphore.release()

while not self.__stop:
events = self.__proactor.select(0.01)
Expand Down
8 changes: 8 additions & 0 deletions tests/test_qeventloop.py
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,14 @@ def test_regression_bug13(loop, sock_pair):
c_sock, s_sock = sock_pair
client_done, server_done = asyncio.Future(), asyncio.Future()

if os.name == "nt":
# On Windows, `loop.add_reader` and `loop.add_writer`
# are not supported by Python's `asyncio` due to platform limitations.
# Though `qasync` does provide those methods on Windows,
# it doesn't guarantee safety against race conditions like on Unix.
# https://docs.python.org/3/library/asyncio-platforms.html
return

async def server_coro():
s_reader, s_writer = await asyncio.open_connection(sock=s_sock)

Expand Down