Commit cb5cd69

Updated as per comments
dat-a-man committed Mar 3, 2024
1 parent 98797bb commit cb5cd69
Showing 3 changed files with 29 additions and 42 deletions.
3 changes: 0 additions & 3 deletions sources/freshdesk/__init__.py
@@ -13,7 +13,6 @@
 @dlt.source()
 def freshdesk_source(
     endpoints: Optional[List[str]] = None,
-    page: int = 1,
     per_page: int = 100,
     domain: str = dlt.config.value,
     api_secret_key: str = dlt.secrets.value,
@@ -26,7 +25,6 @@ def freshdesk_source(
     Args:
         endpoints: A list of Freshdesk API endpoints to fetch. Defaults to 'settings.py'.
-        page: The starting page number for API pagination.
         per_page: The number of items to fetch per page, with a maximum of 100.
         domain: The Freshdesk domain from which to fetch the data. Defaults to 'config.toml'.
         api_secret_key: Freshdesk API key. Defaults to 'secrets.toml'.
@@ -61,7 +59,6 @@ def incremental_resource(
         # Use the FreshdeskClient instance to fetch paginated responses
         yield from freshdesk.paginated_response(
             endpoint=endpoint,
-            page=page,
             per_page=per_page,
             updated_at=updated_at,
         )
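With the `page` parameter removed from `freshdesk_source`, callers now only choose a page size; pagination always starts from page 1 inside the client. A minimal usage sketch of the updated signature, assuming `domain` and `api_secret_key` are set in `config.toml`/`secrets.toml` (the endpoint list here is illustrative):

import dlt

from freshdesk import freshdesk_source

# Illustrative local pipeline; mirrors the examples in sources/freshdesk_pipeline.py.
pipeline = dlt.pipeline(
    pipeline_name="freshdesk_pipeline",
    destination="duckdb",
    dataset_name="freshdesk_data",
)

# No `page` argument any more; only the page size is configurable.
load_info = pipeline.run(freshdesk_source(endpoints=["tickets"], per_page=100))
print(load_info)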
64 changes: 27 additions & 37 deletions sources/freshdesk/freshdesk_client.py
@@ -1,10 +1,7 @@
"""Freshdesk source helpers"""
"""Freshdesk Client for making authenticated requests"""

import logging
import time
from typing import Any, Iterable, Optional

import pendulum
from dlt.common.typing import TDataItem
from dlt.sources.helpers import requests

@@ -40,70 +37,63 @@ def _request_with_rate_limit(self, url: str, **kwargs: Any) -> requests.Response
         Handles rate limits in HTTP requests and ensures
         that the client doesn't exceed the limit set by the server.
         """
+        import logging
+        import time
+
+        import requests
+
         while True:
             try:
                 response = requests.get(url, **kwargs, auth=(self.api_key, "X"))
                 response.raise_for_status()
 
                 return response
             except requests.HTTPError as e:
                 if e.response.status_code == 429:
-                    logging.warning("Rate limited. Waiting to retry...")
-                    seconds_to_wait = (
-                        int(e.response.headers["X-Rate-Limit-Reset"])
-                        - pendulum.now().timestamp()
+                    # Get the 'Retry-After' header to know how long to wait;
+                    # fall back to 60 seconds if the header is missing
+                    seconds_to_wait = int(
+                        e.response.headers.get("Retry-After", 60)
                     )
+                    # Log a warning message
+                    logging.warning(
+                        f"Rate limited. Waiting to retry after: {seconds_to_wait} secs"
+                    )
+
+                    # Wait for the specified number of seconds before retrying
                     time.sleep(seconds_to_wait)
                 else:
+                    # If the error is not a rate limit (429), raise the exception
+                    # to be handled elsewhere or stop execution
                     raise

     def paginated_response(
         self,
         endpoint: str,
-        page: int = 1,
-        per_page: int = 100,
-        updated_at: Optional[Any] = None,
+        per_page: int,
+        updated_at: Optional[str] = None,
     ) -> Iterable[TDataItem]:
         """
         Retrieves data from an endpoint with pagination.
 
         Args:
             endpoint (str): The endpoint to retrieve data from (e.g., 'tickets', 'contacts').
-            page (int): The starting page number for the API request.
             per_page (int): The number of items requested per page.
             updated_at (Optional[str]): An optional 'updated_at' timestamp to limit the data retrieved.
                 Defaults to None.
 
         Yields:
             Iterable[TDataItem]: Data items retrieved from the endpoint.
         """
+        page = 1
         while True:
             # Construct the URL for the specific endpoint
             url = f"{self.base_url}/{endpoint}"
 
             params = {"per_page": per_page, "page": page}
 
-            # Adjust parameters based on the endpoint
+            # Implement date range splitting logic here, if applicable
             if endpoint in ["tickets", "contacts"]:
                 param_key = (
                     "updated_since" if endpoint == "tickets" else "_updated_since"
                 )
                 if updated_at:
                     params[param_key] = updated_at
 
-            # To handle requests, use the method provided by the class,
-            # which includes rate-limiting.
+            # Handle requests with rate-limiting
+            # A maximum of 300 pages (30000 tickets) will be returned.
             response = self._request_with_rate_limit(url, params=params)
 
             data = response.json()
 
             if not data:
-                break
-
+                break  # Stop if no data or max page limit reached
             yield data
 
-            page += 1  # Prepare to fetch the next page
+            # Assuming the API does not return a full page of data if it's the last page
+            if len(data) < per_page:
+                break  # Last page reached
+
+            page += 1
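Taken together, the new client retries on HTTP 429 using the standard Retry-After header, which is a relative wait in seconds, instead of subtracting the current timestamp from X-Rate-Limit-Reset (a calculation that is sensitive to clock skew and needed pendulum). Below is a condensed, self-contained sketch of that retry-plus-pagination pattern; the function name and URL handling are illustrative, not the source's actual API:

import logging
import time
from typing import Any, Dict, Iterator, List

import requests

def fetch_paginated(url: str, auth: Any, per_page: int = 100) -> Iterator[List[Dict[str, Any]]]:
    """Yield one page of results at a time, retrying politely on HTTP 429."""
    page = 1
    while True:
        response = requests.get(url, params={"per_page": per_page, "page": page}, auth=auth)
        if response.status_code == 429:
            # Honor the server's Retry-After header; fall back to 60 seconds if absent.
            seconds_to_wait = int(response.headers.get("Retry-After", 60))
            logging.warning("Rate limited. Waiting to retry after: %s secs", seconds_to_wait)
            time.sleep(seconds_to_wait)
            continue
        response.raise_for_status()
        data = response.json()
        if not data:
            break
        yield data
        if len(data) < per_page:
            break  # A short page signals the last page
        page += 1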
4 changes: 2 additions & 2 deletions sources/freshdesk_pipeline.py
@@ -11,7 +11,7 @@ def load_endpoints() -> None:

     pipeline = dlt.pipeline(
         pipeline_name="freshdesk_pipeline",
-        destination="bigquery",
+        destination="duckdb",
         dataset_name="freshdesk_data",
     )

@@ -30,7 +30,7 @@ def load_selected_endpoints() -> None:

     pipeline = dlt.pipeline(
         pipeline_name="freshdesk_pipeline",
-        destination="bigquery",
+        destination="duckdb",
         dataset_name="freshdesk_data",
     )

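With both example pipelines pointed at duckdb instead of bigquery, the script runs locally with no cloud credentials. A hedged sketch of verifying a run afterwards; it assumes dlt's default database file name of <pipeline_name>.duckdb in the working directory, and the tickets table is illustrative:

# First run the examples: python sources/freshdesk_pipeline.py
import duckdb

# Inspect the loaded data; the dataset name becomes a schema in DuckDB.
conn = duckdb.connect("freshdesk_pipeline.duckdb")
print(conn.sql("SELECT COUNT(*) FROM freshdesk_data.tickets"))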
