Skip to content

Commit

Permalink
Include data from parent resource in child resource: ported to a new version
Browse files: browse the repository at this point in the history
  • Loading branch information
burnash committed Feb 19, 2024
1 parent 16c5a7a commit c6015fe
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 2 deletions.
31 changes: 29 additions & 2 deletions sources/rest_api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,10 @@ def setup_incremental_object_from_config(config):
)


def make_parent_key_name(resource_name: str, field_name: str) -> str:
    """Build the key under which a parent field is stored in a child record.

    The result is the parent resource name and the field name joined with
    underscores and prefixed with a leading underscore.

    Args:
        resource_name: Name of the parent resource.
        field_name: Name of the parent field being included.

    Returns:
        The combined key, e.g. ``"_issues_id"`` for ``("issues", "id")``.
    """
    return f"_{resource_name}_{field_name}"


@dlt.source
def rest_api_source(config: RESTAPIConfig):
"""
Expand Down Expand Up @@ -285,6 +289,15 @@ def rest_api_resources(config: RESTAPIConfig):

# TODO: Remove _resolved_param from endpoint_resource
resolved_param: ResolvedParam = endpoint_resource.pop("_resolved_param", None)
include_from_parent: list[str] = endpoint_resource.pop(
"include_from_parent", []
)
if not resolved_param and include_from_parent:
raise ValueError(
f"Resource {resource_name} has include_from_parent but is not "
"dependent on another resource"
)

resource_kwargs = remove_key(endpoint_resource, "endpoint")

incremental_object, incremental_param = setup_incremental_object(
Expand Down Expand Up @@ -338,13 +351,27 @@ def paginate_dependent_resource(
items = items or []
for item in items:
formatted_path = path.format(**{param_name: item[field_path]})
parent_resource_name = resolved_param.resolve_config.resource_name

yield from client.paginate(
parent_record = (
{
make_parent_key_name(parent_resource_name, key): item[key]
for key in include_from_parent
}
if include_from_parent
else None
)

for child_page in client.paginate(
method=method,
path=formatted_path,
params=params,
paginator=paginator,
)
):
if parent_record:
for child_record in child_page:
child_record.update(parent_record)
yield child_page

resources[resource_name] = dlt.resource(
paginate_dependent_resource,
Expand Down
1 change: 1 addition & 0 deletions sources/rest_api_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ def load_github():
}
},
},
"include_from_parent": ["id"],
},
],
}
Expand Down
Empty file added tests/rest_api/__init__.py
Empty file.
51 changes: 51 additions & 0 deletions tests/rest_api/test_rest_api_source.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
from tests.utils import ALL_DESTINATIONS, assert_load_info, load_table_counts
import pytest
import dlt

from sources.rest_api import rest_api_source


@pytest.mark.parametrize("destination_name", ALL_DESTINATIONS)
def test_rest_api_source(destination_name: str) -> None:
    """Load three PokeAPI resources and check the resulting table row counts.

    Runs once per entry in ``ALL_DESTINATIONS``. The source is configured
    against ``https://pokeapi.co/api/v2/``, so the test needs network access.
    NOTE(review): the expected counts are hard-coded and presumably reflect
    the API's contents when the test was written - confirm if they drift.
    """
    # Source configuration: every resource shares the same "limit" parameter.
    source_config = {
        "client": {
            "base_url": "https://pokeapi.co/api/v2/",
        },
        "resource_defaults": {
            "endpoint": {
                "params": {
                    "limit": 1000,
                },
            }
        },
        "resources": [
            "pokemon",
            "berry",
            "location",
        ],
    }

    # Expected row count per loaded table, in the order they are asserted.
    expected_counts = {
        "pokemon": 1302,
        "berry": 64,
        "location": 1036,
    }

    pipeline = dlt.pipeline(
        pipeline_name="rest_api",
        destination=destination_name,
        dataset_name="rest_api_data",
        full_refresh=True,
    )

    load_info = pipeline.run(rest_api_source(source_config))
    print(load_info)
    assert_load_info(load_info)

    data_tables = pipeline.default_schema.data_tables()
    table_names = [table["name"] for table in data_tables]
    table_counts = load_table_counts(pipeline, *table_names)

    # Exactly the three configured resources must have produced tables.
    assert table_counts.keys() == set(expected_counts)

    for table_name, expected in expected_counts.items():
        assert table_counts[table_name] == expected


# TODO: Add incorrect config test
# - incorrect default_resource (missing endpoint, nested params)
# - incorrect resources
# - incorrect key (default_resource)

0 comments on commit c6015fe

Please sign in to comment.