Fix: verified sources tests #318

Merged
merged 29 commits into from
Feb 14, 2024
eb2197c
run chess tests
AstrakhantsevaAA Jan 8, 2024
f6571a6
[fix] chess tests: convert dates from bigint to timestamps
AstrakhantsevaAA Feb 5, 2024
d1f2c3e
[fix] stripe tests: convert dates from bigint to timestamps, re-creat…
AstrakhantsevaAA Feb 5, 2024
383cb33
black chess
AstrakhantsevaAA Feb 5, 2024
ca373a3
run GA tests
AstrakhantsevaAA Feb 5, 2024
484a1cb
[fix] update GA test data
AstrakhantsevaAA Feb 5, 2024
e744c3b
run GA tests
AstrakhantsevaAA Feb 6, 2024
19f7c00
change initial date to "2015-08-14"
AstrakhantsevaAA Feb 6, 2024
cb853ea
GA lint
AstrakhantsevaAA Feb 6, 2024
f1e52d1
run GS tests
AstrakhantsevaAA Feb 6, 2024
8d0259a
[fix] remove a type which doesn't exist in a pipeline
AstrakhantsevaAA Feb 6, 2024
da8a74c
[fix] comment date_test__v_bool in sql query
AstrakhantsevaAA Feb 6, 2024
b668313
hubspot lint + run tests
AstrakhantsevaAA Feb 6, 2024
4d54f3c
[fix] hubspot: tests history as range
AstrakhantsevaAA Feb 6, 2024
69982c8
[fix] jira tests: decrease number of issues
AstrakhantsevaAA Feb 6, 2024
a705101
lint jira test
AstrakhantsevaAA Feb 12, 2024
74a8375
[fix] fix stripe again
AstrakhantsevaAA Feb 12, 2024
42c395d
[fix] replace destinations to duckdb. rename pipelines and datasets
AstrakhantsevaAA Feb 13, 2024
d8318cc
airtable: add some defaults to run it
AstrakhantsevaAA Feb 13, 2024
dd7fca3
asana: fix the api client version
AstrakhantsevaAA Feb 13, 2024
74796d3
small refactoring
AstrakhantsevaAA Feb 13, 2024
49c1466
small refactoring
AstrakhantsevaAA Feb 13, 2024
d546b21
google analytics: small refactoring
AstrakhantsevaAA Feb 13, 2024
a626425
google sheets: small refactoring
AstrakhantsevaAA Feb 13, 2024
59c70dd
hubspot: small refactoring
AstrakhantsevaAA Feb 13, 2024
57b7b75
hubspot: small refactoring
AstrakhantsevaAA Feb 13, 2024
e8d0beb
stripe: small refactoring
AstrakhantsevaAA Feb 13, 2024
5a6bdb6
fix typing, delete extra args
AstrakhantsevaAA Feb 13, 2024
4da86c7
black
AstrakhantsevaAA Feb 13, 2024
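Several commits above describe converting date fields from bigint to timestamps in the chess and Stripe tests. The diffs for those tests are not shown on this page, so the following is only a minimal sketch of such a conversion, assuming the bigint values are Unix epoch seconds. `epoch_to_timestamp` and the sample record are hypothetical and do not come from the PR.

```python
from datetime import datetime, timezone


def epoch_to_timestamp(value: int) -> datetime:
    """Convert Unix epoch seconds (an assumed unit) to a timezone-aware UTC datetime."""
    return datetime.fromtimestamp(value, tz=timezone.utc)


# Hypothetical record whose "created" field arrives as a bigint.
record = {"id": "evt_1", "created": 1707868800}
record["created"] = epoch_to_timestamp(record["created"])
print(record["created"].isoformat())
```

Keeping the result timezone-aware avoids depending on the local timezone of the machine that runs the tests.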
2 changes: 1 addition & 1 deletion sources/airtable/README.md
@@ -67,7 +67,7 @@ to the
1. You're now ready to run the pipeline! To get started, run the following command:

```bash
python3 airtable_pipeline.py
python airtable_pipeline.py
```

1. Once the pipeline has finished running, you can verify that everything loaded correctly by using
19 changes: 11 additions & 8 deletions sources/airtable/__init__.py
@@ -1,13 +1,11 @@
"""Source that loads tables from Airtable.
Supports whitelisting of tables or loading of all tables from a specified base.
"""
from typing import Optional, Iterable, Iterator, List, Dict, Any
from typing import Any, Dict, Iterable, Iterator, List, Optional

import dlt
from dlt.sources import DltResource
from dlt.common.typing import TDataItem

import pyairtable
from dlt.sources import DltResource


@dlt.source
@@ -19,9 +17,13 @@ def airtable_source(
"""
Represents tables for a single Airtable base.
Args:
base_id (str): The id of the base. Obtain it e.g. from the URL in your webbrowser. It starts with "app". See https://support.airtable.com/docs/finding-airtable-ids
table_names (Optional[List[str]]): A list of table IDs or table names to load. Unless specified otherwise, all tables in the schema are loaded. Names are freely user-defined. IDs start with "tbl". See https://support.airtable.com/docs/finding-airtable-ids
access_token (str): The personal access token. See https://support.airtable.com/docs/creating-and-using-api-keys-and-access-tokens#personal-access-tokens-basic-actions
base_id (str): The id of the base. Obtain it, e.g., from the URL in your web browser.
It starts with "app". See https://support.airtable.com/docs/finding-airtable-ids
table_names (Optional[List[str]]): A list of table IDs or table names to load.
Unless specified otherwise, all tables in the schema are loaded.
Names are freely user-defined. IDs start with "tbl". See https://support.airtable.com/docs/finding-airtable-ids
access_token (str): The personal access token.
See https://support.airtable.com/docs/creating-and-using-api-keys-and-access-tokens#personal-access-tokens-basic-actions
"""
api = pyairtable.Api(access_token)
all_tables_url = api.build_url(f"meta/bases/{base_id}/tables")
@@ -43,7 +45,8 @@ def airtable_resource(
Represents a single airtable.
Args:
api (pyairtable.Api): The API connection object
base_id (str): The id of the base. Obtain it e.g. from the URL in your webbrowser. It starts with "app". See https://support.airtable.com/docs/finding-airtable-ids
base_id (str): The id of the base. Obtain it, e.g., from the URL in your web browser.
It starts with "app". See https://support.airtable.com/docs/finding-airtable-ids
table (Dict[str, Any]): Metadata about an airtable, does not contain the actual records
"""
primary_key_id = table["primaryFieldId"]
2 changes: 1 addition & 1 deletion sources/airtable/requirements.txt
@@ -1,2 +1,2 @@
pyairtable~=2.1
dlt>=0.3.17
dlt>=0.3.25
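The requirements above use two kinds of version specifier: `~=2.1` (a compatible release, meaning at least 2.1 but still within 2.x) and `>=0.3.25` (a plain minimum). The sketch below illustrates both rules for plain numeric versions only. The function names are hypothetical, and real dependency resolution is done by pip, which also handles pre-releases and other cases this simplification ignores.

```python
from typing import Tuple


def parse_release(version: str) -> Tuple[int, ...]:
    """Split a plain numeric version such as "0.3.25" into integers."""
    return tuple(int(part) for part in version.split("."))


def at_least(version: str, minimum: str) -> bool:
    """Check a `>=minimum` specifier, padding with zeros so 2.1 equals 2.1.0."""
    v, m = parse_release(version), parse_release(minimum)
    width = max(len(v), len(m))
    v += (0,) * (width - len(v))
    m += (0,) * (width - len(m))
    return v >= m


def compatible_with(version: str, base: str) -> bool:
    """Check a `~=base` specifier: at least `base`, with all but the last segment of `base` unchanged."""
    prefix = parse_release(base)[:-1]
    return at_least(version, base) and parse_release(version)[: len(prefix)] == prefix


print(at_least("0.3.17", "0.3.25"))    # the old minimum does not satisfy the new one
print(compatible_with("2.3.0", "2.1"))  # still inside the 2.x series
print(compatible_with("3.0", "2.1"))    # a new major version is excluded
```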
84 changes: 53 additions & 31 deletions sources/airtable_pipeline.py
@@ -1,16 +1,18 @@
from typing import List
from typing import List, Dict, Any

import dlt

from airtable import airtable_source


def load_entire_base(base_id: str) -> None:
def load_entire_base(base_id: str, resources_to_apply_hints: Dict[str, Any]) -> None:
"""
Loads all tables from the specified Airtable base.

Args:
base_id (str): The id of the base. Obtain it, e.g. from the URL in your web browser.
It starts with "app". See https://support.airtable.com/docs/finding-airtable-ids
resources_to_apply_hints (dict): Dict of table names and fields we want to apply hints.

Note:
- The base_id can either be passed directly or set up in ".dlt/config.toml".
@@ -23,6 +25,13 @@ def load_entire_base(base_id: str) -> None:
# Retrieve data from Airtable using airtable_source.
airtables = airtable_source(base_id=base_id)

# typing columns to silence warnings
for resource_name, field_names in resources_to_apply_hints.items():
for field_name in field_names:
airtables.resources[resource_name].apply_hints(
columns={field_name: {"name": field_name, "data_type": "text"}}
)

load_info = pipeline.run(airtables, write_disposition="replace")
print(load_info)

@@ -37,6 +46,7 @@ def load_select_tables_from_base_by_id(base_id: str, table_names: List[str]) ->
table_names (List[str]): A list of table IDs or table names to load. Unless specified otherwise,
all tables in the schema are loaded. Names are freely user-defined. IDs start with "tbl".
See https://support.airtable.com/docs/finding-airtable-ids
resources_to_apply_hints (dict): Dict of table names and fields we want to apply hints.

Note:
- Filtering by names is less reliable than filtering on IDs because names can be changed by Airtable users.
@@ -60,7 +70,7 @@ def load_select_tables_from_base_by_id(base_id: str, table_names: List[str]) ->


def load_select_tables_from_base_by_name(
base_id: str, table_names: List[str], resource_name: str, field_name: str
base_id: str, table_names: List[str], resources_to_apply_hints: Dict[str, Any]
) -> None:
"""
Loads specific table names from an Airtable base.
@@ -71,8 +81,7 @@ def load_select_tables_from_base_by_name(
table_names (List[str]): A list of table IDs or table names to load. Unless specified otherwise,
all tables in the schema are loaded. Names are freely user-defined. IDs start with "tbl".
See https://support.airtable.com/docs/finding-airtable-ids
resource_name (str): The table name we want to apply hints.
field_name (str): The table field name for which we want to apply hints.
resources_to_apply_hints (dict): Dict of table names and fields we want to apply hints.

Note:
- Filtering by names is less reliable than filtering on IDs because names can be changed by Airtable users.
@@ -89,16 +98,19 @@ def load_select_tables_from_base_by_name(
table_names=table_names,
)

airtables.resources[resource_name].apply_hints(
primary_key=field_name,
columns={field_name: {"data_type": "text"}},
)
# typing columns to silence warnings
for resource_name, field_names in resources_to_apply_hints.items():
for field_name in field_names:
airtables.resources[resource_name].apply_hints(
columns={field_name: {"name": field_name, "data_type": "text"}}
)

load_info = pipeline.run(airtables, write_disposition="replace")
print(load_info)


def load_and_customize_write_disposition(
base_id: str, table_names: List[str], resource_name: str, field_name: str
base_id: str, table_names: List[str], resources_to_apply_hints: Dict[str, Any]
) -> None:
"""
Loads data from a specific Airtable base's table with customized write disposition("merge") using field_name.
@@ -109,8 +121,8 @@ def load_and_customize_write_disposition(
table_names (List[str]): A list of table IDs or table names to load. Unless specified otherwise,
all tables in the schema are loaded. Names are freely user-defined. IDs start with "tbl".
See https://support.airtable.com/docs/finding-airtable-ids
resource_name (str): The table name we want to apply hints.
field_name (str): The table field name for which we want to apply hints.
resources_to_apply_hints (dict): Dict of table names and fields we want to apply hints.


Note:
- Filtering by names is less reliable than filtering on IDs because names can be changed by Airtable users.
@@ -127,31 +139,41 @@ def load_and_customize_write_disposition(
base_id=base_id,
table_names=table_names,
)
airtables.resources[resource_name].apply_hints(
primary_key=field_name,
columns={field_name: {"data_type": "text"}},
)

# typing columns to silence warnings
for resource_name, field_names in resources_to_apply_hints.items():
for field_name in field_names:
airtables.resources[resource_name].apply_hints(
primary_key=field_name,
columns={field_name: {"name": field_name, "data_type": "text"}},
write_disposition="merge",
)

load_info = pipeline.run(airtables)
print(load_info)


if __name__ == "__main__":
base_id_example = "Please set me up!"
table_names_example = ["Please set me up!"]
resource_name_to_apply_hints = "Please set me up!"
field_name_example = "Please set me up!"

load_entire_base(base_id_example)
load_select_tables_from_base_by_id(base_id_example, table_names_example)
load_entire_base(
base_id="app7RlqvdoOmJm9XR",
resources_to_apply_hints={
"🎤 Speakers": ["Name"],
"📆 Schedule": ["Activity"],
"🪑 Attendees": ["Name"],
"💰 Budget": ["Item"],
},
)
load_select_tables_from_base_by_id(
base_id="app7RlqvdoOmJm9XR",
table_names=["tblKHM5s3AujfSbAH", "tbloBrS8PnoO63aMP"],
)
load_select_tables_from_base_by_name(
base_id_example,
table_names_example,
resource_name_to_apply_hints,
field_name_example,
"app7RlqvdoOmJm9XR",
table_names=["💰 Budget"],
resources_to_apply_hints={"💰 Budget": ["Item"]},
)
load_and_customize_write_disposition(
base_id_example,
table_names_example,
resource_name_to_apply_hints,
field_name_example,
base_id="appcChDyP0pZeC76v",
table_names=["tbl1sN4CpPv8pBll4"],
resources_to_apply_hints={"Sheet1": ["Name"]},
)
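The `resources_to_apply_hints` argument introduced in this file maps each table name to the fields that should be typed as text, and the nested loops pass one `columns` mapping per field to `apply_hints`. The sketch below builds the same per-field mappings up front, without calling dlt, so that their shape is easy to inspect. `build_text_column_hints` is a hypothetical helper for illustration and is not part of the PR; the table and field names are examples.

```python
from typing import Any, Dict, List


def build_text_column_hints(
    resources_to_apply_hints: Dict[str, List[str]],
) -> Dict[str, Dict[str, Dict[str, Any]]]:
    """Return, per resource, a columns mapping that types each listed field as text."""
    return {
        resource_name: {
            field_name: {"name": field_name, "data_type": "text"}
            for field_name in field_names
        }
        for resource_name, field_names in resources_to_apply_hints.items()
    }


hints = build_text_column_hints({"Budget": ["Item"], "Speakers": ["Name"]})
print(hints["Budget"])
```

Each inner dictionary has the same form as the `columns` argument used in the loops above, so the result could be inspected or logged before any hints are applied to the resources.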
32 changes: 16 additions & 16 deletions sources/asana_dlt/README.md
@@ -16,10 +16,10 @@ Resources that can be loaded using this verified source are:

## Initialize the pipeline with Asana source
```bash
dlt init asana_dlt bigquery
dlt init asana_dlt duckdb
```

Here, we chose BigQuery as the destination. Alternatively, you can also choose redshift, duckdb, or any of the other [destinations](https://dlthub.com/docs/dlt-ecosystem/destinations/).
Here, we chose DuckDB as the destination. Alternatively, you can also choose redshift, bigquery, or any of the other [destinations](https://dlthub.com/docs/dlt-ecosystem/destinations/).

## Grab Asana credentials

@@ -29,41 +29,41 @@ To grab the Asana credentials please refer to the [full documentation here.](htt

1. Open .dlt/secrets.toml.
2. Enter the access token:

```toml
[sources.asana_dlt]
access_token = "access_token" # please set me up!
```

3. Enter credentials for your chosen destination as per the [docs](https://dlthub.com/docs/dlt-ecosystem/destinations/).

## Run the pipeline

1. Before running the pipeline, ensure that you have installed all the necessary dependencies by running the command:

```bash
pip install -r requirements.txt

```

2. You're now ready to run the pipeline! To get started, run the following command:

```bash
python3 asana_dlt_pipeline.py
python asana_dlt_pipeline.py

```

3. Once the pipeline has finished running, you can verify that everything loaded correctly by using the following command:

```bash
dlt pipeline <pipeline_name> show
```
Note that in the above command, replace `<pipeline_name>` with the name of your pipeline. For example, if you named your pipeline "asana," you would run:

Note that in the above command, replace `<pipeline_name>` with the name of your pipeline. For example, if you named your pipeline "asana" you would run:

```bash
dlt pipeline asana show
```


💡 To explore additional customizations for this pipeline, refer to the dlt setup guide for the Asana source, which explains how to tailor the pipeline to your specific needs: [Setup Guide: Asana](https://dlthub.com/docs/dlt-ecosystem/verified-sources/asana)
25 changes: 15 additions & 10 deletions sources/asana_dlt/__init__.py
@@ -7,33 +7,30 @@
"""

import typing as t
from typing import Iterable, Any
from typing import Any, Iterable

import dlt
from dlt.common.typing import TDataItem

from .helpers import get_client
from .settings import (
PROJECT_FIELDS,
USER_FIELDS,
DEFAULT_START_DATE,
PROJECT_FIELDS,
REQUEST_TIMEOUT,
SECTION_FIELDS,
STORY_FIELDS,
TAG_FIELDS,
TASK_FIELDS,
STORY_FIELDS,
TEAMS_FIELD,
USER_FIELDS,
WORKSPACE_FIELDS,
)
from .helpers import get_client


@dlt.source
def asana_source(
access_token: str = dlt.secrets.value,
) -> Any: # should be Sequence[DltResource]:
def asana_source() -> Any: # should be Sequence[DltResource]:
"""
The main function that runs all the other functions to fetch data from Asana.
Args:
access_token (str): The access token to authenticate the Asana API client, provided in the secrets file
Returns:
Sequence[DltResource]: A sequence of DltResource objects containing the fetched data.
"""
@@ -57,6 +54,7 @@ def workspaces(
Fetches and returns a list of workspaces from Asana.
Args:
access_token (str): The access token to authenticate the Asana API client, provided in the secrets file
fields (Iterable[str]): The list of workspace fields to be retrieved from Asana API.
Yields:
dict: The workspace data.
"""
@@ -78,6 +76,7 @@ def projects(
Args:
workspace (dict): The workspace data.
access_token (str): The access token to authenticate the Asana API client, provided in the secrets file
fields (Iterable[str]): The list of project fields to be retrieved from Asana API.
Returns:
list[dict]: The project data for the given workspace.
"""
@@ -105,6 +104,7 @@ def sections(
Args:
project_array (list): The project data.
access_token (str): The access token to authenticate the Asana API client, provided in the secrets file
fields (Iterable[str]): The list of section fields to be retrieved from Asana API.
Returns:
list[dict]: The sections data for the given project.
"""
@@ -131,6 +131,7 @@ def tags(
Args:
workspace (dict): The workspace data.
access_token (str): The access token to authenticate the Asana API client, provided in the secrets file
fields (Iterable[str]): The list of tag fields to be retrieved from Asana API.
Returns:
list[dict]: The tags data for the given workspace.
"""
@@ -160,6 +161,7 @@ def tasks(
access_token (str): The access token to authenticate the Asana API client, provided in the secrets file

modified_at (str): The date from which to fetch modified tasks.
fields (Iterable[str]): The list of task fields to be retrieved from Asana API.
Yields:
dict: The task data for the given project.
"""
@@ -190,6 +192,7 @@ def stories(
Args:
task (dict): The task data.
access_token (str): The access token to authenticate the Asana API client, provided in the secrets file
fields (Iterable[str]): The list of story fields to be retrieved from Asana API.
Returns:
list[dict]: The stories data for the given task.
"""
@@ -218,6 +221,7 @@ def teams(
Args:
workspace (dict): The workspace data.
access_token (str): The access token to authenticate the Asana API client, provided in the secrets file
fields (Iterable[str]): The list of team fields to be retrieved from Asana API.
Returns:
list[dict]: The teams data for the given workspace.
"""
@@ -246,6 +250,7 @@ def users(
Args:
workspace (dict): The workspace data.
access_token (str): The access token to authenticate the Asana API client, provided in the secrets file
fields (Iterable[str]): The list of user fields to be retrieved from Asana API.
Returns:
list[dict]: The user data for the given workspace.
"""
2 changes: 2 additions & 0 deletions sources/asana_dlt/helpers.py
@@ -8,6 +8,8 @@ def get_client(
) -> AsanaClient:
"""
Returns an Asana API client.
Args:
access_token (str): The access token to authenticate the Asana API client.
Returns:
AsanaClient: The Asana API client.
"""