From bf816b6a192076e8ca6425a526b1870f92747164 Mon Sep 17 00:00:00 2001 From: Dave Date: Thu, 29 Feb 2024 16:56:53 +0100 Subject: [PATCH] add pandas arrow verified sources --- sources/arrow_pandas/__init__.py | 3 ++ sources/arrow_pandas/example_resources.py | 51 +++++++++++++++++++ sources/arrow_pandas/requirements.txt | 2 + sources/arrow_pandas_pipeline.py | 12 +++++ tests/arrow_pandas/__init__.py | 0 .../arrow_pandas/test_arrow_pandas_source.py | 20 ++++++++ 6 files changed, 88 insertions(+) create mode 100644 sources/arrow_pandas/__init__.py create mode 100644 sources/arrow_pandas/example_resources.py create mode 100644 sources/arrow_pandas/requirements.txt create mode 100644 sources/arrow_pandas_pipeline.py create mode 100644 tests/arrow_pandas/__init__.py create mode 100644 tests/arrow_pandas/test_arrow_pandas_source.py diff --git a/sources/arrow_pandas/__init__.py b/sources/arrow_pandas/__init__.py new file mode 100644 index 000000000..366d0db62 --- /dev/null +++ b/sources/arrow_pandas/__init__.py @@ -0,0 +1,3 @@ +""" +This source provides a simple starting point for loading data from arrow tables or pandas dataframes +""" diff --git a/sources/arrow_pandas/example_resources.py b/sources/arrow_pandas/example_resources.py new file mode 100644 index 000000000..721dff11f --- /dev/null +++ b/sources/arrow_pandas/example_resources.py @@ -0,0 +1,51 @@ +import dlt + +from typing import Generator + +from dlt.common import pendulum +import pandas as pd +import pyarrow as pa + +# this is example data, you will get this from somewhere on your resource function +EXAMPLE_ORDERS_DATA_FRAME = pd.DataFrame( + { + "order_id": [1, 2, 3], + "customer_id": [1, 2, 3], + "ordered_at": [ + pendulum.DateTime(2021, 1, 1, 4, 5, 6), + pendulum.DateTime(2021, 1, 3, 4, 5, 6), + pendulum.DateTime(2021, 1, 6, 4, 5, 6), + ], + "order_amount": [100.0, 200.0, 300.0], + } +) + +EXAMPLE_CUSTOMERS_DATA_FRAME = pd.DataFrame( + { + "customer_id": [1, 2, 3], + "name": ["Alice", "Bob", "Charlie"], + "age": [25, 30, 35], + } +) + + +@dlt.resource(name="orders", write_disposition="append") +def orders() -> Generator[pd.DataFrame, None, None]: + # we can yield dataframes here, you will usually read them from a file or + # receive them from another library + yield EXAMPLE_ORDERS_DATA_FRAME + + +@dlt.resource( + name="customers", + write_disposition="merge", + primary_key="customer_id", + merge_key="customer_id", +) +def customers() -> Generator[pd.DataFrame, None, None]: + # we can yield arrow tables here, you will usually read them from a file or + # receive them from another library + + # here we convert our dataframe to an arrow table, usually you would just yield the + # dataframe if you have it, this is for demonstration purposes + yield pa.Table.from_pandas(EXAMPLE_ORDERS_DATA_FRAME) diff --git a/sources/arrow_pandas/requirements.txt b/sources/arrow_pandas/requirements.txt new file mode 100644 index 000000000..536506745 --- /dev/null +++ b/sources/arrow_pandas/requirements.txt @@ -0,0 +1,2 @@ +pandas = "^2.0.0" +dlt>=0.4.0 \ No newline at end of file diff --git a/sources/arrow_pandas_pipeline.py b/sources/arrow_pandas_pipeline.py new file mode 100644 index 000000000..151f372a5 --- /dev/null +++ b/sources/arrow_pandas_pipeline.py @@ -0,0 +1,12 @@ +"""Very simple pipeline, to be used as a starting point for pandas or arrow pipelines. + +""" + +import dlt +from arrow_pandas.example_resources import orders, customers + + +if __name__ == "__main__": + pipeline = dlt.pipeline("orders_pipeline", destination="duckdb") + # run both resources + pipeline.run([orders, customers]) diff --git a/tests/arrow_pandas/__init__.py b/tests/arrow_pandas/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/arrow_pandas/test_arrow_pandas_source.py b/tests/arrow_pandas/test_arrow_pandas_source.py new file mode 100644 index 000000000..ee2982964 --- /dev/null +++ b/tests/arrow_pandas/test_arrow_pandas_source.py @@ -0,0 +1,20 @@ +import dlt +import pytest + +from sources.arrow_pandas.example_resources import orders, customers +from tests.utils import ALL_DESTINATIONS, assert_load_info, load_table_counts + + +@pytest.mark.parametrize("destination_name", ALL_DESTINATIONS) +def test_example_resources(destination_name: str) -> None: + """Simple test for the example resources.""" + + p = dlt.pipeline("orders_pipeline", destination=destination_name, full_refresh=True) + orders.apply_hints(incremental=dlt.sources.incremental("ordered_at")) + + # run pipeline + info = p.run([orders(), customers()]) + + # check that the data was loaded + assert_load_info(info) + assert load_table_counts(p, "orders", "customers") == {"orders": 3, "customers": 3}