From 853d5889bb091971ddb6362c460024992508544e Mon Sep 17 00:00:00 2001 From: Timofey Koolin Date: Thu, 4 May 2023 11:31:25 +0300 Subject: [PATCH 1/2] Close bug: Unable to create more than 10-15 writers to a Logbroker topic from one YDB driver using TopicAPI from YDB Python SDK versions 3.2.2 and 3.3.0 #289 --- CHANGELOG.md | 2 ++ tests/topics/test_topic_writer.py | 16 ++++++++++++++ ydb/_grpc/grpcwrapper/common_utils.py | 32 ++++++++++++++++++--------- 3 files changed, 39 insertions(+), 11 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 61a2c4d9..65a8d787 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,5 @@ +* Fixed start many sync writers/readers in parallel + ## 3.3.0 ## * Added support to set many topics and topic reader settings for read in one reader * Added ydb.TopicWriterInitInfo, ydb.TopicWriteResult as public types diff --git a/tests/topics/test_topic_writer.py b/tests/topics/test_topic_writer.py index d93ff2e2..327cb81e 100644 --- a/tests/topics/test_topic_writer.py +++ b/tests/topics/test_topic_writer.py @@ -1,3 +1,6 @@ +from __future__ import annotations +from typing import List + import pytest import ydb.aio @@ -196,3 +199,16 @@ def test_write_encoded(self, driver_sync: ydb.Driver, topic_path: str, codec): writer.write("a" * 1000) writer.write("b" * 1000) writer.write("c" * 1000) + + def test_start_many_sync_writers_in_parallel(self, driver_sync: ydb.Driver, topic_path): + target_count = 100 + writers = [] # type: List[ydb.TopicWriter] + for i in range(target_count): + writer = driver_sync.topic_client.writer(topic_path) + writers.append(writer) + + for i, writer in enumerate(writers): + writer.write(str(i)) + + for writer in writers: + writer.close() diff --git a/ydb/_grpc/grpcwrapper/common_utils.py b/ydb/_grpc/grpcwrapper/common_utils.py index faec03c2..1e9e83ca 100644 --- a/ydb/_grpc/grpcwrapper/common_utils.py +++ b/ydb/_grpc/grpcwrapper/common_utils.py @@ -2,6 +2,7 @@ import abc import asyncio +import concurrent.futures import contextvars import datetime import functools @@ -112,15 +113,16 @@ def __next__(self): class SyncIteratorToAsyncIterator: - def __init__(self, sync_iterator: Iterator): + def __init__(self, sync_iterator: Iterator, executor: concurrent.futures.Executor): self._sync_iterator = sync_iterator + self._executor = executor def __aiter__(self): return self async def __anext__(self): try: - res = await to_thread(self._sync_iterator.__next__) + res = await to_thread(self._sync_iterator.__next__, executor=self._executor) return res except StopAsyncIteration: raise StopIteration() @@ -149,12 +151,17 @@ class GrpcWrapperAsyncIO(IGrpcWrapperAsyncIO): convert_server_grpc_to_wrapper: Callable[[Any], Any] _connection_state: str _stream_call: Optional[Union[grpc.aio.StreamStreamCall, "grpc._channel._MultiThreadedRendezvous"]] + _wait_executor: Optional[concurrent.futures.ThreadPoolExecutor] def __init__(self, convert_server_grpc_to_wrapper): self.from_client_grpc = asyncio.Queue() self.convert_server_grpc_to_wrapper = convert_server_grpc_to_wrapper self._connection_state = "new" self._stream_call = None + self._wait_executor = None + + def __del__(self): + self._wait_executor.shutdown(wait=False) async def start(self, driver: SupportedDriverType, stub, method): if asyncio.iscoroutinefunction(driver.__call__): @@ -168,6 +175,12 @@ def close(self): if self._stream_call: self._stream_call.cancel() + self._clean_executor() + + def _clean_executor(self): + if self._wait_executor: + self._wait_executor.shutdown() + async def _start_asyncio_driver(self, driver: ydb.aio.Driver, stub, method): requests_iterator = QueueToIteratorAsyncIO(self.from_client_grpc) stream_call = await driver( @@ -180,14 +193,11 @@ async def _start_asyncio_driver(self, driver: ydb.aio.Driver, stub, method): async def _start_sync_driver(self, driver: ydb.Driver, stub, method): requests_iterator = AsyncQueueToSyncIteratorAsyncIO(self.from_client_grpc) - stream_call = await to_thread( - driver, - requests_iterator, - stub, - method, - ) + self._wait_executor = concurrent.futures.ThreadPoolExecutor(max_workers=1) + + stream_call = await to_thread(driver, requests_iterator, stub, method, executor=self._wait_executor) self._stream_call = stream_call - self.from_server_grpc = SyncIteratorToAsyncIterator(stream_call.__iter__()) + self.from_server_grpc = SyncIteratorToAsyncIterator(stream_call.__iter__(), self._wait_executor) async def receive(self) -> Any: # todo handle grpc exceptions and convert it to internal exceptions @@ -255,7 +265,7 @@ def callback_from_asyncio(callback: Union[Callable, Coroutine]) -> [asyncio.Futu return loop.run_in_executor(None, callback) -async def to_thread(func, /, *args, **kwargs): +async def to_thread(func, *args, executor: Optional[concurrent.futures.Executor], **kwargs): """Asynchronously run function *func* in a separate thread. Any *args and **kwargs supplied for this function are directly passed @@ -271,7 +281,7 @@ async def to_thread(func, /, *args, **kwargs): loop = asyncio.get_running_loop() ctx = contextvars.copy_context() func_call = functools.partial(ctx.run, func, *args, **kwargs) - return await loop.run_in_executor(None, func_call) + return await loop.run_in_executor(executor, func_call) def proto_duration_from_timedelta(t: Optional[datetime.timedelta]) -> Optional[ProtoDuration]: From 85882de2a6934d26859b1c136e09308f6ab79150 Mon Sep 17 00:00:00 2001 From: Timofey Koolin Date: Thu, 4 May 2023 15:23:14 +0300 Subject: [PATCH 2/2] fix check wait_executor --- ydb/_grpc/grpcwrapper/common_utils.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/ydb/_grpc/grpcwrapper/common_utils.py b/ydb/_grpc/grpcwrapper/common_utils.py index 1e9e83ca..a7058048 100644 --- a/ydb/_grpc/grpcwrapper/common_utils.py +++ b/ydb/_grpc/grpcwrapper/common_utils.py @@ -161,7 +161,7 @@ def __init__(self, convert_server_grpc_to_wrapper): self._wait_executor = None def __del__(self): - self._wait_executor.shutdown(wait=False) + self._clean_executor(wait=False) async def start(self, driver: SupportedDriverType, stub, method): if asyncio.iscoroutinefunction(driver.__call__): @@ -175,11 +175,11 @@ def close(self): if self._stream_call: self._stream_call.cancel() - self._clean_executor() + self._clean_executor(wait=True) - def _clean_executor(self): + def _clean_executor(self, wait: bool): if self._wait_executor: - self._wait_executor.shutdown() + self._wait_executor.shutdown(wait) async def _start_asyncio_driver(self, driver: ydb.aio.Driver, stub, method): requests_iterator = QueueToIteratorAsyncIO(self.from_client_grpc)