Fix: verified sources tests #318

Merged · 29 commits · Feb 14, 2024
Commits
eb2197c
run chess tests
AstrakhantsevaAA Jan 8, 2024
f6571a6
[fix] chess tests: convert dates from bigint to timestamps
AstrakhantsevaAA Feb 5, 2024
d1f2c3e
[fix] stripe tests: convert dates from bigint to timestamps, re-creat…
AstrakhantsevaAA Feb 5, 2024
383cb33
black chess
AstrakhantsevaAA Feb 5, 2024
ca373a3
run GA tests
AstrakhantsevaAA Feb 5, 2024
484a1cb
[fix] update GA test data
AstrakhantsevaAA Feb 5, 2024
e744c3b
run GA tests
AstrakhantsevaAA Feb 6, 2024
19f7c00
change initial date to "2015-08-14"
AstrakhantsevaAA Feb 6, 2024
cb853ea
GA lint
AstrakhantsevaAA Feb 6, 2024
f1e52d1
run GS tests
AstrakhantsevaAA Feb 6, 2024
8d0259a
[fix] remove a type which doesn't exist in a pipeline
AstrakhantsevaAA Feb 6, 2024
da8a74c
[fix] comment date_test__v_bool in sql query
AstrakhantsevaAA Feb 6, 2024
b668313
hubspot lint + run tests
AstrakhantsevaAA Feb 6, 2024
4d54f3c
[fix] hubspot: tests history as range
AstrakhantsevaAA Feb 6, 2024
69982c8
[fix] jira tests: decrease number of issues
AstrakhantsevaAA Feb 6, 2024
a705101
lint jira test
AstrakhantsevaAA Feb 12, 2024
74a8375
[fix] fix stripe again
AstrakhantsevaAA Feb 12, 2024
42c395d
[fix] replace destinations to duckdb. rename pipelines and datasets
AstrakhantsevaAA Feb 13, 2024
d8318cc
airtable: add some defaults to run it
AstrakhantsevaAA Feb 13, 2024
dd7fca3
asana: fix the api client version
AstrakhantsevaAA Feb 13, 2024
74796d3
small refactoring
AstrakhantsevaAA Feb 13, 2024
49c1466
small refactoring
AstrakhantsevaAA Feb 13, 2024
d546b21
google analytics: small refactoring
AstrakhantsevaAA Feb 13, 2024
a626425
google sheets: small refactoring
AstrakhantsevaAA Feb 13, 2024
59c70dd
hubspot: small refactoring
AstrakhantsevaAA Feb 13, 2024
57b7b75
hubspot: small refactoring
AstrakhantsevaAA Feb 13, 2024
e8d0beb
stripe: small refactoring
AstrakhantsevaAA Feb 13, 2024
5a6bdb6
fix typing, delete extra args
AstrakhantsevaAA Feb 13, 2024
4da86c7
black
AstrakhantsevaAA Feb 13, 2024

Files changed

27 changes: 14 additions & 13 deletions sources/chess/__init__.py
@@ -1,14 +1,14 @@
 """A source loading player profiles and games from chess.com api"""

-from typing import Callable, Iterator, List, Sequence, Dict, Any
+from typing import Any, Callable, Dict, Iterator, List, Sequence

 import dlt
 from dlt.common import pendulum
 from dlt.common.typing import TDataItem
 from dlt.sources import DltResource
 from dlt.sources.helpers import requests
-from .helpers import get_url_with_retry, get_path_with_retry, validate_month_string
+
+from .helpers import get_path_with_retry, get_url_with_retry, validate_month_string
 from .settings import UNOFFICIAL_CHESS_API_URL
@@ -35,7 +35,13 @@ def source(
     )


-@dlt.resource(write_disposition="replace")
+@dlt.resource(
+    write_disposition="replace",
+    columns={
+        "last_online": {"data_type": "timestamp"},
+        "joined": {"data_type": "timestamp"},
+    },
+)
 def players_profiles(players: List[str]) -> Iterator[TDataItem]:
     """
     Yields player profiles for a list of player usernames.
@@ -68,7 +74,9 @@ def players_archives(players: List[str]) -> Iterator[List[TDataItem]]:
         yield data.get("archives", [])


-@dlt.resource(write_disposition="append")
+@dlt.resource(
+    write_disposition="append", columns={"end_time": {"data_type": "timestamp"}}
+)
 def players_games(
     players: List[str], start_month: str = None, end_month: str = None
 ) -> Iterator[Callable[[], List[TDataItem]]]:
@@ -104,7 +112,6 @@ def _get_archive(url: str) -> List[TDataItem]:
             raise

     # enumerate the archives
-    url: str = None
     for url in archives:
         # the `url` format is https://api.chess.com/pub/player/{username}/games/{YYYY}/{MM}
         if start_month and url[-7:] < start_month:
@@ -114,8 +121,7 @@ def _get_archive(url: str) -> List[TDataItem]:
         # do not download archive again
         if url in checked_archives:
             continue
-        else:
-            checked_archives.append(url)
+        checked_archives.append(url)
         # get the filtered archive
         yield _get_archive(url)
@@ -131,9 +137,7 @@ def players_online_status(players: List[str]) -> Iterator[TDataItem]:
     """
    # we'll use unofficial endpoint to get online status, the official seems to be removed
    for player in players:
-        status = get_url_with_retry(
-            "%suser/popup/%s" % (UNOFFICIAL_CHESS_API_URL, player)
-        )
+        status = get_url_with_retry(f"{UNOFFICIAL_CHESS_API_URL}user/popup/{player}")
         # return just relevant selection
         yield {
             "username": player,
@@ -158,9 +162,6 @@ def chess_dlt_config_example(
     Returns:
         DltResource: Returns a resource yielding the configured values.
     """
-    print(secret_str)
-    print(secret_dict)
-    print(config_int)

     # returns a resource yielding the configured values - it is just a test
     return dlt.resource([secret_str, secret_dict, config_int], name="config_values")
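
Note: chess.com returns `last_online`, `joined`, and `end_time` as Unix epoch integers, which dlt would otherwise infer as bigint — hence the `columns` hints added above. A minimal, self-contained sketch of the same technique (the resource name, field name, and sample value below are illustrative, not part of this PR):

import dlt


# `signup_ts` arrives as epoch seconds; without the hint it would load as bigint
@dlt.resource(
    write_disposition="replace",
    columns={"signup_ts": {"data_type": "timestamp"}},
)
def users():
    yield {"name": "magnus", "signup_ts": 1704067200}


pipeline = dlt.pipeline(pipeline_name="hint_demo", destination="duckdb")
print(pipeline.run(users()))
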
1 change: 1 addition & 0 deletions sources/chess/helpers.py
@@ -2,6 +2,7 @@

 from dlt.common.typing import StrAny
 from dlt.sources.helpers import requests
+
 from .settings import OFFICIAL_CHESS_API_URL
2 changes: 1 addition & 1 deletion sources/chess_pipeline.py
@@ -27,7 +27,7 @@ def load_players_online_status() -> None:

     pipeline = dlt.pipeline(
         pipeline_name="chess_pipeline",
-        destination="postgres",
+        destination="duckdb",
         dataset_name="chess_players_games_data",
     )
     data = source(["magnuscarlsen", "vincentkeymer", "dommarajugukesh", "rpragchess"])
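
Note: with the destination switched to duckdb, the hinted columns can be sanity-checked directly. A quick sketch, assuming dlt's default duckdb file name derived from the pipeline name (`chess_pipeline.duckdb` in the working directory):

import duckdb

conn = duckdb.connect("chess_pipeline.duckdb")
# list the column types for the players_profiles table loaded above
print(
    conn.sql(
        "SELECT column_name, data_type FROM information_schema.columns "
        "WHERE table_name = 'players_profiles'"
    )
)
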
17 changes: 5 additions & 12 deletions sources/google_analytics/__init__.py
@@ -4,24 +4,17 @@
 from typing import Iterator, List, Optional, Union

 import dlt
-from dlt.common.exceptions import MissingDependencyException
-from dlt.common.typing import TDataItem, DictStrAny
-
+from apiclient.discovery import Resource
+from dlt.common.typing import DictStrAny, TDataItem
 from dlt.sources import DltResource
 from dlt.sources.credentials import GcpOAuthCredentials, GcpServiceAccountCredentials
+from google.analytics.data_v1beta import BetaAnalyticsDataClient
+from google.analytics.data_v1beta.types import GetMetadataRequest, Metadata

-from .helpers.data_processing import to_dict
 from .helpers import basic_report
-
+from .helpers.data_processing import to_dict
 from .settings import START_DATE

-from google.analytics.data_v1beta import BetaAnalyticsDataClient
-from google.analytics.data_v1beta.types import (
-    GetMetadataRequest,
-    Metadata,
-)
-from apiclient.discovery import Resource


 @dlt.source(max_table_nesting=2)
 def google_analytics(
2 changes: 1 addition & 1 deletion sources/google_analytics/settings.py
@@ -1,3 +1,3 @@
 """Google analytics source settings and constants"""

-START_DATE = "2000-01-01"
+START_DATE = "2015-08-14"
2 changes: 1 addition & 1 deletion sources/google_analytics/setup_script_gcp_oauth.py
@@ -15,8 +15,8 @@
 10. Add your own email as a test user."""

 import dlt
-from dlt.common.configuration.inject import with_config
 from dlt.common.configuration.exceptions import ConfigFieldMissingException
+from dlt.common.configuration.inject import with_config
 from dlt.sources.credentials import GcpOAuthCredentials
9 changes: 4 additions & 5 deletions sources/google_sheets/__init__.py
@@ -1,21 +1,20 @@
 """Loads Google Sheets data from tabs, named and explicit ranges. Contains the main source functions."""

-from typing import Sequence, Union, Iterable
+from typing import Iterable, Sequence, Union

 import dlt
 from dlt.common import logger
-from dlt.sources.credentials import GcpServiceAccountCredentials, GcpOAuthCredentials
 from dlt.sources import DltResource
+from dlt.sources.credentials import GcpOAuthCredentials, GcpServiceAccountCredentials

+from .helpers import api_calls
+from .helpers.api_calls import api_auth
 from .helpers.data_processing import (
     ParsedRange,
     get_data_types,
     get_range_headers,
     get_spreadsheet_id,
     process_range,
 )
-from .helpers.api_calls import api_auth
-from .helpers import api_calls


 @dlt.source
2 changes: 1 addition & 1 deletion sources/google_sheets/setup_script_gcp_oauth.py
@@ -15,8 +15,8 @@
 10. Add your own email as a test user."""

 import dlt
-from dlt.common.configuration.inject import with_config
 from dlt.common.configuration.exceptions import ConfigFieldMissingException
+from dlt.common.configuration.inject import with_config
 from dlt.sources.credentials import GcpOAuthCredentials
24 changes: 9 additions & 15 deletions sources/hubspot/__init__.py
@@ -24,28 +24,24 @@
 >>> resources = hubspot(api_key="your_api_key")
 """

-from typing import Any, Dict, List, Literal, Sequence, Iterator
+from typing import Any, Dict, Iterator, List, Literal, Sequence
 from urllib.parse import quote

 import dlt
 from dlt.common import pendulum
-from dlt.common.typing import TDataItems, TDataItem
+from dlt.common.typing import TDataItems
 from dlt.sources import DltResource

-from .helpers import (
-    fetch_data,
-    _get_property_names,
-    fetch_property_history,
-)
+from .helpers import _get_property_names, fetch_data, fetch_property_history
 from .settings import (
     ALL,
     CRM_OBJECT_ENDPOINTS,
     DEFAULT_COMPANY_PROPS,
     DEFAULT_CONTACT_PROPS,
     DEFAULT_DEAL_PROPS,
     DEFAULT_PRODUCT_PROPS,
-    DEFAULT_TICKET_PROPS,
     DEFAULT_QUOTE_PROPS,
+    DEFAULT_TICKET_PROPS,
     OBJECT_TYPE_PLURAL,
     STARTDATE,
     WEB_ANALYTICS_EVENTS_ENDPOINT,
@@ -204,13 +200,11 @@ def crm_objects(

     if len(props) > 2000:
         raise ValueError(
-            (
-                "Your request to Hubspot is too long to process. "
-                "Maximum allowed query length is 2000 symbols, while "
-                f"your list of properties `{props[:200]}`... is {len(props)} "
-                "symbols long. Use the `props` argument of the resource to "
-                "set the list of properties to extract from the endpoint."
-            )
+            "Your request to Hubspot is too long to process. "
+            "Maximum allowed query length is 2000 symbols, while "
+            f"your list of properties `{props[:200]}`... is {len(props)} "
+            "symbols long. Use the `props` argument of the resource to "
+            "set the list of properties to extract from the endpoint."
         )

     params = {"properties": props, "limit": 100}
3 changes: 2 additions & 1 deletion sources/hubspot/helpers.py
@@ -1,9 +1,10 @@
 """Hubspot source helpers"""

 import urllib.parse
-from typing import Iterator, Dict, Any, List, Optional, Iterable, Tuple
+from typing import Any, Dict, Iterator, List, Optional

 from dlt.sources.helpers import requests
+
 from .settings import OBJECT_TYPE_PLURAL

 BASE_URL = "https://api.hubapi.com/"
18 changes: 9 additions & 9 deletions sources/hubspot_pipeline.py
@@ -16,8 +16,8 @@ def load_crm_data() -> None:
     # Add full_refresh=(True or False) if you need your pipeline to create the dataset in your destination
     p = dlt.pipeline(
         pipeline_name="hubspot_pipeline",
-        dataset_name="hubspot",
-        destination="redshift",
+        dataset_name="hubspot_dataset",
+        destination="duckdb",
     )

     # Run the pipeline with the HubSpot source connector
@@ -40,8 +40,8 @@ def load_crm_data_with_history() -> None:
     # Add full_refresh=(True or False) if you need your pipeline to create the dataset in your destination
     p = dlt.pipeline(
         pipeline_name="hubspot_pipeline",
-        dataset_name="hubspot",
-        destination="postgres",
+        dataset_name="hubspot_dataset",
+        destination="duckdb",
     )

     # Configure the source with `include_history` to enable property history load, history is disabled by default
@@ -65,8 +65,8 @@ def load_crm_objects_with_custom_properties() -> None:
     # pipeline to create the dataset in your destination
     p = dlt.pipeline(
         pipeline_name="hubspot_pipeline",
-        dataset_name="hubspot",
-        destination="postgres",
+        dataset_name="hubspot_dataset",
+        destination="duckdb",
     )

     source = hubspot()
@@ -99,9 +99,9 @@ def load_web_analytics_events(

     # Create a DLT pipeline object with the pipeline name, dataset name, and destination database type
     p = dlt.pipeline(
-        pipeline_name="hubspot",
-        dataset_name="hubspot",
-        destination="postgres",
+        pipeline_name="hubspot_pipeline",
+        dataset_name="hubspot_dataset",
+        destination="duckdb",
         full_refresh=False,
     )
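
Note: `full_refresh` stays `False` here. In dlt, `full_refresh=True` generates a fresh, timestamp-suffixed dataset name on every run, so keeping it `False` lets repeated test runs load into the same `hubspot_dataset` schema.
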
12 changes: 4 additions & 8 deletions sources/stripe_analytics/__init__.py
@@ -1,18 +1,16 @@
 """ This source uses Stripe API and dlt to load data such as Customer, Subscription, Event etc. to the database and to calculate the MRR and churn rate. """

-from typing import Any, Dict, Generator, Optional, Tuple, Iterable
+from typing import Any, Dict, Generator, Iterable, Optional, Tuple

 import dlt
 import stripe
 from dlt.common import pendulum
-from dlt.sources import DltResource
 from dlt.common.typing import TDataItem
-
+from dlt.sources import DltResource
 from pendulum import DateTime

 from .helpers import pagination, transform_date
 from .metrics import calculate_mrr, churn_rate
-
 from .settings import ENDPOINTS, INCREMENTAL_ENDPOINTS
@@ -46,8 +44,7 @@ def stripe_source(
     def stripe_resource(
         endpoint: str,
     ) -> Generator[Dict[Any, Any], Any, None]:
-        for item in pagination(endpoint, start_date, end_date):
-            yield item
+        yield from pagination(endpoint, start_date, end_date)

     for endpoint in endpoints:
         yield dlt.resource(
@@ -94,8 +91,7 @@ def incremental_resource(
         ),
     ) -> Generator[Dict[Any, Any], Any, None]:
         start_value = created.last_value
-        for item in pagination(endpoint, start_date=start_value, end_date=end_date):
-            yield item
+        yield from pagination(endpoint, start_date=start_value, end_date=end_date)

     for endpoint in endpoints:
         yield dlt.resource(
4 changes: 2 additions & 2 deletions sources/stripe_analytics/helpers.py
@@ -1,11 +1,11 @@
 """Stripe analytics source helpers"""

-from typing import Any, Dict, Generator, Optional, Union, Iterable
+from typing import Any, Dict, Iterable, Optional, Union

 import stripe
 from dlt.common import pendulum
-from pendulum import DateTime
 from dlt.common.typing import TDataItem
+from pendulum import DateTime


 def pagination(
24 changes: 19 additions & 5 deletions sources/stripe_analytics_pipeline.py
@@ -116,6 +116,20 @@ def load_data_and_get_metrics() -> None:
     source_event = incremental_stripe_source(endpoints=("Event",))
     # Subscription is an endpoint with editable data, use stripe_source.
     source_subs = stripe_source(endpoints=("Subscription",))
+
+    # convert dates to the timestamp format
+    source_event.resources["Event"].apply_hints(
+        columns={
+            "created": {"data_type": "timestamp"},
+        }
+    )
+
+    source_subs.resources["Subscription"].apply_hints(
+        columns={
+            "created": {"data_type": "timestamp"},
+        }
+    )
+
     load_info = pipeline.run(data=[source_subs, source_event])
     print(load_info)
@@ -126,14 +140,14 @@


 if __name__ == "__main__":
-    # load only data that was created during the period between the May 1, 2023 (incl.), and the May 3, 2023 (not incl.).
-    load_data(start_date=datetime(2023, 5, 1), end_date=datetime(2023, 5, 3))
-    # load only data that was created during the period between the May 3, 2023 (incl.), and the May 5, 2023 (not incl.).
-    # after that, we load all new data that created after May 5, 2023
+    # load only data that was created during the period between the Jan 1, 2024 (incl.), and the Feb 1, 2024 (not incl.).
+    load_data(start_date=datetime(2024, 1, 1), end_date=datetime(2024, 2, 1))
+    # load only data that was created during the period between the May 3, 2023 (incl.), and the Feb 1, 2024 (not incl.).
+    # after that, we load all new data that created after Feb 1, 2024
     load_incremental_endpoints(
         endpoints=("Event",),
         initial_start_date=datetime(2023, 5, 3),
-        end_date=datetime(2023, 5, 5),
+        end_date=datetime(2024, 2, 1),
     )
     # load Subscription and Event data, calculate metrics, store them in a database
     load_data_and_get_metrics()
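
Note: `incremental_stripe_source` persists `created.last_value` in pipeline state, which is why `initial_start_date` only matters on the first run. A minimal sketch of that incremental pattern, assuming epoch-second `created` values and illustrative sample data:

import dlt

SAMPLE = [
    {"id": 1, "created": 100},
    {"id": 2, "created": 200},
]


@dlt.resource(write_disposition="append", primary_key="id")
def events(
    created=dlt.sources.incremental("created", initial_value=0),
):
    # first run yields everything; later runs restore created.last_value
    # from state and skip items that were already loaded
    yield from (item for item in SAMPLE if item["created"] > created.last_value)


pipeline = dlt.pipeline(pipeline_name="incremental_demo", destination="duckdb")
print(pipeline.run(events()))  # a second invocation loads nothing new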