bug: data yields into incorrect table when nesting manually
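The test below reproduces the issue: a transformer "nests manually" by popping a
nested list off each parent item and re-yielding the children with per-item hints
(dlt.mark.with_hints plus dlt.mark.make_hints(table_name=...)), yet rows end up in
the wrong table. A minimal sketch of the pattern under test (all names here are
illustrative, not taken from the test itself):

import dlt

@dlt.resource
def parent_resource():  # illustrative stand-in for the real resource
    yield [{"id": 1, "children": [{"id": "a", "value": 1}]}]

@dlt.transformer(data_from=parent_resource)
def split(items):
    for item in items:
        children = item.pop("children")
        yield item  # parent row -> the transformer's own table
        for child in children:
            # child row -> a manually chosen table, via per-item hints
            yield dlt.mark.with_hints(
                item=child,
                hints=dlt.mark.make_hints(table_name=f"child_{child['id']}"),
            )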
joscha committed Nov 28, 2024
1 parent 9a49868 commit 0471261
Showing 1 changed file with 77 additions and 2 deletions.
79 changes: 77 additions & 2 deletions tests/pipeline/test_pipeline.py
@@ -8,7 +8,7 @@
import shutil
import threading
from time import sleep
-from typing import Any, List, Tuple, cast
+from typing import Any, Dict, Iterable, List, Sequence, Tuple, cast
from tenacity import retry_if_exception, Retrying, stop_after_attempt

import pytest
@@ -35,7 +35,7 @@
from dlt.common.schema.exceptions import TableIdentifiersFrozen
from dlt.common.schema.typing import TColumnSchema
from dlt.common.schema.utils import new_column, new_table
-from dlt.common.typing import DictStrAny
+from dlt.common.typing import DictStrAny, TDataItem
from dlt.common.utils import uniq_id
from dlt.common.schema import Schema

@@ -2877,3 +2877,78 @@ def test_push_table_with_upfront_schema() -> None:
copy_pipeline = dlt.pipeline(pipeline_name="push_table_copy_pipeline", destination="duckdb")
info = copy_pipeline.run(data, table_name="events", schema=copy_schema)
assert copy_pipeline.default_schema.version_hash != infer_hash

def test_nested_inserts_correct_target() -> None:
    @dlt.resource(
        primary_key="id",
        columns={"id": {"data_type": "bigint"}},
    )
    def my_resource():
        # two parent items, each carrying a nested list of fields
        yield [
            {
                "id": 1000,
                "fields": [
                    {"id": "a", "value": 1},
                    {"id": "b", "value": 2},
                    {"id": "c", "value": 3},
                ],
            },
            {
                "id": 2000,
                "fields": [
                    {"id": "a", "value": 4},
                    {"id": "b", "value": 5},
                    {"id": "c", "value": 6},
                ],
            },
        ]

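    # The transformer "nests manually": each parent row is yielded into the
    # transformer's own `things` table, while every nested field is re-yielded
    # with per-item hints routing it to a `things_<field id>` table.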
    @dlt.transformer(
        data_from=my_resource,
        write_disposition="replace",
        # parallelized=True,
        primary_key="id",
        merge_key="id",
    )
    def things(items: List[TDataItem]) -> Iterable[TDataItem]:
        for item in items:
            fields: List[Dict[str, Any]] = item.pop("fields")
            # parent row goes to the default `things` table
            yield item
            for field in fields:
                # field_id = field.pop("id")
                field_id = field["id"]
                table_name = f"things_{field_id}"
                field = {"my_resource_id": item["id"]} | field
                # child row is routed to its own table via explicit hints
                yield dlt.mark.with_hints(
                    item=field,
                    hints=dlt.mark.make_hints(
                        table_name=table_name,
                        write_disposition="replace",
                    ),
                )

    @dlt.source()
    def my_source() -> Sequence[DltResource]:
        # return the source's resources as a proper sequence
        return (things,)

    pipeline_name = "pipe_" + uniq_id()
    pipeline = dlt.pipeline(pipeline_name=pipeline_name, destination="duckdb")
    info = pipeline.run(my_source())
    assert_load_info(info)
    # inspect what actually landed in one of the manually targeted tables
    rows = load_tables_to_dicts(pipeline, "things_c", exclude_system_cols=True)
    print(rows)
    # only a single row per table was loaded ...
    assert_data_table_counts(pipeline, {"things": 1, "things_a": 1, "things_b": 1, "things_c": 1})
    # ... although the normalize step counted two rows for each of them
    assert pipeline.last_trace.last_normalize_info.row_counts == {
        "_dlt_pipeline_state": 1,
        "things": 2,
        "things_a": 2,
        "things_b": 2,
        "things_c": 2,
    }
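For debugging a repro like this, the loaded row counts can also be checked directly
on the duckdb destination (a sketch using dlt's SQL client; not part of the
committed test):

with pipeline.sql_client() as client:
    for table in ("things", "things_a", "things_b", "things_c"):
        count = client.execute_sql(f"SELECT count(*) FROM {table}")[0][0]
        print(table, count)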
