Skip to content

Commit

Permalink
Fix: verified sources tests (#318)
Browse files Browse the repository at this point in the history
* run chess tests

* [fix] chess tests: convert dates from bigint to timestamps

* [fix] stripe tests: convert dates from bigint to timestamps, re-created test data in account

* black chess

* run GA tests

* [fix] update GA test data

* run GA tests

* change initial date to "2015-08-14"

* GA lint

* run GS tests

* [fix] remove a type which doesn't exist in a pipeline

* [fix] comment date_test__v_bool in sql query

* hubspot lint + run tests

* [fix] hubspot: tests history as range

* [fix] jira tests: decrease number of issues

* lint jira test

* [fix] fix stripe again

* [fix] replace destinations to duckdb. rename pipelines and datasets

* airtable: add some defaults to run it

* asana: fix the api client version

* small refactoring

* small refactoring

* google analytics: small refactoring

* google sheets: small refactoring

* hubspot: small refactoring

* hubspot: small refactoring

* stripe: small refactoring

* fix typing, delete extra args

* black
  • Loading branch information
AstrakhantsevaAA authored Feb 14, 2024
1 parent f7dc545 commit 756edaa
Show file tree
Hide file tree
Showing 46 changed files with 366 additions and 292 deletions.
2 changes: 1 addition & 1 deletion sources/airtable/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ to the
1. You're now ready to run the pipeline! To get started, run the following command:

```bash
python3 airtable_pipeline.py
python airtable_pipeline.py
```

1. Once the pipeline has finished running, you can verify that everything loaded correctly by using
Expand Down
19 changes: 11 additions & 8 deletions sources/airtable/__init__.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
"""Source that loads tables from Airtable.
Supports whitelisting of tables or loading of all tables from a specified base.
"""
from typing import Optional, Iterable, Iterator, List, Dict, Any
from typing import Any, Dict, Iterable, Iterator, List, Optional

import dlt
from dlt.sources import DltResource
from dlt.common.typing import TDataItem

import pyairtable
from dlt.sources import DltResource


@dlt.source
Expand All @@ -19,9 +17,13 @@ def airtable_source(
"""
Represents tables for a single Airtable base.
Args:
base_id (str): The id of the base. Obtain it e.g. from the URL in your webbrowser. It starts with "app". See https://support.airtable.com/docs/finding-airtable-ids
table_names (Optional[List[str]]): A list of table IDs or table names to load. Unless specified otherwise, all tables in the schema are loaded. Names are freely user-defined. IDs start with "tbl". See https://support.airtable.com/docs/finding-airtable-ids
access_token (str): The personal access token. See https://support.airtable.com/docs/creating-and-using-api-keys-and-access-tokens#personal-access-tokens-basic-actions
base_id (str): The id of the base. Obtain it, e.g., from the URL in your web browser.
It starts with "app". See https://support.airtable.com/docs/finding-airtable-ids
table_names (Optional[List[str]]): A list of table IDs or table names to load.
Unless specified otherwise, all tables in the schema are loaded.
Names are freely user-defined. IDs start with "tbl". See https://support.airtable.com/docs/finding-airtable-ids
access_token (str): The personal access token.
See https://support.airtable.com/docs/creating-and-using-api-keys-and-access-tokens#personal-access-tokens-basic-actions
"""
api = pyairtable.Api(access_token)
all_tables_url = api.build_url(f"meta/bases/{base_id}/tables")
Expand All @@ -43,7 +45,8 @@ def airtable_resource(
Represents a single airtable.
Args:
api (pyairtable.Api): The API connection object
base_id (str): The id of the base. Obtain it e.g. from the URL in your webbrowser. It starts with "app". See https://support.airtable.com/docs/finding-airtable-ids
base_id (str): The id of the base. Obtain it, e.g., from the URL in your web browser.
It starts with "app". See https://support.airtable.com/docs/finding-airtable-ids
table (Dict[str, Any]): Metadata about an airtable, does not contain the actual records
"""
primary_key_id = table["primaryFieldId"]
Expand Down
2 changes: 1 addition & 1 deletion sources/airtable/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
pyairtable~=2.1
dlt>=0.3.17
dlt>=0.3.25
84 changes: 53 additions & 31 deletions sources/airtable_pipeline.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
from typing import List
from typing import List, Dict, Any

import dlt

from airtable import airtable_source


def load_entire_base(base_id: str) -> None:
def load_entire_base(base_id: str, resources_to_apply_hints: Dict[str, Any]) -> None:
"""
Loads all tables from the specified Airtable base.
Args:
base_id (str): The id of the base. Obtain it, e.g. from the URL in your web browser.
It starts with "app". See https://support.airtable.com/docs/finding-airtable-ids
resources_to_apply_hints (dict): Maps each table (resource) name to the list of field names to apply column hints to.
Note:
- The base_id can either be passed directly or set up in ".dlt/config.toml".
Expand All @@ -23,6 +25,13 @@ def load_entire_base(base_id: str) -> None:
# Retrieve data from Airtable using airtable_source.
airtables = airtable_source(base_id=base_id)

# typing columns to silence warnings
for resource_name, field_names in resources_to_apply_hints.items():
for field_name in field_names:
airtables.resources[resource_name].apply_hints(
columns={field_name: {"name": field_name, "data_type": "text"}}
)

load_info = pipeline.run(airtables, write_disposition="replace")
print(load_info)

Expand All @@ -37,6 +46,7 @@ def load_select_tables_from_base_by_id(base_id: str, table_names: List[str]) ->
table_names (List[str]): A list of table IDs or table names to load. Unless specified otherwise,
all tables in the schema are loaded. Names are freely user-defined. IDs start with "tbl".
See https://support.airtable.com/docs/finding-airtable-ids
resources_to_apply_hints (dict): Dict of table names and fields we want to apply hints.
Note:
- Filtering by names is less reliable than filtering on IDs because names can be changed by Airtable users.
Expand All @@ -60,7 +70,7 @@ def load_select_tables_from_base_by_id(base_id: str, table_names: List[str]) ->


def load_select_tables_from_base_by_name(
base_id: str, table_names: List[str], resource_name: str, field_name: str
base_id: str, table_names: List[str], resources_to_apply_hints: Dict[str, Any]
) -> None:
"""
Loads specific table names from an Airtable base.
Expand All @@ -71,8 +81,7 @@ def load_select_tables_from_base_by_name(
table_names (List[str]): A list of table IDs or table names to load. Unless specified otherwise,
all tables in the schema are loaded. Names are freely user-defined. IDs start with "tbl".
See https://support.airtable.com/docs/finding-airtable-ids
resource_name (str): The table name we want to apply hints.
field_name (str): The table field name for which we want to apply hints.
resources_to_apply_hints (dict): Maps each table (resource) name to the list of field names to apply column hints to.
Note:
- Filtering by names is less reliable than filtering on IDs because names can be changed by Airtable users.
Expand All @@ -89,16 +98,19 @@ def load_select_tables_from_base_by_name(
table_names=table_names,
)

airtables.resources[resource_name].apply_hints(
primary_key=field_name,
columns={field_name: {"data_type": "text"}},
)
# typing columns to silence warnings
for resource_name, field_names in resources_to_apply_hints.items():
for field_name in field_names:
airtables.resources[resource_name].apply_hints(
columns={field_name: {"name": field_name, "data_type": "text"}}
)

load_info = pipeline.run(airtables, write_disposition="replace")
print(load_info)


def load_and_customize_write_disposition(
base_id: str, table_names: List[str], resource_name: str, field_name: str
base_id: str, table_names: List[str], resources_to_apply_hints: Dict[str, Any]
) -> None:
"""
Loads data from a specific Airtable base's table with a customized write disposition ("merge"), using the fields listed in resources_to_apply_hints as primary keys.
Expand All @@ -109,8 +121,8 @@ def load_and_customize_write_disposition(
table_names (List[str]): A list of table IDs or table names to load. Unless specified otherwise,
all tables in the schema are loaded. Names are freely user-defined. IDs start with "tbl".
See https://support.airtable.com/docs/finding-airtable-ids
resource_name (str): The table name we want to apply hints.
field_name (str): The table field name for which we want to apply hints.
resources_to_apply_hints (dict): Maps each table (resource) name to the list of field names to use as primary keys and apply column hints to.
Note:
- Filtering by names is less reliable than filtering on IDs because names can be changed by Airtable users.
Expand All @@ -127,31 +139,41 @@ def load_and_customize_write_disposition(
base_id=base_id,
table_names=table_names,
)
airtables.resources[resource_name].apply_hints(
primary_key=field_name,
columns={field_name: {"data_type": "text"}},
)

# typing columns to silence warnings
for resource_name, field_names in resources_to_apply_hints.items():
for field_name in field_names:
airtables.resources[resource_name].apply_hints(
primary_key=field_name,
columns={field_name: {"name": field_name, "data_type": "text"}},
write_disposition="merge",
)

load_info = pipeline.run(airtables)
print(load_info)


if __name__ == "__main__":
base_id_example = "Please set me up!"
table_names_example = ["Please set me up!"]
resource_name_to_apply_hints = "Please set me up!"
field_name_example = "Please set me up!"

load_entire_base(base_id_example)
load_select_tables_from_base_by_id(base_id_example, table_names_example)
load_entire_base(
base_id="app7RlqvdoOmJm9XR",
resources_to_apply_hints={
"🎤 Speakers": ["Name"],
"📆 Schedule": ["Activity"],
"🪑 Attendees": ["Name"],
"💰 Budget": ["Item"],
},
)
load_select_tables_from_base_by_id(
base_id="app7RlqvdoOmJm9XR",
table_names=["tblKHM5s3AujfSbAH", "tbloBrS8PnoO63aMP"],
)
load_select_tables_from_base_by_name(
base_id_example,
table_names_example,
resource_name_to_apply_hints,
field_name_example,
"app7RlqvdoOmJm9XR",
table_names=["💰 Budget"],
resources_to_apply_hints={"💰 Budget": ["Item"]},
)
load_and_customize_write_disposition(
base_id_example,
table_names_example,
resource_name_to_apply_hints,
field_name_example,
base_id="appcChDyP0pZeC76v",
table_names=["tbl1sN4CpPv8pBll4"],
resources_to_apply_hints={"Sheet1": ["Name"]},
)
32 changes: 16 additions & 16 deletions sources/asana_dlt/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ Resources that can be loaded using this verified source are:

## Initialize the pipeline with Asana source
```bash
dlt init asana_dlt bigquery
dlt init asana_dlt duckdb
```

Here, we chose BigQuery as the destination. Alternatively, you can also choose redshift, duckdb, or any of the other [destinations](https://dlthub.com/docs/dlt-ecosystem/destinations/).
Here, we chose DuckDB as the destination. Alternatively, you can also choose redshift, bigquery, or any of the other [destinations](https://dlthub.com/docs/dlt-ecosystem/destinations/).

## Grab Asana credentials

Expand All @@ -29,41 +29,41 @@ To grab the Asana credentials please refer to the [full documentation here.](htt

1. Open .dlt/secrets.toml.
2. Enter the access token:

```toml
[sources.asana_dlt]
access_token = "access_token" # please set me up!
```

3. Enter credentials for your chosen destination as per the [docs](https://dlthub.com/docs/dlt-ecosystem/destinations/).

## Run the pipeline

1. Before running the pipeline, ensure that you have installed all the necessary dependencies by running the command:

```bash
pip install -r requirements.txt

```

2. You're now ready to run the pipeline! To get started, run the following command:

```bash
python3 asana_dlt_pipeline.py
python asana_dlt_pipeline.py

```

3. Once the pipeline has finished running, you can verify that everything loaded correctly by using the following command:

```bash
dlt pipeline <pipeline_name> show
```
Note that in the above command, replace `<pipeline_name>` with the name of your pipeline. For example, if you named your pipeline "asana," you would run:

Note that in the above command, replace `<pipeline_name>` with the name of your pipeline. For example, if you named your pipeline "asana" you would run:

```bash
dlt pipeline asana show
```


💡 To explore additional customizations for this pipeline, I recommend referring to the official Asana documentation. It provides comprehensive information and guidance on how to further customize and tailor the pipeline to suit your specific needs. You can find the Asana documentation in [Setup Guide: Asana](https://dlthub.com/docs/dlt-ecosystem/verified-sources/asana)
25 changes: 15 additions & 10 deletions sources/asana_dlt/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,33 +7,30 @@
"""

import typing as t
from typing import Iterable, Any
from typing import Any, Iterable

import dlt
from dlt.common.typing import TDataItem

from .helpers import get_client
from .settings import (
PROJECT_FIELDS,
USER_FIELDS,
DEFAULT_START_DATE,
PROJECT_FIELDS,
REQUEST_TIMEOUT,
SECTION_FIELDS,
STORY_FIELDS,
TAG_FIELDS,
TASK_FIELDS,
STORY_FIELDS,
TEAMS_FIELD,
USER_FIELDS,
WORKSPACE_FIELDS,
)
from .helpers import get_client


@dlt.source
def asana_source(
access_token: str = dlt.secrets.value,
) -> Any: # should be Sequence[DltResource]:
def asana_source() -> Any: # should be Sequence[DltResource]:
"""
The main function that runs all the other functions to fetch data from Asana.
Args:
access_token (str): The access token to authenticate the Asana API client, provided in the secrets file
Returns:
Sequence[DltResource]: A sequence of DltResource objects containing the fetched data.
"""
Expand All @@ -57,6 +54,7 @@ def workspaces(
Fetches and returns a list of workspaces from Asana.
Args:
access_token (str): The access token to authenticate the Asana API client, provided in the secrets file
fields (Iterable[str]): The list of workspace fields to be retrieved from Asana API.
Yields:
dict: The workspace data.
"""
Expand All @@ -78,6 +76,7 @@ def projects(
Args:
workspace (dict): The workspace data.
access_token (str): The access token to authenticate the Asana API client, provided in the secrets file
fields (Iterable[str]): The list of project fields to be retrieved from Asana API.
Returns:
list[dict]: The project data for the given workspace.
"""
Expand Down Expand Up @@ -105,6 +104,7 @@ def sections(
Args:
project_array (list): The project data.
access_token (str): The access token to authenticate the Asana API client, provided in the secrets file
fields (Iterable[str]): The list of section fields to be retrieved from Asana API.
Returns:
list[dict]: The sections data for the given project.
"""
Expand All @@ -131,6 +131,7 @@ def tags(
Args:
workspace (dict): The workspace data.
access_token (str): The access token to authenticate the Asana API client, provided in the secrets file
fields (Iterable[str]): The list of tag fields to be retrieved from Asana API.
Returns:
list[dict]: The tags data for the given workspace.
"""
Expand Down Expand Up @@ -160,6 +161,7 @@ def tasks(
access_token (str): The access token to authenticate the Asana API client, provided in the secrets file
modified_at (str): The date from which to fetch modified tasks.
fields (Iterable[str]): The list of task fields to be retrieved from Asana API.
Yields:
dict: The task data for the given project.
"""
Expand Down Expand Up @@ -190,6 +192,7 @@ def stories(
Args:
task (dict): The task data.
access_token (str): The access token to authenticate the Asana API client, provided in the secrets file
fields (Iterable[str]): The list of story fields to be retrieved from Asana API.
Returns:
list[dict]: The stories data for the given task.
"""
Expand Down Expand Up @@ -218,6 +221,7 @@ def teams(
Args:
workspace (dict): The workspace data.
access_token (str): The access token to authenticate the Asana API client, provided in the secrets file
fields (Iterable[str]): The list of team fields to be retrieved from Asana API.
Returns:
list[dict]: The teams data for the given workspace.
"""
Expand Down Expand Up @@ -246,6 +250,7 @@ def users(
Args:
workspace (dict): The workspace data.
access_token (str): The access token to authenticate the Asana API client, provided in the secrets file
fields (Iterable[str]): The list of user fields to be retrieved from Asana API.
Returns:
list[dict]: The user data for the given workspace.
"""
Expand Down
2 changes: 2 additions & 0 deletions sources/asana_dlt/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ def get_client(
) -> AsanaClient:
"""
Returns an Asana API client.
Args:
access_token (str): The access token to authenticate the Asana API client.
Returns:
AsanaClient: The Asana API client.
"""
Expand Down
Loading

0 comments on commit 756edaa

Please sign in to comment.