Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(low-code cdk): add config component resolver #149

Merged
merged 15 commits into from
Dec 12, 2024
Merged
58 changes: 55 additions & 3 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2920,16 +2920,20 @@ definitions:
description: A list of potentially nested fields indicating the full path where value will be added or updated.
type: array
items:
- type: string
anyOf:
- type: string
- type: integer
interpolation_context:
- config
- components_values
- stream_template_config
examples:
- ["data"]
- ["data", "records"]
- ["data", "{{ parameters.name }}"]
- ["data", 1, "name"]
- ["data", "{{ components_values.name }}"]
- ["data", "*", "record"]
- ["*", "**", "name"]
value:
title: Value
description: The dynamic or static value to assign to the key. Interpolated values can be used to dynamically determine the value during runtime.
Expand Down Expand Up @@ -2974,6 +2978,52 @@ definitions:
- type
- retriever
- components_mapping
StreamConfig:
lazebnyi marked this conversation as resolved.
Show resolved Hide resolved
title: Stream Config
description: (This component is experimental. Use at your own risk.) Describes how to get streams config from the source config.
type: object
required:
- type
- configs_pointer
properties:
type:
type: string
enum: [StreamConfig]
configs_pointer:
title: Configs Pointer
description: A list of potentially nested fields indicating the full path in source config file where streams configs located.
type: array
items:
- type: string
interpolation_context:
- parameters
examples:
- ["data"]
- ["data", "streams"]
- ["data", "{{ parameters.name }}"]
$parameters:
type: object
additionalProperties: true
ConfigComponentsResolver:
type: object
description: (This component is experimental. Use at your own risk.) Component resolve and populates stream templates with components fetched from the source config.
lazebnyi marked this conversation as resolved.
Show resolved Hide resolved
properties:
type:
type: string
enum: [ConfigComponentsResolver]
stream_config:
"$ref": "#/definitions/StreamConfig"
components_mapping:
type: array
items:
"$ref": "#/definitions/ComponentMappingDefinition"
$parameters:
type: object
additionalProperties: true
required:
- type
- stream_config
- components_mapping
DynamicDeclarativeStream:
type: object
description: (This component is experimental. Use at your own risk.) A component that described how will be created declarative streams based on stream template.
Expand All @@ -2988,7 +3038,9 @@ definitions:
components_resolver:
title: Components Resolver
description: Component resolve and populates stream templates with components values.
"$ref": "#/definitions/HttpComponentsResolver"
anyOf:
- "$ref": "#/definitions/HttpComponentsResolver"
- "$ref": "#/definitions/ConfigComponentsResolver"
required:
- type
- stream_template
Expand Down
10 changes: 8 additions & 2 deletions airbyte_cdk/sources/declarative/manifest_declarative_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import pkgutil
from copy import deepcopy
from importlib import metadata
from typing import Any, Dict, Iterator, List, Mapping, Optional
from typing import Any, Dict, Iterator, List, Mapping, Optional, Set

import yaml
from jsonschema.exceptions import ValidationError
Expand Down Expand Up @@ -313,6 +313,7 @@ def _dynamic_stream_configs(
) -> List[Dict[str, Any]]:
dynamic_stream_definitions: List[Dict[str, Any]] = manifest.get("dynamic_streams", [])
dynamic_stream_configs: List[Dict[str, Any]] = []
seen_dynamic_streams: Set[str] = set()

for dynamic_definition in dynamic_stream_definitions:
components_resolver_config = dynamic_definition["components_resolver"]
Expand Down Expand Up @@ -350,7 +351,12 @@ def _dynamic_stream_configs(
if "type" not in dynamic_stream:
dynamic_stream["type"] = "DeclarativeStream"

dynamic_stream_configs.append(dynamic_stream)
# Ensure that each stream is created with a unique name
lazebnyi marked this conversation as resolved.
Show resolved Hide resolved
name = dynamic_stream.get("name")

if name not in seen_dynamic_streams:
seen_dynamic_streams.add(name)
dynamic_stream_configs.append(dynamic_stream)

return dynamic_stream_configs

Expand Down
130 changes: 87 additions & 43 deletions airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,13 @@
"DeclarativeStream.schema_loader": "JsonFileSchemaLoader",
# DynamicDeclarativeStream
"DynamicDeclarativeStream.stream_template": "DeclarativeStream",
"DynamicDeclarativeStream.components_resolver": "HttpComponentsResolver",
"DynamicDeclarativeStream.components_resolver": "ConfigComponentResolver",
# HttpComponentsResolver
"HttpComponentsResolver.retriever": "SimpleRetriever",
"HttpComponentsResolver.components_mapping": "ComponentMappingDefinition",
# ConfigComponentResolver
"ConfigComponentsResolver.stream_config": "StreamConfig",
"ConfigComponentsResolver.components_mapping": "ComponentMappingDefinition",
# DefaultErrorHandler
"DefaultErrorHandler.response_filters": "HttpResponseFilter",
# DefaultPaginator
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,12 @@
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
HttpComponentsResolver as HttpComponentsResolverModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
ConfigComponentsResolver as ConfigComponentsResolverModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
StreamConfig as StreamConfigModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
HttpRequester as HttpRequesterModel,
)
Expand Down Expand Up @@ -348,6 +354,8 @@
from airbyte_cdk.sources.declarative.resolvers import (
ComponentMappingDefinition,
HttpComponentsResolver,
ConfigComponentsResolver,
StreamConfig,
)
from airbyte_cdk.sources.declarative.retrievers import (
AsyncRetriever,
Expand Down Expand Up @@ -479,6 +487,8 @@ def _init_mappings(self) -> None:
WaitUntilTimeFromHeaderModel: self.create_wait_until_time_from_header,
AsyncRetrieverModel: self.create_async_retriever,
HttpComponentsResolverModel: self.create_http_components_resolver,
ConfigComponentsResolverModel: self.create_config_components_resolver,
StreamConfigModel: self.create_stream_config,
ComponentMappingDefinitionModel: self.create_components_mapping_definition,
}

Expand Down Expand Up @@ -1812,8 +1822,8 @@ def create_record_selector(
self,
model: RecordSelectorModel,
config: Config,
name: str,
*,
name: str,
transformations: List[RecordTransformation],
decoder: Optional[Decoder] = None,
client_side_incremental_sync: Optional[Dict[str, Any]] = None,
Expand Down Expand Up @@ -2292,3 +2302,41 @@ def create_http_components_resolver(
components_mapping=components_mapping,
parameters=model.parameters or {},
)

@staticmethod
def create_stream_config(
model: StreamConfigModel, config: Config, **kwargs: Any
) -> StreamConfig:
model_configs_pointer: List[Union[InterpolatedString, str]] = (
[x for x in model.configs_pointer] if model.configs_pointer else []
)

return StreamConfig(
configs_pointer=model_configs_pointer,
parameters=model.parameters or {},
)

def create_config_components_resolver(
self, model: ConfigComponentsResolverModel, config: Config
) -> Any:
stream_config = self._create_component_from_model(
model.stream_config, config=config, parameters=model.parameters or {}
)

components_mapping = [
self._create_component_from_model(
model=components_mapping_definition_model,
value_type=ModelToComponentFactory._json_schema_type_name_to_type(
components_mapping_definition_model.value_type
),
config=config,
)
for components_mapping_definition_model in model.components_mapping
]

return ConfigComponentsResolver(
stream_config=stream_config,
config=config,
components_mapping=components_mapping,
parameters=model.parameters or {},
)
7 changes: 5 additions & 2 deletions airbyte_cdk/sources/declarative/resolvers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@

from airbyte_cdk.sources.declarative.resolvers.components_resolver import ComponentsResolver, ComponentMappingDefinition, ResolvedComponentMappingDefinition
from airbyte_cdk.sources.declarative.resolvers.http_components_resolver import HttpComponentsResolver
from airbyte_cdk.sources.declarative.resolvers.config_components_resolver import ConfigComponentsResolver, StreamConfig
from airbyte_cdk.sources.declarative.models import HttpComponentsResolver as HttpComponentsResolverModel
from airbyte_cdk.sources.declarative.models import ConfigComponentsResolver as ConfigComponentsResolverModel

COMPONENTS_RESOLVER_TYPE_MAPPING = {
"HttpComponentsResolver": HttpComponentsResolverModel
"HttpComponentsResolver": HttpComponentsResolverModel,
"ConfigComponentsResolver": ConfigComponentsResolverModel
}

__all__ = ["ComponentsResolver", "HttpComponentsResolver", "ComponentMappingDefinition", "ResolvedComponentMappingDefinition", "COMPONENTS_RESOLVER_TYPE_MAPPING"]
__all__ = ["ComponentsResolver", "HttpComponentsResolver", "ComponentMappingDefinition", "ResolvedComponentMappingDefinition", "StreamConfig", "ConfigComponentsResolver", "COMPONENTS_RESOLVER_TYPE_MAPPING"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#

from copy import deepcopy
from dataclasses import InitVar, dataclass, field
from typing import Any, Dict, Iterable, List, Mapping, Union

import dpath
from typing_extensions import deprecated

from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
from airbyte_cdk.sources.declarative.resolvers.components_resolver import (
ComponentMappingDefinition,
ComponentsResolver,
ResolvedComponentMappingDefinition,
)
from airbyte_cdk.sources.source import ExperimentalClassWarning
from airbyte_cdk.sources.types import Config


@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
@dataclass
class StreamConfig:
"""
Identifies stream config details for dynamic schema extraction and processing.
"""

configs_pointer: List[Union[InterpolatedString, str]]
parameters: InitVar[Mapping[str, Any]]

def __post_init__(self, parameters: Mapping[str, Any]) -> None:
self.configs_pointer = [
InterpolatedString.create(path, parameters=parameters) for path in self.configs_pointer
]


@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
@dataclass
class ConfigComponentsResolver(ComponentsResolver):
"""
Resolves and populates stream templates with components fetched via source config.

Attributes:
stream_config (StreamConfig): The description of stream configuration used to fetch stream config from source config.
config (Config): Configuration object for the resolver.
components_mapping (List[ComponentMappingDefinition]): List of mappings to resolve.
parameters (InitVar[Mapping[str, Any]]): Additional parameters for interpolation.
"""

stream_config: StreamConfig
config: Config
components_mapping: List[ComponentMappingDefinition]
parameters: InitVar[Mapping[str, Any]]
_resolved_components: List[ResolvedComponentMappingDefinition] = field(
init=False, repr=False, default_factory=list
)

def __post_init__(self, parameters: Mapping[str, Any]) -> None:
"""
Initializes and parses component mappings, converting them to resolved definitions.

Args:
parameters (Mapping[str, Any]): Parameters for interpolation.
"""

for component_mapping in self.components_mapping:
if isinstance(component_mapping.value, (str, InterpolatedString)):
interpolated_value = (
InterpolatedString.create(component_mapping.value, parameters=parameters)
if isinstance(component_mapping.value, str)
else component_mapping.value
)

field_path = [
InterpolatedString.create(path, parameters=parameters)
for path in component_mapping.field_path
]

self._resolved_components.append(
ResolvedComponentMappingDefinition(
field_path=field_path,
value=interpolated_value,
value_type=component_mapping.value_type,
parameters=parameters,
)
)
else:
raise ValueError(
f"Expected a string or InterpolatedString for value in mapping: {component_mapping}"
)

@property
def _stream_config(self):
path = [
node.eval(self.config) if not isinstance(node, str) else node
for node in self.stream_config.configs_pointer
]
stream_config = dpath.get(self.config, path, default=[])

if not isinstance(stream_config, list):
stream_config = [stream_config]

return stream_config

def resolve_components(
self, stream_template_config: Dict[str, Any]
) -> Iterable[Dict[str, Any]]:
"""
Resolves components in the stream template configuration by populating values.

Args:
stream_template_config (Dict[str, Any]): Stream template to populate.

Yields:
Dict[str, Any]: Updated configurations with resolved components.
"""
kwargs = {"stream_template_config": stream_template_config}

for components_values in self._stream_config:
updated_config = deepcopy(stream_template_config)
kwargs["components_values"] = components_values # type: ignore[assignment] # component_values will always be of type Mapping[str, Any]

for resolved_component in self._resolved_components:
valid_types = (
(resolved_component.value_type,) if resolved_component.value_type else None
)
value = resolved_component.value.eval(
self.config, valid_types=valid_types, **kwargs
)

path = [path.eval(self.config, **kwargs) for path in resolved_component.field_path]

dpath.set(updated_config, path, value)

yield updated_config
Loading
Loading