diff --git a/CHANGELOG.md b/CHANGELOG.md index 65a8d787..0d348aa9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,4 @@ +* Fix exception for convert sync to async iterator * Fixed start many sync writers/readers in parallel ## 3.3.0 ## diff --git a/ydb/_grpc/grpcwrapper/common_utils.py b/ydb/_grpc/grpcwrapper/common_utils.py index a7058048..bc294025 100644 --- a/ydb/_grpc/grpcwrapper/common_utils.py +++ b/ydb/_grpc/grpcwrapper/common_utils.py @@ -112,7 +112,7 @@ def __next__(self): return item -class SyncIteratorToAsyncIterator: +class SyncToAsyncIterator: def __init__(self, sync_iterator: Iterator, executor: concurrent.futures.Executor): self._sync_iterator = sync_iterator self._executor = executor @@ -124,8 +124,8 @@ async def __anext__(self): try: res = await to_thread(self._sync_iterator.__next__, executor=self._executor) return res - except StopAsyncIteration: - raise StopIteration() + except StopIteration: + raise StopAsyncIteration() class IGrpcWrapperAsyncIO(abc.ABC): @@ -197,7 +197,7 @@ async def _start_sync_driver(self, driver: ydb.Driver, stub, method): stream_call = await to_thread(driver, requests_iterator, stub, method, executor=self._wait_executor) self._stream_call = stream_call - self.from_server_grpc = SyncIteratorToAsyncIterator(stream_call.__iter__(), self._wait_executor) + self.from_server_grpc = SyncToAsyncIterator(stream_call.__iter__(), self._wait_executor) async def receive(self) -> Any: # todo handle grpc exceptions and convert it to internal exceptions