Skip to content

Commit

Permalink
feat(HttpComponentsResolver): added stream slices to HttpComponentsRe…
Browse files Browse the repository at this point in the history
…solver (#175)

Co-authored-by: octavia-squidington-iii <[email protected]>
  • Loading branch information
darynaishchenko and octavia-squidington-iii authored Dec 20, 2024
1 parent 2ab345f commit f8054a8
Show file tree
Hide file tree
Showing 4 changed files with 253 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3049,6 +3049,7 @@ definitions:
interpolation_context:
- config
- components_values
- stream_slice
- stream_template_config
examples:
- ["data"]
Expand All @@ -3065,10 +3066,13 @@ definitions:
- config
- stream_template_config
- components_values
- stream_slice
examples:
- "{{ components_values['updates'] }}"
- "{{ components_values['MetaData']['LastUpdatedTime'] }}"
- "{{ config['segment_id'] }}"
- "{{ stream_slice['parent_id'] }}"
- "{{ stream_slice['extra_fields']['name'] }}"
value_type:
title: Value Type
description: The expected data type of the value. If omitted, the type will be inferred from the value provided.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2394,7 +2394,7 @@ def create_http_components_resolver(
config=config,
name="",
primary_key=None,
stream_slicer=combined_slicers,
stream_slicer=stream_slicer if stream_slicer else combined_slicers,
transformations=[],
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,19 +88,25 @@ def resolve_components(
"""
kwargs = {"stream_template_config": stream_template_config}

for components_values in self.retriever.read_records({}):
updated_config = deepcopy(stream_template_config)
kwargs["components_values"] = components_values # type: ignore[assignment] # component_values will always be of type Mapping[str, Any]

for resolved_component in self._resolved_components:
valid_types = (
(resolved_component.value_type,) if resolved_component.value_type else None
)
value = resolved_component.value.eval(
self.config, valid_types=valid_types, **kwargs
)
for stream_slice in self.retriever.stream_slices():
for components_values in self.retriever.read_records(
records_schema={}, stream_slice=stream_slice
):
updated_config = deepcopy(stream_template_config)
kwargs["components_values"] = components_values # type: ignore[assignment] # component_values will always be of type Mapping[str, Any]
kwargs["stream_slice"] = stream_slice # type: ignore[assignment] # stream_slice will always be of type Mapping[str, Any]

for resolved_component in self._resolved_components:
valid_types = (
(resolved_component.value_type,) if resolved_component.value_type else None
)
value = resolved_component.value.eval(
self.config, valid_types=valid_types, **kwargs
)

path = [path.eval(self.config, **kwargs) for path in resolved_component.field_path]
dpath.set(updated_config, path, value)
path = [
path.eval(self.config, **kwargs) for path in resolved_component.field_path
]
dpath.set(updated_config, path, value)

yield updated_config
yield updated_config
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,133 @@
],
}

_MANIFEST_WITH_HTTP_COMPONENT_RESOLVER_WITH_RETRIEVER_WITH_PARENT_STREAM = {
"version": "6.7.0",
"type": "DeclarativeSource",
"check": {"type": "CheckStream", "stream_names": ["Rates"]},
"dynamic_streams": [
{
"type": "DynamicDeclarativeStream",
"stream_template": {
"type": "DeclarativeStream",
"name": "",
"primary_key": [],
"schema_loader": {
"type": "InlineSchemaLoader",
"schema": {
"$schema": "http://json-schema.org/schema#",
"properties": {
"ABC": {"type": "number"},
"AED": {"type": "number"},
},
"type": "object",
},
},
"retriever": {
"type": "SimpleRetriever",
"requester": {
"type": "HttpRequester",
"url_base": "https://api.test.com",
"path": "",
"http_method": "GET",
"authenticator": {
"type": "ApiKeyAuthenticator",
"header": "apikey",
"api_token": "{{ config['api_key'] }}",
},
},
"record_selector": {
"type": "RecordSelector",
"extractor": {"type": "DpathExtractor", "field_path": []},
},
"paginator": {"type": "NoPagination"},
},
},
"components_resolver": {
"type": "HttpComponentsResolver",
"retriever": {
"type": "SimpleRetriever",
"requester": {
"type": "HttpRequester",
"url_base": "https://api.test.com",
"path": "parent/{{ stream_partition.parent_id }}/items",
"http_method": "GET",
"authenticator": {
"type": "ApiKeyAuthenticator",
"header": "apikey",
"api_token": "{{ config['api_key'] }}",
},
},
"record_selector": {
"type": "RecordSelector",
"extractor": {"type": "DpathExtractor", "field_path": []},
},
"paginator": {"type": "NoPagination"},
"partition_router": {
"type": "SubstreamPartitionRouter",
"parent_stream_configs": [
{
"type": "ParentStreamConfig",
"parent_key": "id",
"partition_field": "parent_id",
"stream": {
"type": "DeclarativeStream",
"name": "parent",
"retriever": {
"type": "SimpleRetriever",
"requester": {
"type": "HttpRequester",
"url_base": "https://api.test.com",
"path": "/parents",
"http_method": "GET",
"authenticator": {
"type": "ApiKeyAuthenticator",
"header": "apikey",
"api_token": "{{ config['api_key'] }}",
},
},
"record_selector": {
"type": "RecordSelector",
"extractor": {
"type": "DpathExtractor",
"field_path": [],
},
},
},
"schema_loader": {
"type": "InlineSchemaLoader",
"schema": {
"$schema": "http://json-schema.org/schema#",
"properties": {"id": {"type": "integer"}},
"type": "object",
},
},
},
}
],
},
},
"components_mapping": [
{
"type": "ComponentMappingDefinition",
"field_path": ["name"],
"value": "parent_{{stream_slice['parent_id']}}_{{components_values['name']}}",
},
{
"type": "ComponentMappingDefinition",
"field_path": [
"retriever",
"requester",
"path",
],
"value": "{{ stream_slice['parent_id'] }}/{{ components_values['id'] }}",
},
],
},
}
],
}


@pytest.mark.parametrize(
"components_mapping, retriever_data, stream_template_config, expected_result",
Expand All @@ -221,6 +348,44 @@ def test_http_components_resolver(
):
mock_retriever = MagicMock()
mock_retriever.read_records.return_value = retriever_data
mock_retriever.stream_slices.return_value = [{}]
config = {}

resolver = HttpComponentsResolver(
retriever=mock_retriever,
config=config,
components_mapping=components_mapping,
parameters={},
)

result = list(resolver.resolve_components(stream_template_config=stream_template_config))
assert result == expected_result


@pytest.mark.parametrize(
"components_mapping, retriever_data, stream_template_config, expected_result",
[
(
[
ComponentMappingDefinition(
field_path=[InterpolatedString.create("path", parameters={})],
value="{{stream_slice['parent_id']}}/{{components_values['id']}}",
value_type=str,
parameters={},
)
],
[{"id": "1", "field1": "data1"}, {"id": "2", "field1": "data2"}],
{"path": None},
[{"path": "1/1"}, {"path": "1/2"}, {"path": "2/1"}, {"path": "2/2"}],
)
],
)
def test_http_components_resolver_with_stream_slices(
components_mapping, retriever_data, stream_template_config, expected_result
):
mock_retriever = MagicMock()
mock_retriever.read_records.return_value = retriever_data
mock_retriever.stream_slices.return_value = [{"parent_id": 1}, {"parent_id": 2}]
config = {}

resolver = HttpComponentsResolver(
Expand Down Expand Up @@ -305,3 +470,66 @@ def test_duplicated_dynamic_streams_read_with_http_components_resolver():
str(exc_info.value)
== "Dynamic streams list contains a duplicate name: item_2. Please contact Airbyte Support."
)


def test_dynamic_streams_with_http_components_resolver_retriever_with_parent_stream():
expected_stream_names = [
"parent_1_item_1",
"parent_1_item_2",
"parent_2_item_1",
"parent_2_item_2",
]
with HttpMocker() as http_mocker:
http_mocker.get(
HttpRequest(url="https://api.test.com/parents"),
HttpResponse(body=json.dumps([{"id": 1}, {"id": 2}])),
)
parent_ids = [1, 2]
for parent_id in parent_ids:
http_mocker.get(
HttpRequest(url=f"https://api.test.com/parent/{parent_id}/items"),
HttpResponse(
body=json.dumps(
[
{"id": 1, "name": "item_1"},
{"id": 2, "name": "item_2"},
]
)
),
)
dynamic_stream_paths = ["1/1", "2/1", "1/2", "2/2"]
for dynamic_stream_path in dynamic_stream_paths:
http_mocker.get(
HttpRequest(url=f"https://api.test.com/{dynamic_stream_path}"),
HttpResponse(body=json.dumps([{"ABC": 1, "AED": 2}])),
)

source = ConcurrentDeclarativeSource(
source_config=_MANIFEST_WITH_HTTP_COMPONENT_RESOLVER_WITH_RETRIEVER_WITH_PARENT_STREAM,
config=_CONFIG,
catalog=None,
state=None,
)

actual_catalog = source.discover(logger=source.logger, config=_CONFIG)

configured_streams = [
to_configured_stream(stream, primary_key=stream.source_defined_primary_key)
for stream in actual_catalog.streams
]
configured_catalog = to_configured_catalog(configured_streams)

records = [
message.record
for message in source.read(MagicMock(), _CONFIG, configured_catalog)
if message.type == Type.RECORD
]

assert len(actual_catalog.streams) == 4
assert [stream.name for stream in actual_catalog.streams] == expected_stream_names
assert len(records) == 4

actual_record_stream_names = [record.stream for record in records]
actual_record_stream_names.sort()

assert actual_record_stream_names == expected_stream_names

0 comments on commit f8054a8

Please sign in to comment.