Skip to content

Commit

Permalink
Incremental table hints and incremental in resource decorator (#2033)
Browse files Browse the repository at this point in the history
* Incremental table hints and incremental in resource decorator

* Extract incremental settings to a dict in table schema
* Support passing incremental settings to @resource decorator

* Fix type errors

* Reset incremental from_hints when set in resource decorator

* Column hint

* Merge multiple hints

* Test non column match

* adds make_hints test

* Accept TIncrementalconfig in make hints

* bool only incremental hint

* Test jsonpath simple field name

---------

Co-authored-by: Marcin Rudolf <[email protected]>
  • Loading branch information
steinitzu and rudolfix authored Nov 30, 2024
1 parent 09914a3 commit 61c2ed9
Show file tree
Hide file tree
Showing 25 changed files with 577 additions and 87 deletions.
6 changes: 5 additions & 1 deletion dlt/common/destination/typing.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
from typing import Optional

from dlt.common.schema.typing import _TTableSchemaBase, TWriteDisposition, TTableReferenceParam
from dlt.common.schema.typing import (
_TTableSchemaBase,
TWriteDisposition,
TTableReferenceParam,
)


class PreparedTableSchema(_TTableSchemaBase, total=False):
Expand Down
Empty file.
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@

from typing import Any, Callable, List, Literal, Optional, Sequence, TypeVar, Union

from dlt.common.schema.typing import TColumnNames
from dlt.common.typing import TSortOrder
from dlt.extract.items import TTableHintTemplate
from dlt.common.typing import TSortOrder, TTableHintTemplate, TColumnNames

TCursorValue = TypeVar("TCursorValue", bound=Any)
LastValueFunc = Callable[[Sequence[TCursorValue]], Any]
Expand All @@ -19,10 +17,12 @@ class IncrementalColumnState(TypedDict):

class IncrementalArgs(TypedDict, total=False):
cursor_path: str
initial_value: Optional[str]
last_value_func: Optional[LastValueFunc[str]]
initial_value: Optional[Any]
last_value_func: Optional[Union[LastValueFunc[str], Literal["min", "max"]]]
"""Last value callable or name of built in function"""
primary_key: Optional[TTableHintTemplate[TColumnNames]]
end_value: Optional[str]
end_value: Optional[Any]
row_order: Optional[TSortOrder]
allow_external_schedulers: Optional[bool]
lag: Optional[Union[float, int]]
on_cursor_value_missing: Optional[OnCursorValueMissing]
3 changes: 1 addition & 2 deletions dlt/common/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,14 @@
)
from dlt.common.schema import Schema
from dlt.common.schema.typing import (
TColumnNames,
TColumnSchema,
TWriteDispositionConfig,
TSchemaContract,
)
from dlt.common.storages.load_package import ParsedLoadJobFileName
from dlt.common.storages.load_storage import LoadPackageInfo
from dlt.common.time import ensure_pendulum_datetime, precise_time
from dlt.common.typing import DictStrAny, REPattern, StrAny, SupportsHumanize
from dlt.common.typing import DictStrAny, REPattern, StrAny, SupportsHumanize, TColumnNames
from dlt.common.jsonpath import delete_matches, TAnyJsonPath
from dlt.common.data_writers.writers import TLoaderFileFormat
from dlt.common.utils import RowCounts, merge_row_counts
Expand Down
5 changes: 2 additions & 3 deletions dlt/common/schema/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

from dlt.common.data_types import TDataType
from dlt.common.normalizers.typing import TNormalizersConfig
from dlt.common.typing import TSortOrder, TAnyDateTime, TLoaderFileFormat
from dlt.common.typing import TSortOrder, TAnyDateTime, TLoaderFileFormat, TColumnNames

try:
from pydantic import BaseModel as _PydanticBaseModel
Expand Down Expand Up @@ -132,8 +132,6 @@ class TColumnPropInfo(NamedTuple):
"timestamp", "iso_timestamp", "iso_date", "large_integer", "hexbytes_to_text", "wei_to_double"
]
TTypeDetectionFunc = Callable[[Type[Any], Any], Optional[TDataType]]
TColumnNames = Union[str, Sequence[str]]
"""A string representing a column name or a list of"""


class TColumnType(TypedDict, total=False):
Expand Down Expand Up @@ -166,6 +164,7 @@ class TColumnSchema(TColumnSchemaBase, total=False):
variant: Optional[bool]
hard_delete: Optional[bool]
dedup_sort: Optional[TSortOrder]
incremental: Optional[bool]


TTableSchemaColumns = Dict[str, TColumnSchema]
Expand Down
11 changes: 11 additions & 0 deletions dlt/common/schema/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -547,6 +547,17 @@ def merge_diff(table: TTableSchema, table_diff: TPartialTableSchema) -> TPartial
* table hints are added or replaced from diff
* nothing gets deleted
"""

incremental_a_col = get_first_column_name_with_prop(
table, "incremental", include_incomplete=True
)
if incremental_a_col:
incremental_b_col = get_first_column_name_with_prop(
table_diff, "incremental", include_incomplete=True
)
if incremental_b_col:
table["columns"][incremental_a_col].pop("incremental")

# add new columns when all checks passed
updated_columns = merge_columns(table["columns"], table_diff["columns"])
table.update(table_diff)
Expand Down
7 changes: 7 additions & 0 deletions dlt/common/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
Iterator,
Generator,
NamedTuple,
Sequence,
)

from typing_extensions import (
Expand Down Expand Up @@ -112,6 +113,8 @@ class SecretSentinel:

TSecretStrValue = Annotated[str, SecretSentinel]

TColumnNames = Union[str, Sequence[str]]
"""A string representing a column name or a list of"""
TDataItem: TypeAlias = Any
"""A single data item as extracted from data source"""
TDataItems: TypeAlias = Union[TDataItem, List[TDataItem]]
Expand All @@ -126,6 +129,10 @@ class SecretSentinel:
TLoaderFileFormat = Literal["jsonl", "typed-jsonl", "insert_values", "parquet", "csv", "reference"]
"""known loader file formats"""

TDynHintType = TypeVar("TDynHintType")
TFunHintTemplate = Callable[[TDataItem], TDynHintType]
TTableHintTemplate = Union[TDynHintType, TFunHintTemplate[TDynHintType]]


class ConfigValueSentinel(NamedTuple):
"""Class to create singleton sentinel for config and secret injected value"""
Expand Down
6 changes: 2 additions & 4 deletions dlt/destinations/impl/bigquery/bigquery_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,8 @@

from dlt.common.destination import PreparedTableSchema
from dlt.common.pendulum import timezone
from dlt.common.schema.typing import (
TColumnNames,
TTableSchemaColumns,
)
from dlt.common.schema.typing import TTableSchemaColumns
from dlt.common.typing import TColumnNames
from dlt.destinations.utils import get_resource_for_adapter
from dlt.extract import DltResource
from dlt.extract.items import TTableHintTemplate
Expand Down
3 changes: 2 additions & 1 deletion dlt/destinations/impl/lancedb/lancedb_adapter.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from typing import Any, Dict

from dlt.common.schema.typing import TColumnNames, TTableSchemaColumns
from dlt.common.schema.typing import TTableSchemaColumns
from dlt.common.typing import TColumnNames
from dlt.destinations.utils import get_resource_for_adapter
from dlt.extract import DltResource
from dlt.extract.items import TTableHintTemplate
Expand Down
3 changes: 2 additions & 1 deletion dlt/destinations/impl/qdrant/qdrant_adapter.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from typing import Any

from dlt.common.schema.typing import TColumnNames, TTableSchemaColumns
from dlt.common.schema.typing import TTableSchemaColumns
from dlt.common.typing import TColumnNames
from dlt.extract import DltResource
from dlt.destinations.utils import get_resource_for_adapter

Expand Down
3 changes: 2 additions & 1 deletion dlt/destinations/impl/weaviate/weaviate_adapter.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from typing import Dict, Any, Literal, Set, get_args

from dlt.common.schema.typing import TColumnNames, TTableSchemaColumns
from dlt.common.schema.typing import TTableSchemaColumns
from dlt.common.typing import TColumnNames
from dlt.extract import DltResource, resource as make_resource
from dlt.destinations.utils import get_resource_for_adapter

Expand Down
19 changes: 15 additions & 4 deletions dlt/extract/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
from dlt.common.schema.utils import DEFAULT_WRITE_DISPOSITION
from dlt.common.schema.schema import Schema
from dlt.common.schema.typing import (
TColumnNames,
TFileFormat,
TWriteDisposition,
TWriteDispositionConfig,
Expand All @@ -43,7 +42,8 @@
)
from dlt.common.storages.exceptions import SchemaNotFoundError
from dlt.common.storages.schema_storage import SchemaStorage
from dlt.common.typing import AnyFun, ParamSpec, Concatenate, TDataItem, TDataItems
from dlt.common.typing import AnyFun, ParamSpec, Concatenate, TDataItem, TDataItems, TColumnNames

from dlt.common.utils import get_callable_name, get_module_name, is_inner_callable

from dlt.extract.hints import make_hints
Expand All @@ -70,6 +70,7 @@
TSourceFunParams,
)
from dlt.extract.resource import DltResource, TUnboundDltResource, TDltResourceImpl
from dlt.extract.incremental import TIncrementalConfig


@configspec
Expand Down Expand Up @@ -446,6 +447,7 @@ def resource(
selected: bool = True,
spec: Type[BaseConfiguration] = None,
parallelized: bool = False,
incremental: Optional[TIncrementalConfig] = None,
_impl_cls: Type[TDltResourceImpl] = DltResource, # type: ignore[assignment]
) -> TDltResourceImpl: ...

Expand All @@ -468,6 +470,7 @@ def resource(
selected: bool = True,
spec: Type[BaseConfiguration] = None,
parallelized: bool = False,
incremental: Optional[TIncrementalConfig] = None,
_impl_cls: Type[TDltResourceImpl] = DltResource, # type: ignore[assignment]
) -> Callable[[Callable[TResourceFunParams, Any]], TDltResourceImpl]: ...

Expand All @@ -490,6 +493,7 @@ def resource(
selected: bool = True,
spec: Type[BaseConfiguration] = None,
parallelized: bool = False,
incremental: Optional[TIncrementalConfig] = None,
_impl_cls: Type[TDltResourceImpl] = DltResource, # type: ignore[assignment]
standalone: Literal[True] = True,
) -> Callable[
Expand All @@ -515,6 +519,7 @@ def resource(
selected: bool = True,
spec: Type[BaseConfiguration] = None,
parallelized: bool = False,
incremental: Optional[TIncrementalConfig] = None,
_impl_cls: Type[TDltResourceImpl] = DltResource, # type: ignore[assignment]
) -> TDltResourceImpl: ...

Expand All @@ -536,6 +541,7 @@ def resource(
selected: bool = True,
spec: Type[BaseConfiguration] = None,
parallelized: bool = False,
incremental: Optional[TIncrementalConfig] = None,
_impl_cls: Type[TDltResourceImpl] = DltResource, # type: ignore[assignment]
standalone: bool = False,
data_from: TUnboundDltResource = None,
Expand Down Expand Up @@ -632,6 +638,7 @@ def make_resource(_name: str, _section: str, _data: Any) -> TDltResourceImpl:
table_format=table_format,
file_format=file_format,
references=references,
incremental=incremental,
)

resource = _impl_cls.from_data(
Expand All @@ -643,6 +650,10 @@ def make_resource(_name: str, _section: str, _data: Any) -> TDltResourceImpl:
cast(DltResource, data_from),
True,
)

if incremental:
# Reset the flag to allow overriding by incremental argument
resource.incremental._from_hints = False
# If custom nesting level was specified then
# we need to add it to table hints so that
# later in normalizer dlt/common/normalizers/json/relational.py
Expand Down Expand Up @@ -681,7 +692,7 @@ def _wrap(*args: Any, **kwargs: Any) -> TDltResourceImpl:
return _wrap

def decorator(
f: Callable[TResourceFunParams, Any]
f: Callable[TResourceFunParams, Any],
) -> Callable[TResourceFunParams, TDltResourceImpl]:
if not callable(f):
if data_from:
Expand Down Expand Up @@ -1023,7 +1034,7 @@ def get_source() -> DltSource:


def defer(
f: Callable[TDeferredFunParams, TBoundItems]
f: Callable[TDeferredFunParams, TBoundItems],
) -> Callable[TDeferredFunParams, TDeferred[TBoundItems]]:
@wraps(f)
def _wrap(*args: Any, **kwargs: Any) -> TDeferred[TBoundItems]:
Expand Down
7 changes: 3 additions & 4 deletions dlt/extract/extract.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from collections.abc import Sequence as C_Sequence
from copy import copy
import itertools
from typing import Iterator, List, Dict, Any, Optional
from typing import Iterator, List, Dict, Any, Optional, Mapping
import yaml

from dlt.common.configuration.container import Container
Expand All @@ -17,13 +17,12 @@
WithStepInfo,
reset_resource_state,
)
from dlt.common.typing import DictStrAny
from dlt.common.typing import DictStrAny, TColumnNames
from dlt.common.runtime import signals
from dlt.common.runtime.collector import Collector, NULL_COLLECTOR
from dlt.common.schema import Schema, utils
from dlt.common.schema.typing import (
TAnySchemaColumns,
TColumnNames,
TSchemaContract,
TTableFormat,
TWriteDispositionConfig,
Expand All @@ -39,7 +38,7 @@

from dlt.extract.decorators import SourceInjectableContext, SourceSchemaInjectableContext
from dlt.extract.exceptions import DataItemRequiredForDynamicTableHints
from dlt.extract.incremental import IncrementalResourceWrapper
from dlt.extract.incremental import IncrementalResourceWrapper, Incremental
from dlt.extract.pipe_iterator import PipeIterator
from dlt.extract.source import DltSource
from dlt.extract.resource import DltResource
Expand Down
Loading

0 comments on commit 61c2ed9

Please sign in to comment.