remove unused type ignores
aaronsteers committed Dec 3, 2024
1 parent c29447c commit 39d66af
Showing 28 changed files with 56 additions and 52 deletions.
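The pattern across all 28 files: delete `# type: ignore` comments that no longer suppress a real error. The commit message doesn't say which tool flagged them, but mypy's `--warn-unused-ignores` option reports exactly this class of stale comment. A minimal sketch (a hypothetical file, not part of this commit) of how such ignores surface:

```python
# Run with: mypy --warn-unused-ignores example.py
import math


def circumference(radius: float) -> float:
    # This line type-checks cleanly, so the ignore suppresses nothing and
    # mypy reports: error: Unused "type: ignore" comment  [unused-ignore]
    return 2 * math.pi * radius  # type: ignore[operator]
```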
8 changes: 4 additions & 4 deletions airbyte_cdk/connector_builder/message_grouper.py
@@ -71,7 +71,7 @@ def _cursor_field_to_nested_and_composite_field(

is_nested_key = isinstance(field[0], str)
if is_nested_key:
- return [field] # type: ignore # the type of field is expected to be List[str] here
+ return [field]

raise ValueError(f"Unknown type for cursor field `{field}")

@@ -234,7 +234,7 @@ def _get_message_groups(
at_least_one_page_in_group = False
elif message.type == MessageType.LOG and message.log.message.startswith( # type: ignore[union-attr] # None doesn't have 'message'
SliceLogger.SLICE_LOG_PREFIX
- ): # type: ignore[union-attr] # AirbyteMessage with MessageType.LOG has log.message
+ ):
# parsing the first slice
current_slice_descriptor = self._parse_slice_description(message.log.message) # type: ignore[union-attr] # AirbyteMessage with MessageType.LOG has log.message
elif message.type == MessageType.LOG:
@@ -281,7 +281,7 @@ def _get_message_groups(
elif (
message.type == MessageType.CONTROL
and message.control.type == OrchestratorType.CONNECTOR_CONFIG # type: ignore[union-attr] # None doesn't have 'type'
- ): # type: ignore[union-attr] # AirbyteMessage with MessageType.CONTROL has control.type
+ ):
yield message.control
elif message.type == MessageType.STATE:
latest_state_message = message.state # type: ignore[assignment]
@@ -356,7 +356,7 @@ def _close_page(
request=current_page_request,
response=current_page_response,
records=deepcopy(current_page_records), # type: ignore [arg-type]
- ) # type: ignore
+ )
)
current_page_records.clear()
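A detail worth noticing in this file: narrowly scoped ignores such as `# type: ignore[union-attr]` on the `message.log.message` lines survive, while bare or stale ones are dropped. A hedged illustration (simplified stand-in classes, not the CDK's) of why the error-code form ages better:

```python
from typing import Optional


class Log:
    message: str = ""


class Message:
    log: Optional[Log] = None


def first_word(msg: Message) -> str:
    # The coded ignore silences only [union-attr] ("None has no attribute
    # 'message'"); any other class of error on this line is still reported.
    # A bare "# type: ignore" would swallow those too and is harder to audit.
    return msg.log.message.split()[0]  # type: ignore[union-attr]
```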

4 changes: 2 additions & 2 deletions airbyte_cdk/entrypoint.py
@@ -22,7 +22,7 @@
from airbyte_cdk.connector import TConfig
from airbyte_cdk.exception_handler import init_uncaught_exception_handler
from airbyte_cdk.logger import init_logger
- from airbyte_cdk.models import ( # type: ignore [attr-defined]
+ from airbyte_cdk.models import (
AirbyteConnectionStatus,
AirbyteMessage,
AirbyteMessageSerializer,
@@ -281,7 +281,7 @@ def set_up_secret_filter(config: TConfig, connection_specification: Mapping[str,

@staticmethod
def airbyte_message_to_string(airbyte_message: AirbyteMessage) -> str:
- return orjson.dumps(AirbyteMessageSerializer.dump(airbyte_message)).decode() # type: ignore[no-any-return] # orjson.dumps(message).decode() always returns string
+ return orjson.dumps(AirbyteMessageSerializer.dump(airbyte_message)).decode()

@classmethod
def extract_state(cls, args: List[str]) -> Optional[Any]:
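The two `[no-any-return]` ignores removed here and in `logger.py` below guarded `orjson.dumps(...).decode()`. Recent orjson releases ship type stubs in which `dumps()` returns `bytes`, so `.decode()` is already a plain `str` and the ignore is dead. A minimal sketch, assuming a recent orjson:

```python
import orjson

payload = {"type": "LOG", "log": {"level": "INFO", "message": "hello"}}

serialized: bytes = orjson.dumps(payload)  # per orjson's stubs: dumps(...) -> bytes
as_string: str = serialized.decode()  # bytes.decode() -> str; no Any involved

print(as_string)
```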
2 changes: 1 addition & 1 deletion airbyte_cdk/logger.py
@@ -78,7 +78,7 @@ def format(self, record: logging.LogRecord) -> str:
log_message = AirbyteMessage(
type=Type.LOG, log=AirbyteLogMessage(level=airbyte_level, message=message)
)
- return orjson.dumps(AirbyteMessageSerializer.dump(log_message)).decode() # type: ignore[no-any-return] # orjson.dumps(message).decode() always returns string
+ return orjson.dumps(AirbyteMessageSerializer.dump(log_message)).decode()

@staticmethod
def extract_extra_args_from_record(record: logging.LogRecord) -> Mapping[str, Any]:
2 changes: 1 addition & 1 deletion airbyte_cdk/sources/abstract_source.py
@@ -200,7 +200,7 @@ def read(
if len(stream_name_to_exception) > 0:
error_message = generate_failed_streams_error_message(
{key: [value] for key, value in stream_name_to_exception.items()}
- ) # type: ignore # for some reason, mypy can't figure out the types for key and value
+ )
logger.info(error_message)
# We still raise at least one exception when a stream raises an exception because the platform currently relies
# on a non-zero exit code to determine if a sync attempt has failed. We also raise the exception as a config_error
2 changes: 1 addition & 1 deletion airbyte_cdk/sources/config.py
@@ -24,4 +24,4 @@ def schema(cls, *args: Any, **kwargs: Any) -> Dict[str, Any]:
rename_key(schema, old_key="anyOf", new_key="oneOf") # UI supports only oneOf
expand_refs(schema)
schema.pop("description", None) # description added from the docstring
- return schema # type: ignore[no-any-return]
+ return schema
11 changes: 8 additions & 3 deletions airbyte_cdk/sources/connector_state_manager.py
@@ -4,7 +4,7 @@

import copy
from dataclasses import dataclass
- from typing import Any, List, Mapping, MutableMapping, Optional, Tuple, Union
+ from typing import Any, List, Mapping, MutableMapping, Optional, Tuple, Union, cast

from airbyte_cdk.models import (
AirbyteMessage,
@@ -15,6 +15,7 @@
StreamDescriptor,
)
from airbyte_cdk.models import Type as MessageType
+ from airbyte_cdk.models.airbyte_protocol import AirbyteGlobalState, AirbyteStateBlob


@dataclass(frozen=True)
@@ -118,8 +119,12 @@ def _extract_from_state_message(
is_global = cls._is_global_state(state)

if is_global:
- global_state = state[0].global_ # type: ignore # We verified state is a list in _is_global_state
- shared_state = copy.deepcopy(global_state.shared_state, {}) # type: ignore[union-attr] # global_state has shared_state
+ # We already validate that this is a global state message, not None:
+ global_state = cast(AirbyteGlobalState, state[0].global_)
+ # global_state has shared_state, also not None:
+ shared_state: AirbyteStateBlob = cast(
+ AirbyteStateBlob, copy.deepcopy(global_state.shared_state, {})
+ )
streams = {
HashableStreamDescriptor(
name=per_stream_state.stream_descriptor.name,
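This is the one spot where the commit replaces ignores with explicit `cast()` calls instead of just deleting them. `cast` pins the assumption to the exact expression and, unlike a line-wide ignore, cannot silently absorb unrelated errors. A hedged sketch of the pattern (simplified stand-in types, not the CDK's real protocol classes):

```python
from typing import Optional, cast


class GlobalState:
    shared_state: Optional[dict] = None


class StateMessage:
    global_: Optional[GlobalState] = None


def extract_shared_state(state: list[StateMessage]) -> dict:
    # A prior validation step is assumed to guarantee these are not None,
    # so we assert that knowledge with cast() rather than silencing the
    # whole line with "# type: ignore".
    global_state = cast(GlobalState, state[0].global_)
    return cast(dict, global_state.shared_state)
```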
2 changes: 1 addition & 1 deletion airbyte_cdk/sources/declarative/auth/oauth.py
@@ -135,7 +135,7 @@ def get_grant_type(self) -> str:
return self.grant_type.eval(self.config) # type: ignore # eval returns a string in this context

def get_refresh_request_body(self) -> Mapping[str, Any]:
- return self._refresh_request_body.eval(self.config) # type: ignore # eval should return a Mapping in this context
+ return self._refresh_request_body.eval(self.config)

def get_token_expiry_date(self) -> pendulum.DateTime:
return self._token_expiry_date # type: ignore # _token_expiry_date is a pendulum.DateTime. It is never None despite what mypy thinks
@@ -128,7 +128,7 @@ def __init__(
initial_number_of_partitions_to_generate=initial_number_of_partitions_to_generate,
logger=self.logger,
slice_logger=self._slice_logger,
- message_repository=self.message_repository, # type: ignore # message_repository is always instantiated with a value by factory
+ message_repository=self.message_repository,
)

def read(
@@ -134,7 +134,7 @@ def set_initial_state(self, stream_state: StreamState) -> None:
"""
self._cursor = (
stream_state.get(self.cursor_field.eval(self.config)) if stream_state else None # type: ignore [union-attr]
- ) # type: ignore # cursor_field is converted to an InterpolatedString in __post_init__
+ )

def observe(self, stream_slice: StreamSlice, record: Record) -> None:
"""
@@ -161,7 +161,7 @@ def observe(self, stream_slice: StreamSlice, record: Record) -> None:
record,
stream_slice.get(start_field), # type: ignore [arg-type]
stream_slice.get(end_field), # type: ignore [arg-type]
- ) # type: ignore # we know that stream_slices for these cursors will use a string representing an unparsed date
+ )
and is_highest_observed_cursor_value
):
self._highest_observed_cursor_field_value = record_cursor_value
@@ -372,7 +372,7 @@ def _get_request_options(
if self.end_time_option and self.end_time_option.inject_into == option_type:
options[self.end_time_option.field_name.eval(config=self.config)] = stream_slice.get( # type: ignore [union-attr]
self._partition_field_end.eval(self.config)
- ) # type: ignore # field_name is always casted to an interpolated string
+ )
return options

def should_be_synced(self, record: Record) -> bool:
2 changes: 1 addition & 1 deletion airbyte_cdk/sources/declarative/interpolation/jinja.py
@@ -27,7 +27,7 @@ class StreamPartitionAccessEnvironment(SandboxedEnvironment):
def is_safe_attribute(self, obj: Any, attr: str, value: Any) -> bool:
if attr in ["_partition"]:
return True
- return super().is_safe_attribute(obj, attr, value) # type: ignore # for some reason, mypy says 'Returning Any from function declared to return "bool"'
+ return super().is_safe_attribute(obj, attr, value)


class JinjaInterpolation(Interpolation):
2 changes: 1 addition & 1 deletion airbyte_cdk/sources/declarative/interpolation/macros.py
@@ -116,7 +116,7 @@ def duration(datestring: str) -> Union[datetime.timedelta, isodate.Duration]:
Usage:
`"{{ now_utc() - duration('P1D') }}"`
"""
- return parse_duration(datestring) # type: ignore # mypy thinks this returns Any for some reason
+ return parse_duration(datestring)


def format_datetime(
@@ -397,7 +397,7 @@ def __init__(
self._disable_retries = disable_retries
self._disable_cache = disable_cache
self._disable_resumable_full_refresh = disable_resumable_full_refresh
- self._message_repository = message_repository or InMemoryMessageRepository( # type: ignore
+ self._message_repository = message_repository or InMemoryMessageRepository(
self._evaluate_log_level(emit_connector_builder_messages)
)

@@ -645,7 +645,7 @@ def create_legacy_to_per_partition_state_migration(
declarative_stream.incremental_sync, # type: ignore # was already checked. Migration can be applied only to incremental streams.
config,
declarative_stream.parameters, # type: ignore # different type is expected here Mapping[str, Any], got Dict[str, Any]
- ) # type: ignore # The retriever type was already checked
+ )

def create_session_token_authenticator(
self, model: SessionTokenAuthenticatorModel, config: Config, name: str, **kwargs: Any
@@ -675,7 +675,7 @@ def create_session_token_authenticator(
return ModelToComponentFactory.create_bearer_authenticator(
BearerAuthenticatorModel(type="BearerAuthenticator", api_token=""), # type: ignore # $parameters has a default value
config,
- token_provider=token_provider, # type: ignore # $parameters defaults to None
+ token_provider=token_provider,
)
else:
return ModelToComponentFactory.create_api_key_authenticator(
@@ -822,7 +822,6 @@ def create_concurrent_cursor_from_datetime_based_cursor(
input_datetime_formats=datetime_based_cursor_model.cursor_datetime_formats,
is_sequential_state=True,
cursor_granularity=cursor_granularity,
- # type: ignore # Having issues w/ inspection for GapType and CursorValueType as shown in existing tests. Confirmed functionality is working in practice
)

start_date_runtime_value: Union[InterpolatedString, str, MinMaxDatetime]
@@ -896,7 +895,7 @@ def create_concurrent_cursor_from_datetime_based_cursor(
stream_name=stream_name,
stream_namespace=stream_namespace,
stream_state=stream_state,
- message_repository=self._message_repository, # type: ignore # message_repository is always instantiated with a value by factory
+ message_repository=self._message_repository,
connector_state_manager=state_manager,
connector_state_converter=connector_state_converter,
cursor_field=cursor_field,
@@ -1709,7 +1708,7 @@ def create_oauth_authenticator(
refresh_token=model.refresh_token,
scopes=model.scopes,
token_expiry_date=model.token_expiry_date,
- token_expiry_date_format=model.token_expiry_date_format, # type: ignore
+ token_expiry_date_format=model.token_expiry_date_format,
token_expiry_is_time_of_expiration=bool(model.token_expiry_date_format),
token_refresh_endpoint=model.token_refresh_endpoint,
config=config,
@@ -134,7 +134,7 @@ def _get_request_option(
config=self.config
): value
}
- ) # type: ignore # field_name is always casted to an interpolated string
+ )
return params

def stream_slices(self) -> Iterable[StreamSlice]:
@@ -164,7 +164,7 @@ def stream_slices(self) -> Iterable[StreamSlice]:
extra_fields = [
[field_path_part.eval(self.config) for field_path_part in field_path] # type: ignore [union-attr]
for field_path in parent_stream_config.extra_fields
- ] # type: ignore # extra_fields is always casted to an interpolated string
+ ]

# read_stateless() assumes the parent is not concurrent. This is currently okay since the concurrent CDK does
# not support either substreams or RFR, but something that needs to be considered once we do
@@ -141,7 +141,7 @@ def backoff_time(
for backoff_strategy in self.backoff_strategies:
backoff = backoff_strategy.backoff_time(
response_or_exception=response_or_exception, attempt_count=attempt_count
- ) # type: ignore # attempt_count maintained for compatibility with low code CDK
+ )
if backoff:
return backoff
return backoff
@@ -153,21 +153,21 @@ def _create_error_message(self, response: requests.Response) -> Optional[str]:
"""
return self.error_message.eval( # type: ignore [no-any-return, union-attr]
self.config, response=self._safe_response_json(response), headers=response.headers
- ) # type: ignore # error_message is always cast to an interpolated string
+ )

def _response_matches_predicate(self, response: requests.Response) -> bool:
return (
bool(
self.predicate.condition # type: ignore [union-attr]
and self.predicate.eval( # type: ignore [union-attr]
None, # type: ignore [arg-type]
- response=self._safe_response_json(response), # type: ignore [arg-type]
+ response=self._safe_response_json(response),
headers=response.headers,
)
)
if self.predicate
else False
- ) # type: ignore # predicate is always cast to an interpolated string
+ )

def _response_contains_error_message(self, response: requests.Response) -> bool:
if not self.error_message_contains:
@@ -87,5 +87,5 @@ def _get_request_options(
if self.end_time_option and self.end_time_option.inject_into == option_type:
options[self.end_time_option.field_name.eval(config=self.config)] = stream_slice.get( # type: ignore [union-attr]
self._partition_field_end.eval(self.config)
- ) # type: ignore # field_name is always casted to an interpolated string
+ )
return options
2 changes: 1 addition & 1 deletion airbyte_cdk/sources/declarative/spec/spec.py
@@ -9,7 +9,7 @@
AdvancedAuth,
ConnectorSpecification,
ConnectorSpecificationSerializer,
- ) # type: ignore [attr-defined]
+ )
from airbyte_cdk.sources.declarative.models.declarative_component_schema import AuthFlow


4 changes: 2 additions & 2 deletions airbyte_cdk/sources/file_based/file_types/csv_parser.py
@@ -117,7 +117,7 @@ def _get_headers(self, fp: IOBase, config_format: CsvFormat, dialect_name: str)
"""
# Note that this method assumes the dialect has already been registered if we're parsing the headers
if isinstance(config_format.header_definition, CsvHeaderUserProvided):
- return config_format.header_definition.column_names # type: ignore # should be CsvHeaderUserProvided given the type
+ return config_format.header_definition.column_names

if isinstance(config_format.header_definition, CsvHeaderAutogenerated):
self._skip_rows(
@@ -229,7 +229,7 @@ def parse_records(
if discovered_schema:
property_types = {
col: prop["type"] for col, prop in discovered_schema["properties"].items()
- } # type: ignore # discovered_schema["properties"] is known to be a mapping
+ }
deduped_property_types = CsvParser._pre_propcess_property_types(property_types)
else:
deduped_property_types = {}
2 changes: 1 addition & 1 deletion airbyte_cdk/sources/streams/concurrent/cursor.py
@@ -473,7 +473,7 @@ def should_be_synced(self, record: Record) -> bool:
:return: True if the record's cursor value falls within the sync boundaries
"""
try:
- record_cursor_value: CursorValueType = self._extract_cursor_value(record) # type: ignore # cursor_field is converted to an InterpolatedString in __post_init__
+ record_cursor_value: CursorValueType = self._extract_cursor_value(record)
except ValueError:
self._log_for_record_without_cursor_value()
return True
@@ -141,7 +141,7 @@ def parse_timestamp(self, timestamp: int) -> datetime:
raise ValueError(
f"DateTime object was expected but got {type(dt_object)} from pendulum.parse({timestamp})"
)
- return dt_object # type: ignore # we are manually type checking because pendulum.parse may return different types
+ return dt_object


class IsoMillisConcurrentStreamStateConverter(DateTimeStreamStateConverter):
@@ -178,7 +178,7 @@ def parse_timestamp(self, timestamp: str) -> datetime:
raise ValueError(
f"DateTime object was expected but got {type(dt_object)} from pendulum.parse({timestamp})"
)
- return dt_object # type: ignore # we are manually type checking because pendulum.parse may return different types
+ return dt_object


class CustomFormatConcurrentStreamStateConverter(IsoMillisConcurrentStreamStateConverter):
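Both `parse_timestamp` hunks rely on the same narrowing trick: `pendulum.parse` is typed to return several shapes (`DateTime`, `Date`, `Time`, `Duration`), but raising on everything except `DateTime` lets mypy narrow the fall-through path, so the trailing ignore on `return dt_object` became unused. A minimal sketch, assuming pendulum's 2.x-style API:

```python
from datetime import datetime

import pendulum
from pendulum import DateTime


def parse_timestamp(value: str) -> datetime:
    dt_object = pendulum.parse(value)
    # Raising on non-DateTime results narrows dt_object to DateTime (a
    # datetime subclass) below, so no "# type: ignore" is needed on return.
    if not isinstance(dt_object, DateTime):
        raise ValueError(f"DateTime expected, got {type(dt_object)}")
    return dt_object


print(parse_timestamp("2024-12-03T00:00:00Z"))
```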
2 changes: 1 addition & 1 deletion airbyte_cdk/sources/streams/core.py
@@ -317,7 +317,7 @@ def as_airbyte_stream(self) -> AirbyteStream:
# If we can offer incremental we always should. RFR is always less reliable than incremental which uses a real cursor value
if self.supports_incremental:
stream.source_defined_cursor = self.source_defined_cursor
- stream.supported_sync_modes.append(SyncMode.incremental) # type: ignore
+ stream.supported_sync_modes.append(SyncMode.incremental)
stream.default_cursor_field = self._wrapped_cursor_field()

keys = Stream._wrapped_primary_key(self.primary_key)