chore: make full CDK codebase compliant with mypy #105

Merged Dec 4, 2024 (37 commits; changes shown from 30 commits)
55f6b4f
run mypy on all files, including unchanged
aaronsteers Dec 3, 2024
4d48d3f
fix mypy syntax
aaronsteers Dec 3, 2024
d3ea724
chore: `poetry add --dev types-requests`
aaronsteers Dec 3, 2024
3b9b389
chore: `poetry add --dev types-python-dateutil`
aaronsteers Dec 3, 2024
69fc701
chore: `poetry add --dev types-PyYaml`
aaronsteers Dec 3, 2024
f168266
chore: replace deprecated `deprecated` refs with PEP 702 `deprecated`
aaronsteers Dec 3, 2024
9c27d7e
chore: add missing cachetools stubs
aaronsteers Dec 3, 2024
9b08030
chore: remove unused `Deprecated` dependency
aaronsteers Dec 3, 2024
f54eb7c
chore: fix orjson imports
aaronsteers Dec 3, 2024
8202ce5
chore: misc mypy typing fixes
aaronsteers Dec 3, 2024
7e50af9
ci: remove stale if condition
aaronsteers Dec 3, 2024
5c65b46
fix deprecated syntax
aaronsteers Dec 3, 2024
e02629f
Merge branch 'main' into aj/chore/make-mypy-compliant
aaronsteers Dec 3, 2024
70bda72
chore: deprecated decorator readability and syntax fixes
aaronsteers Dec 3, 2024
22afbf6
allow unquoted class name via future annotations
aaronsteers Dec 3, 2024
df00054
fix test expecting ValueError
aaronsteers Dec 3, 2024
d7d1fda
add future annotations import
aaronsteers Dec 3, 2024
192e905
chore: pr review and cleanup
aaronsteers Dec 3, 2024
93e4534
more noqas
aaronsteers Dec 3, 2024
7f05353
fix inadvertent tuple
aaronsteers Dec 3, 2024
65dae78
add mypy target directory
aaronsteers Dec 3, 2024
92e6994
ci: remove non-interactive flag
aaronsteers Dec 3, 2024
3eff135
chore: fix or noqa remaining mypy issues (now 100% pass)
aaronsteers Dec 3, 2024
d68ac34
chore: fix mypy task in poe
aaronsteers Dec 3, 2024
59d585e
chore: fix type: `set[str]` instead of `Template`
aaronsteers Dec 3, 2024
40f9bca
revert: undo added guard statement
aaronsteers Dec 3, 2024
c8c0ef4
revert: undo added guard statement (2)
aaronsteers Dec 3, 2024
c29447c
add missing noqas
aaronsteers Dec 3, 2024
39d66af
remove unused type ignores
aaronsteers Dec 3, 2024
d3c2d10
ci: comment-out source-s3 and destination-pinecone
aaronsteers Dec 3, 2024
b77d957
ci: fix skip indicator
aaronsteers Dec 3, 2024
72d3e44
Merge branch 'main' into aj/chore/make-mypy-compliant
aaronsteers Dec 4, 2024
763ad95
Merge branch 'main' into aj/chore/make-mypy-compliant
aaronsteers Dec 4, 2024
eb2ea3c
comment-out deprecated decorator in "hot" codepath, with link to foll…
aaronsteers Dec 4, 2024
b62f634
comment-out flaky chargebee test
aaronsteers Dec 4, 2024
6873151
update from `main`
aaronsteers Dec 4, 2024
bd2e5af
add deprecated notice to docstring
aaronsteers Dec 4, 2024
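Several commits above remove the third-party `Deprecated` package in favor of a PEP 702-style `deprecated` decorator. The sketch below illustrates the runtime behavior of such a decorator using only the standard library; the names `deprecated`, `old_api`, and the message text are illustrative, not the CDK's actual implementation (which relies on the `warnings`/`typing_extensions` machinery from PEP 702).

```python
# Minimal PEP 702-style @deprecated sketch (stdlib only, hypothetical names).
# Calling a decorated function emits a DeprecationWarning but still runs it.
import functools
import warnings


def deprecated(message: str):
    """Mark a callable as deprecated; calling it emits a DeprecationWarning."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            warnings.warn(
                f"{func.__name__} is deprecated: {message}",
                DeprecationWarning,
                stacklevel=2,  # point the warning at the caller, not the wrapper
            )
            return func(*args, **kwargs)
        return wrapper
    return decorator


@deprecated("use new_api() instead")
def old_api() -> str:
    return "result"
```

One commit also notes the decorator was commented out in a "hot" codepath: unlike a static marker, a wrapper like this adds a small per-call cost, which matters on frequently called functions.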
8 changes: 4 additions & 4 deletions .github/dependabot.yml
@@ -13,10 +13,10 @@ updates:
interval: daily
labels:
- chore
open-pull-requests-limit: 8 # default is 5
open-pull-requests-limit: 8 # default is 5

- package-ecosystem: github-actions
open-pull-requests-limit: 5 # default is 5
open-pull-requests-limit: 5 # default is 5
directory: "/"
commit-message:
prefix: "ci(deps): "
Expand All @@ -29,5 +29,5 @@ updates:
minor-and-patch:
applies-to: version-updates
update-types:
- patch
- minor
- patch
- minor
9 changes: 5 additions & 4 deletions .github/workflows/connector-tests.yml
@@ -74,10 +74,11 @@ jobs:
cdk_extra: n/a
- connector: source-chargebee
cdk_extra: n/a
- connector: source-s3
cdk_extra: file-based
- connector: destination-pinecone
cdk_extra: vector-db-based
# These two are behind in CDK updates and can't be used as tests until they are updated:
# - connector: source-s3
# cdk_extra: file-based
# - connector: destination-pinecone
# cdk_extra: vector-db-based
- connector: destination-motherduck
cdk_extra: sql
# ZenDesk currently failing (as of 2024-12-02)
13 changes: 5 additions & 8 deletions .github/workflows/publish_sdm_connector.yml
@@ -7,12 +7,11 @@ on:
workflow_dispatch:
inputs:
version:
description:
The version to publish, ie 1.0.0 or 1.0.0-dev1.
If omitted, and if run from a release branch, the version will be
inferred from the git tag.
If omitted, and if run from a non-release branch, then only a SHA-based
Docker tag will be created.
description: The version to publish, ie 1.0.0 or 1.0.0-dev1.
If omitted, and if run from a release branch, the version will be
inferred from the git tag.
If omitted, and if run from a non-release branch, then only a SHA-based
Docker tag will be created.
required: false
dry_run:
description: If true, the workflow will not push to DockerHub.
@@ -24,7 +23,6 @@ jobs:
build:
runs-on: ubuntu-latest
steps:

- name: Detect Release Tag Version
if: startsWith(github.ref, 'refs/tags/v')
run: |
@@ -167,7 +165,6 @@ jobs:
tags: |
airbyte/source-declarative-manifest:${{ env.VERSION }}


- name: Build and push ('latest' tag)
# Only run if version is set and IS_PRERELEASE is false
if: env.VERSION != '' && env.IS_PRERELEASE == 'false' && github.event.inputs.dry_run == 'false'
8 changes: 4 additions & 4 deletions .github/workflows/pytest_matrix.yml
@@ -12,10 +12,10 @@ on:
branches:
- main
paths:
- 'airbyte_cdk/**'
- 'unit_tests/**'
- 'poetry.lock'
- 'pyproject.toml'
- "airbyte_cdk/**"
- "unit_tests/**"
- "poetry.lock"
- "pyproject.toml"
pull_request:

jobs:
13 changes: 3 additions & 10 deletions .github/workflows/python_lint.yml
@@ -72,14 +72,7 @@ jobs:
- name: Install dependencies
run: poetry install --all-extras

# Job-specifc step(s):
# Job-specific step(s):

# For now, we run mypy only on modified files
- name: Get changed Python files
id: changed-py-files
uses: tj-actions/changed-files@v43
with:
files: "airbyte_cdk/**/*.py"
- name: Run mypy on changed files
if: steps.changed-py-files.outputs.any_changed == 'true'
run: poetry run mypy ${{ steps.changed-py-files.outputs.all_changed_files }} --config-file mypy.ini --install-types --non-interactive
- name: Run mypy
run: poetry run mypy --config-file mypy.ini airbyte_cdk
16 changes: 11 additions & 5 deletions airbyte_cdk/cli/source_declarative_manifest/_run.py
@@ -25,7 +25,7 @@
from pathlib import Path
from typing import Any, cast

from orjson import orjson
import orjson

from airbyte_cdk.entrypoint import AirbyteEntrypoint, launch
from airbyte_cdk.models import (
@@ -72,7 +72,7 @@ def __init__(
super().__init__(
catalog=catalog,
config=config,
state=state,
state=state, # type: ignore [arg-type]
path_to_yaml="manifest.yaml",
)

@@ -152,18 +152,24 @@ def handle_remote_manifest_command(args: list[str]) -> None:
)


def create_declarative_source(args: list[str]) -> ConcurrentDeclarativeSource:
def create_declarative_source(
args: list[str],
) -> ConcurrentDeclarativeSource: # type: ignore [type-arg]
"""Creates the source with the injected config.

This essentially does what other low-code sources do at build time, but at runtime,
with a user-provided manifest in the config. This better reflects what happens in the
connector builder.
"""
try:
config: Mapping[str, Any] | None
catalog: ConfiguredAirbyteCatalog | None
state: list[AirbyteStateMessage]
config, catalog, state = _parse_inputs_into_config_catalog_state(args)
if "__injected_declarative_manifest" not in config:
if config is None or "__injected_declarative_manifest" not in config:
raise ValueError(
f"Invalid config: `__injected_declarative_manifest` should be provided at the root of the config but config only has keys {list(config.keys())}"
"Invalid config: `__injected_declarative_manifest` should be provided at the root "
f"of the config but config only has keys: {list(config.keys() if config else [])}"
)
return ConcurrentDeclarativeSource(
config=config,
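The hunk above tightens the `__injected_declarative_manifest` check so a `None` config no longer crashes the error-message formatting. A minimal sketch of that guard pattern, with a hypothetical `validate_config` helper standing in for the inline check:

```python
# Hedged sketch of the None-safe guard from the diff above; validate_config
# is an illustrative name, not a CDK function.
from typing import Any, Mapping, Optional


def validate_config(config: Optional[Mapping[str, Any]]) -> Mapping[str, Any]:
    """Reject a missing config or one lacking the injected manifest key.

    Testing `config is None` first means the `in` check and the key listing
    in the error message never dereference None, which is exactly what mypy
    flagged in the original single-condition version.
    """
    if config is None or "__injected_declarative_manifest" not in config:
        raise ValueError(
            "Invalid config: `__injected_declarative_manifest` should be provided "
            "at the root of the config but config only has keys: "
            f"{list(config.keys() if config else [])}"
        )
    return config
```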
2 changes: 1 addition & 1 deletion airbyte_cdk/config_observation.py
@@ -10,7 +10,7 @@
from copy import copy
from typing import Any, List, MutableMapping

from orjson import orjson
import orjson

from airbyte_cdk.models import (
AirbyteControlConnectorConfigMessage,
2 changes: 1 addition & 1 deletion airbyte_cdk/connector_builder/main.py
@@ -6,7 +6,7 @@
import sys
from typing import Any, List, Mapping, Optional, Tuple

from orjson import orjson
import orjson

from airbyte_cdk.connector import BaseConnector
from airbyte_cdk.connector_builder.connector_builder_handler import (
20 changes: 10 additions & 10 deletions airbyte_cdk/connector_builder/message_grouper.py
@@ -71,7 +71,7 @@ def _cursor_field_to_nested_and_composite_field(

is_nested_key = isinstance(field[0], str)
if is_nested_key:
return [field] # type: ignore # the type of field is expected to be List[str] here
return [field]

raise ValueError(f"Unknown type for cursor field `{field}")

@@ -232,9 +232,9 @@ def _get_message_groups(
current_slice_descriptor = self._parse_slice_description(message.log.message) # type: ignore[union-attr] # AirbyteMessage with MessageType.LOG has log.message
current_slice_pages = []
at_least_one_page_in_group = False
elif message.type == MessageType.LOG and message.log.message.startswith(
elif message.type == MessageType.LOG and message.log.message.startswith( # type: ignore[union-attr] # None doesn't have 'message'
SliceLogger.SLICE_LOG_PREFIX
): # type: ignore[union-attr] # AirbyteMessage with MessageType.LOG has log.message
):
# parsing the first slice
current_slice_descriptor = self._parse_slice_description(message.log.message) # type: ignore[union-attr] # AirbyteMessage with MessageType.LOG has log.message
elif message.type == MessageType.LOG:
@@ -274,14 +274,14 @@ def _get_message_groups(
if message.trace.type == TraceType.ERROR: # type: ignore[union-attr] # AirbyteMessage with MessageType.TRACE has trace.type
yield message.trace
elif message.type == MessageType.RECORD:
current_page_records.append(message.record.data) # type: ignore[union-attr] # AirbyteMessage with MessageType.RECORD has record.data
current_page_records.append(message.record.data) # type: ignore[arg-type, union-attr] # AirbyteMessage with MessageType.RECORD has record.data
records_count += 1
schema_inferrer.accumulate(message.record)
datetime_format_inferrer.accumulate(message.record)
elif (
message.type == MessageType.CONTROL
and message.control.type == OrchestratorType.CONNECTOR_CONFIG
): # type: ignore[union-attr] # AirbyteMessage with MessageType.CONTROL has control.type
and message.control.type == OrchestratorType.CONNECTOR_CONFIG # type: ignore[union-attr] # None doesn't have 'type'
):
yield message.control
elif message.type == MessageType.STATE:
latest_state_message = message.state # type: ignore[assignment]
@@ -310,8 +310,8 @@ def _need_to_close_page(
and message.type == MessageType.LOG
and (
MessageGrouper._is_page_http_request(json_message)
or message.log.message.startswith("slice:")
) # type: ignore[union-attr] # AirbyteMessage with MessageType.LOG has log.message
or message.log.message.startswith("slice:") # type: ignore[union-attr] # AirbyteMessage with MessageType.LOG has log.message
)
)

@staticmethod
@@ -355,8 +355,8 @@ def _close_page(
StreamReadPages(
request=current_page_request,
response=current_page_response,
records=deepcopy(current_page_records),
) # type: ignore
records=deepcopy(current_page_records), # type: ignore [arg-type]
)
)
current_page_records.clear()

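Several hunks in this file move `# type: ignore[...]` comments from a closing parenthesis onto the expression line that actually triggers the error. That is because mypy associates an ignore comment with the physical line it sits on, so in a multi-line expression a comment on the trailing `)` is ineffective. A minimal hedged illustration (the `Log` class is hypothetical):

```python
# Illustration of where an ignore comment must live in a multi-line
# expression; Log is a stand-in for the AirbyteMessage.log attribute.
from typing import Optional


class Log:
    message: str = "slice:0"


log: Optional[Log] = Log()

# mypy reads the ignore on the line producing the error, not on the `)`:
starts = (
    log.message.startswith("slice:")  # type: ignore[union-attr] # log is not None here
)
```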
2 changes: 1 addition & 1 deletion airbyte_cdk/destinations/destination.py
@@ -9,7 +9,7 @@
from abc import ABC, abstractmethod
from typing import Any, Iterable, List, Mapping

from orjson import orjson
import orjson

from airbyte_cdk.connector import Connector
from airbyte_cdk.exception_handler import init_uncaught_exception_handler
4 changes: 2 additions & 2 deletions airbyte_cdk/destinations/vector_db_based/embedder.py
@@ -107,7 +107,7 @@ def embedding_dimensions(self) -> int:
class OpenAIEmbedder(BaseOpenAIEmbedder):
def __init__(self, config: OpenAIEmbeddingConfigModel, chunk_size: int):
super().__init__(
OpenAIEmbeddings(
OpenAIEmbeddings( # type: ignore [call-arg]
openai_api_key=config.openai_key, max_retries=15, disallowed_special=()
),
chunk_size,
@@ -118,7 +118,7 @@ class AzureOpenAIEmbedder(BaseOpenAIEmbedder):
def __init__(self, config: AzureOpenAIEmbeddingConfigModel, chunk_size: int):
# Azure OpenAI API has — as of 20230927 — a limit of 16 documents per request
super().__init__(
OpenAIEmbeddings(
OpenAIEmbeddings( # type: ignore [call-arg]
openai_api_key=config.openai_key,
chunk_size=16,
max_retries=15,
16 changes: 12 additions & 4 deletions airbyte_cdk/destinations/vector_db_based/writer.py
@@ -83,11 +83,19 @@ def write(
yield message
elif message.type == Type.RECORD:
record_chunks, record_id_to_delete = self.processor.process(message.record)
self.chunks[(message.record.namespace, message.record.stream)].extend(record_chunks)
if record_id_to_delete is not None:
self.ids_to_delete[(message.record.namespace, message.record.stream)].append(
record_id_to_delete
self.chunks[
( # type: ignore [index] # expected "tuple[str, str]", got "tuple[str | Any | None, str | Any]"
message.record.namespace, # type: ignore [union-attr] # record not None
message.record.stream, # type: ignore [union-attr] # record not None
)
].extend(record_chunks)
if record_id_to_delete is not None:
self.ids_to_delete[
( # type: ignore [index] # expected "tuple[str, str]", got "tuple[str | Any | None, str | Any]"
message.record.namespace, # type: ignore [union-attr] # record not None
message.record.stream, # type: ignore [union-attr] # record not None
)
].append(record_id_to_delete)
self.number_of_chunks += len(record_chunks)
if self.number_of_chunks >= self.batch_size:
self._process_batch()
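The writer hunk above annotates buffers keyed by `(namespace, stream)` tuples, where the namespace may be `None`. A stdlib-only sketch of that buffering pattern (`buffer_record` and the module-level dicts are illustrative names, not the CDK's):

```python
# Hedged sketch of per-(namespace, stream) buffering with the tuple key type
# mypy wanted spelled out in the diff above.
from collections import defaultdict
from typing import List, Optional, Tuple

StreamKey = Tuple[Optional[str], str]  # namespace may be absent

chunks: "defaultdict[StreamKey, List[str]]" = defaultdict(list)
ids_to_delete: "defaultdict[StreamKey, List[str]]" = defaultdict(list)


def buffer_record(
    namespace: Optional[str],
    stream: str,
    record_chunks: List[str],
    record_id_to_delete: Optional[str],
) -> None:
    """Accumulate chunks, and deletion IDs when present, under one tuple key."""
    key: StreamKey = (namespace, stream)
    chunks[key].extend(record_chunks)
    if record_id_to_delete is not None:
        ids_to_delete[key].append(record_id_to_delete)
```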
15 changes: 8 additions & 7 deletions airbyte_cdk/entrypoint.py
@@ -15,14 +15,14 @@
from typing import Any, DefaultDict, Iterable, List, Mapping, Optional
from urllib.parse import urlparse

import orjson
import requests
from orjson import orjson
from requests import PreparedRequest, Response, Session

from airbyte_cdk.connector import TConfig
from airbyte_cdk.exception_handler import init_uncaught_exception_handler
from airbyte_cdk.logger import init_logger
from airbyte_cdk.models import ( # type: ignore [attr-defined]
from airbyte_cdk.models import (
AirbyteConnectionStatus,
AirbyteMessage,
AirbyteMessageSerializer,
@@ -248,17 +248,18 @@ def handle_record_counts(
case Type.RECORD:
stream_message_count[
HashableStreamDescriptor(
name=message.record.stream, namespace=message.record.namespace
name=message.record.stream, # type: ignore[union-attr] # record has `stream`
namespace=message.record.namespace, # type: ignore[union-attr] # record has `namespace`
)
] += 1.0 # type: ignore[union-attr] # record has `stream` and `namespace`
] += 1.0
case Type.STATE:
stream_descriptor = message_utils.get_stream_descriptor(message)

# Set record count from the counter onto the state message
message.state.sourceStats = message.state.sourceStats or AirbyteStateStats() # type: ignore[union-attr] # state has `sourceStats`
message.state.sourceStats.recordCount = stream_message_count.get(
message.state.sourceStats.recordCount = stream_message_count.get( # type: ignore[union-attr] # state has `sourceStats`
stream_descriptor, 0.0
) # type: ignore[union-attr] # state has `sourceStats`
)

# Reset the counter
stream_message_count[stream_descriptor] = 0.0
@@ -280,7 +281,7 @@ def set_up_secret_filter(config: TConfig, connection_specification: Mapping[str,

@staticmethod
def airbyte_message_to_string(airbyte_message: AirbyteMessage) -> str:
return orjson.dumps(AirbyteMessageSerializer.dump(airbyte_message)).decode() # type: ignore[no-any-return] # orjson.dumps(message).decode() always returns string
return orjson.dumps(AirbyteMessageSerializer.dump(airbyte_message)).decode()

@classmethod
def extract_state(cls, args: List[str]) -> Optional[Any]:
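The `handle_record_counts` hunk above accumulates per-stream record counts and flushes them onto the next STATE message. A stdlib sketch of that flow, with plain dicts standing in for Airbyte message objects (`tally` and the dict shapes are assumptions for illustration):

```python
# Hedged sketch of per-stream record counting: RECORD messages increment a
# counter; each STATE message reports and then resets it.
from collections import defaultdict
from typing import Any, DefaultDict, Dict, List, Optional, Tuple

StreamKey = Tuple[str, Optional[str]]  # (stream name, namespace)


def tally(messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    counts: DefaultDict[StreamKey, float] = defaultdict(float)
    out: List[Dict[str, Any]] = []
    for msg in messages:
        key: StreamKey = (msg["stream"], msg.get("namespace"))
        if msg["type"] == "RECORD":
            counts[key] += 1.0
        elif msg["type"] == "STATE":
            msg = {**msg, "recordCount": counts.get(key, 0.0)}
            counts[key] = 0.0  # reset once the count is reported
        out.append(msg)
    return out
```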
4 changes: 2 additions & 2 deletions airbyte_cdk/logger.py
@@ -7,7 +7,7 @@
import logging.config
from typing import Any, Callable, Mapping, Optional, Tuple

from orjson import orjson
import orjson

from airbyte_cdk.models import (
AirbyteLogMessage,
@@ -78,7 +78,7 @@ def format(self, record: logging.LogRecord) -> str:
log_message = AirbyteMessage(
type=Type.LOG, log=AirbyteLogMessage(level=airbyte_level, message=message)
)
return orjson.dumps(AirbyteMessageSerializer.dump(log_message)).decode() # type: ignore[no-any-return] # orjson.dumps(message).decode() always returns string
return orjson.dumps(AirbyteMessageSerializer.dump(log_message)).decode()

@staticmethod
def extract_extra_args_from_record(record: logging.LogRecord) -> Mapping[str, Any]:
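The logger hunk above serializes each log record into a JSON-encoded Airbyte message. A stdlib-`json` sketch of that formatter pattern: note the CDK uses orjson, whose `dumps()` returns bytes (hence the `.decode()` in the diff), while `json.dumps` returns `str` directly. The class name and level map below are illustrative, not the CDK's.

```python
# Hedged stdlib sketch of the JSON-line log formatter shown in the diff.
import json
import logging


class AirbyteLikeFormatter(logging.Formatter):
    LEVEL_MAP = {"WARNING": "WARN"}  # illustrative remapping only

    def format(self, record: logging.LogRecord) -> str:
        level = self.LEVEL_MAP.get(record.levelname, record.levelname)
        return json.dumps(
            {"type": "LOG", "log": {"level": level, "message": record.getMessage()}}
        )
```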
2 changes: 1 addition & 1 deletion airbyte_cdk/sources/abstract_source.py
@@ -200,7 +200,7 @@ def read(
if len(stream_name_to_exception) > 0:
error_message = generate_failed_streams_error_message(
{key: [value] for key, value in stream_name_to_exception.items()}
) # type: ignore # for some reason, mypy can't figure out the types for key and value
)
logger.info(error_message)
# We still raise at least one exception when a stream raises an exception because the platform currently relies
# on a non-zero exit code to determine if a sync attempt has failed. We also raise the exception as a config_error
2 changes: 1 addition & 1 deletion airbyte_cdk/sources/config.py
@@ -24,4 +24,4 @@ def schema(cls, *args: Any, **kwargs: Any) -> Dict[str, Any]:
rename_key(schema, old_key="anyOf", new_key="oneOf") # UI supports only oneOf
expand_refs(schema)
schema.pop("description", None) # description added from the docstring
return schema # type: ignore[no-any-return]
return schema