Skip to content

Commit

Permalink
Auto-fix lint and format issues
Browse files Browse the repository at this point in the history
  • Loading branch information
octavia-squidington-iii committed Nov 10, 2024
1 parent 9d571da commit 338f476
Show file tree
Hide file tree
Showing 372 changed files with 18,596 additions and 4,697 deletions.
13 changes: 10 additions & 3 deletions airbyte_cdk/config_observation.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@

class ObservedDict(dict): # type: ignore # disallow_any_generics is set to True, and dict is equivalent to dict[Any]
def __init__(
self, non_observed_mapping: MutableMapping[Any, Any], observer: ConfigObserver, update_on_unchanged_value: bool = True
self,
non_observed_mapping: MutableMapping[Any, Any],
observer: ConfigObserver,
update_on_unchanged_value: bool = True,
) -> None:
non_observed_mapping = copy(non_observed_mapping)
self.observer = observer
Expand Down Expand Up @@ -69,11 +72,15 @@ def update(self) -> None:
emit_configuration_as_airbyte_control_message(self.config)


def observe_connector_config(non_observed_connector_config: MutableMapping[str, Any]) -> ObservedDict:
def observe_connector_config(
non_observed_connector_config: MutableMapping[str, Any],
) -> ObservedDict:
if isinstance(non_observed_connector_config, ObservedDict):
raise ValueError("This connector configuration is already observed")
connector_config_observer = ConfigObserver()
observed_connector_config = ObservedDict(non_observed_connector_config, connector_config_observer)
observed_connector_config = ObservedDict(
non_observed_connector_config, connector_config_observer
)
connector_config_observer.set_config(observed_connector_config)
return observed_connector_config

Expand Down
22 changes: 17 additions & 5 deletions airbyte_cdk/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,11 @@
from typing import Any, Generic, Mapping, Optional, Protocol, TypeVar

import yaml
from airbyte_cdk.models import AirbyteConnectionStatus, ConnectorSpecification, ConnectorSpecificationSerializer
from airbyte_cdk.models import (
AirbyteConnectionStatus,
ConnectorSpecification,
ConnectorSpecificationSerializer,
)


def load_optional_package_file(package: str, filename: str) -> Optional[bytes]:
Expand Down Expand Up @@ -53,7 +57,9 @@ def _read_json_file(file_path: str) -> Any:
try:
return json.loads(contents)
except json.JSONDecodeError as error:
raise ValueError(f"Could not read json file {file_path}: {error}. Please ensure that it is a valid JSON.")
raise ValueError(
f"Could not read json file {file_path}: {error}. Please ensure that it is a valid JSON."
)

@staticmethod
def write_config(config: TConfig, config_path: str) -> None:
Expand All @@ -72,15 +78,19 @@ def spec(self, logger: logging.Logger) -> ConnectorSpecification:
json_spec = load_optional_package_file(package, "spec.json")

if yaml_spec and json_spec:
raise RuntimeError("Found multiple spec files in the package. Only one of spec.yaml or spec.json should be provided.")
raise RuntimeError(
"Found multiple spec files in the package. Only one of spec.yaml or spec.json should be provided."
)

if yaml_spec:
spec_obj = yaml.load(yaml_spec, Loader=yaml.SafeLoader)
elif json_spec:
try:
spec_obj = json.loads(json_spec)
except json.JSONDecodeError as error:
raise ValueError(f"Could not read json spec file: {error}. Please ensure that it is a valid JSON.")
raise ValueError(
f"Could not read json spec file: {error}. Please ensure that it is a valid JSON."
)
else:
raise FileNotFoundError("Unable to find spec.yaml or spec.json in the package.")

Expand All @@ -101,7 +111,9 @@ def write_config(config: Mapping[str, Any], config_path: str) -> None: ...

class DefaultConnectorMixin:
# can be overridden to change an input config
def configure(self: _WriteConfigProtocol, config: Mapping[str, Any], temp_dir: str) -> Mapping[str, Any]:
def configure(
self: _WriteConfigProtocol, config: Mapping[str, Any], temp_dir: str
) -> Mapping[str, Any]:
config_path = os.path.join(temp_dir, "config.json")
self.write_config(config, config_path)
return config
Expand Down
36 changes: 28 additions & 8 deletions airbyte_cdk/connector_builder/connector_builder_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,19 @@
from typing import Any, List, Mapping

from airbyte_cdk.connector_builder.message_grouper import MessageGrouper
from airbyte_cdk.models import AirbyteMessage, AirbyteRecordMessage, AirbyteStateMessage, ConfiguredAirbyteCatalog
from airbyte_cdk.models import (
AirbyteMessage,
AirbyteRecordMessage,
AirbyteStateMessage,
ConfiguredAirbyteCatalog,
)
from airbyte_cdk.models import Type
from airbyte_cdk.models import Type as MessageType
from airbyte_cdk.sources.declarative.declarative_source import DeclarativeSource
from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import ModelToComponentFactory
from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import (
ModelToComponentFactory,
)
from airbyte_cdk.utils.airbyte_secrets_utils import filter_secrets
from airbyte_cdk.utils.traced_exception import AirbyteTracedException

Expand All @@ -34,7 +41,9 @@ class TestReadLimits:

def get_limits(config: Mapping[str, Any]) -> TestReadLimits:
command_config = config.get("__test_read_config", {})
max_pages_per_slice = command_config.get(MAX_PAGES_PER_SLICE_KEY) or DEFAULT_MAXIMUM_NUMBER_OF_PAGES_PER_SLICE
max_pages_per_slice = (
command_config.get(MAX_PAGES_PER_SLICE_KEY) or DEFAULT_MAXIMUM_NUMBER_OF_PAGES_PER_SLICE
)
max_slices = command_config.get(MAX_SLICES_KEY) or DEFAULT_MAXIMUM_NUMBER_OF_SLICES
max_records = command_config.get(MAX_RECORDS_KEY) or DEFAULT_MAXIMUM_RECORDS
return TestReadLimits(max_records, max_pages_per_slice, max_slices)
Expand Down Expand Up @@ -64,15 +73,24 @@ def read_stream(
) -> AirbyteMessage:
try:
handler = MessageGrouper(limits.max_pages_per_slice, limits.max_slices, limits.max_records)
stream_name = configured_catalog.streams[0].stream.name # The connector builder only supports a single stream
stream_read = handler.get_message_groups(source, config, configured_catalog, state, limits.max_records)
stream_name = configured_catalog.streams[
0
].stream.name # The connector builder only supports a single stream
stream_read = handler.get_message_groups(
source, config, configured_catalog, state, limits.max_records
)
return AirbyteMessage(
type=MessageType.RECORD,
record=AirbyteRecordMessage(data=dataclasses.asdict(stream_read), stream=stream_name, emitted_at=_emitted_at()),
record=AirbyteRecordMessage(
data=dataclasses.asdict(stream_read), stream=stream_name, emitted_at=_emitted_at()
),
)
except Exception as exc:
error = AirbyteTracedException.from_exception(
exc, message=filter_secrets(f"Error reading stream with config={config} and catalog={configured_catalog}: {str(exc)}")
exc,
message=filter_secrets(
f"Error reading stream with config={config} and catalog={configured_catalog}: {str(exc)}"
),
)
return error.as_airbyte_message()

Expand All @@ -88,7 +106,9 @@ def resolve_manifest(source: ManifestDeclarativeSource) -> AirbyteMessage:
),
)
except Exception as exc:
error = AirbyteTracedException.from_exception(exc, message=f"Error resolving manifest: {str(exc)}")
error = AirbyteTracedException.from_exception(
exc, message=f"Error resolving manifest: {str(exc)}"
)
return error.as_airbyte_message()


Expand Down
30 changes: 24 additions & 6 deletions airbyte_cdk/connector_builder/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,13 @@
from typing import Any, List, Mapping, Optional, Tuple

from airbyte_cdk.connector import BaseConnector
from airbyte_cdk.connector_builder.connector_builder_handler import TestReadLimits, create_source, get_limits, read_stream, resolve_manifest
from airbyte_cdk.connector_builder.connector_builder_handler import (
TestReadLimits,
create_source,
get_limits,
read_stream,
resolve_manifest,
)
from airbyte_cdk.entrypoint import AirbyteEntrypoint
from airbyte_cdk.models import (
AirbyteMessage,
Expand All @@ -22,11 +28,17 @@
from orjson import orjson


def get_config_and_catalog_from_args(args: List[str]) -> Tuple[str, Mapping[str, Any], Optional[ConfiguredAirbyteCatalog], Any]:
def get_config_and_catalog_from_args(
args: List[str],
) -> Tuple[str, Mapping[str, Any], Optional[ConfiguredAirbyteCatalog], Any]:
# TODO: Add functionality for the `debug` logger.
# Currently, no one `debug` level log will be displayed during `read` a stream for a connector created through `connector-builder`.
parsed_args = AirbyteEntrypoint.parse_args(args)
config_path, catalog_path, state_path = parsed_args.config, parsed_args.catalog, parsed_args.state
config_path, catalog_path, state_path = (
parsed_args.config,
parsed_args.catalog,
parsed_args.state,
)
if parsed_args.command != "read":
raise ValueError("Only read commands are allowed for Connector Builder requests.")

Expand Down Expand Up @@ -64,7 +76,9 @@ def handle_connector_builder_request(
if command == "resolve_manifest":
return resolve_manifest(source)
elif command == "test_read":
assert catalog is not None, "`test_read` requires a valid `ConfiguredAirbyteCatalog`, got None."
assert (
catalog is not None
), "`test_read` requires a valid `ConfiguredAirbyteCatalog`, got None."
return read_stream(source, config, catalog, state, limits)
else:
raise ValueError(f"Unrecognized command {command}.")
Expand All @@ -75,14 +89,18 @@ def handle_request(args: List[str]) -> str:
limits = get_limits(config)
source = create_source(config, limits)
return orjson.dumps(
AirbyteMessageSerializer.dump(handle_connector_builder_request(source, command, config, catalog, state, limits))
AirbyteMessageSerializer.dump(
handle_connector_builder_request(source, command, config, catalog, state, limits)
)
).decode() # type: ignore[no-any-return] # Serializer.dump() always returns AirbyteMessage


if __name__ == "__main__":
try:
print(handle_request(sys.argv[1:]))
except Exception as exc:
error = AirbyteTracedException.from_exception(exc, message=f"Error handling request: {str(exc)}")
error = AirbyteTracedException.from_exception(
exc, message=f"Error handling request: {str(exc)}"
)
m = error.as_airbyte_message()
print(orjson.dumps(AirbyteMessageSerializer.dump(m)).decode())
Loading

0 comments on commit 338f476

Please sign in to comment.