From 879f78bac9ca4343ffd9ffdc25637fee39a06450 Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Sun, 10 Nov 2024 11:55:33 -0800 Subject: [PATCH] apply additional autofixes --- .../destinations/vector_db_based/document_processor.py | 4 +--- .../sources/concurrent_source/concurrent_read_processor.py | 3 +-- .../requesters/error_handlers/composite_error_handler.py | 7 ++++--- airbyte_cdk/sources/streams/call_rate.py | 1 + airbyte_cdk/test/catalog_builder.py | 2 +- 5 files changed, 8 insertions(+), 9 deletions(-) diff --git a/airbyte_cdk/destinations/vector_db_based/document_processor.py b/airbyte_cdk/destinations/vector_db_based/document_processor.py index 6d5baf71..0b47833e 100644 --- a/airbyte_cdk/destinations/vector_db_based/document_processor.py +++ b/airbyte_cdk/destinations/vector_db_based/document_processor.py @@ -161,9 +161,7 @@ def process(self, record: AirbyteRecordMessage) -> tuple[list[Chunk], str | None ) for chunk_document in self._split_document(doc) ] - id_to_delete = ( - doc.metadata.get(METADATA_RECORD_ID_FIELD, None) - ) + id_to_delete = doc.metadata.get(METADATA_RECORD_ID_FIELD, None) return chunks, id_to_delete def _generate_document(self, record: AirbyteRecordMessage) -> Document | None: diff --git a/airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py b/airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py index 7594d481..b8d7ffe3 100644 --- a/airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py +++ b/airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py @@ -213,8 +213,7 @@ def is_done(self) -> bool: 3. All partitions for all streams are closed """ is_done = all( - self._is_stream_done(stream_name) - for stream_name in self._stream_name_to_instance + self._is_stream_done(stream_name) for stream_name in self._stream_name_to_instance ) if is_done and self._exceptions_per_stream_name: error_message = generate_failed_streams_error_message(self._exceptions_per_stream_name) diff --git a/airbyte_cdk/sources/declarative/requesters/error_handlers/composite_error_handler.py b/airbyte_cdk/sources/declarative/requesters/error_handlers/composite_error_handler.py index 2a120d4a..2cd739b9 100644 --- a/airbyte_cdk/sources/declarative/requesters/error_handlers/composite_error_handler.py +++ b/airbyte_cdk/sources/declarative/requesters/error_handlers/composite_error_handler.py @@ -72,9 +72,10 @@ def interpret_response( if matched_error_resolution.response_action == ResponseAction.SUCCESS: return matched_error_resolution - if ( - matched_error_resolution.response_action in {ResponseAction.RETRY, ResponseAction.IGNORE} - ): + if matched_error_resolution.response_action in { + ResponseAction.RETRY, + ResponseAction.IGNORE, + }: return matched_error_resolution if matched_error_resolution: return matched_error_resolution diff --git a/airbyte_cdk/sources/streams/call_rate.py b/airbyte_cdk/sources/streams/call_rate.py index 96394b6a..95b5e148 100644 --- a/airbyte_cdk/sources/streams/call_rate.py +++ b/airbyte_cdk/sources/streams/call_rate.py @@ -23,6 +23,7 @@ # prevents mypy from complaining about missing session attributes in LimiterMixin if TYPE_CHECKING: from collections.abc import Mapping + MIXIN_BASE = requests.Session else: MIXIN_BASE = object diff --git a/airbyte_cdk/test/catalog_builder.py b/airbyte_cdk/test/catalog_builder.py index 8fa088dd..f12aab38 100644 --- a/airbyte_cdk/test/catalog_builder.py +++ b/airbyte_cdk/test/catalog_builder.py @@ -78,5 +78,5 @@ def with_stream( def build(self) -> ConfiguredAirbyteCatalog: return ConfiguredAirbyteCatalog( - streams=[builder.build() for builder in self._streams] + streams=[builder.build() for builder in self._streams], )