diff --git a/sources/hubspot/README.md b/sources/hubspot/README.md index 357170ef3..17ca44e1d 100644 --- a/sources/hubspot/README.md +++ b/sources/hubspot/README.md @@ -13,6 +13,8 @@ The `dlt` HubSpot verified source allows you to automatically load data from Hub | Tickets | requests for help from customers or users | | Quotes | pricing information of a product | | Web analytics | events | +| Owners | information about account managers or users | +| Pipelines | stages and progress tracking | ## Initialize the pipeline with Hubspot verified source ```bash diff --git a/sources/hubspot/__init__.py b/sources/hubspot/__init__.py index 5e09fc7f1..09b46fbf3 100644 --- a/sources/hubspot/__init__.py +++ b/sources/hubspot/__init__.py @@ -1,5 +1,6 @@ """ -This is a module that provides a DLT source to retrieve data from multiple endpoints of the HubSpot API using a specified API key. The retrieved data is returned as a tuple of Dlt resources, one for each endpoint. +This is a module that provides a DLT source to retrieve data from multiple endpoints of the HubSpot API +using a specified API key. The retrieved data is returned as a tuple of Dlt resources, one for each endpoint. The source retrieves data from the following endpoints: - CRM Companies @@ -8,6 +9,8 @@ - CRM Tickets - CRM Products - CRM Quotes +- CRM Owners +- CRM Pipelines - Web Analytics Events For each endpoint, a resource and transformer function are defined to retrieve data and transform it to a common format. 
@@ -19,9 +22,6 @@ Example: To retrieve data from all endpoints, use the following code: -python - ->>> resources = hubspot(api_key="your_api_key") """ from typing import Any, Dict, Iterator, List, Literal, Sequence @@ -32,17 +32,25 @@ from dlt.common.typing import TDataItems from dlt.sources import DltResource -from .helpers import _get_property_names, fetch_data, fetch_property_history +from .helpers import ( + _get_property_names, + fetch_data, + fetch_property_history, + get_properties_labels, +) from .settings import ( ALL, + ALL_OBJECTS, + ARCHIVED_PARAM, CRM_OBJECT_ENDPOINTS, - DEFAULT_COMPANY_PROPS, - DEFAULT_CONTACT_PROPS, - DEFAULT_DEAL_PROPS, - DEFAULT_PRODUCT_PROPS, - DEFAULT_QUOTE_PROPS, - DEFAULT_TICKET_PROPS, + CRM_PIPELINES_ENDPOINT, + ENTITY_PROPERTIES, + MAX_PROPS_LENGTH, OBJECT_TYPE_PLURAL, + OBJECT_TYPE_SINGULAR, + PIPELINES_OBJECTS, + SOFT_DELETE_KEY, + STAGE_PROPERTY_PREFIX, STARTDATE, WEB_ANALYTICS_EVENTS_ENDPOINT, ) @@ -50,11 +58,290 @@ THubspotObjectType = Literal["company", "contact", "deal", "ticket", "product", "quote"] +def extract_properties_list(props): + """ + Flatten a list of property dictionaries to extract property names. + + Args: + props (List[Dict[str, Any]]): List of property dictionaries. + + Returns: + List[str]: List of property names. + """ + return [prop if isinstance(prop, str) else prop.get("name") for prop in props] + + + +def fetch_data_for_properties(props, api_key, object_type, soft_delete): + """ + Fetch data for a given set of properties from the HubSpot API. + + Args: + props (Sequence[str]): List of property names to fetch. + api_key (str): HubSpot API key for authentication. + object_type (str): The type of HubSpot object (e.g., 'company', 'contact'). + soft_delete (bool): Flag to fetch soft-deleted (archived) records. + + Yields: + Iterator[TDataItems]: Data retrieved from the HubSpot API. 
+ """ + + params = {"properties": props, "limit": 100} + context = {SOFT_DELETE_KEY: False} if soft_delete else None + + yield from fetch_data( + CRM_OBJECT_ENDPOINTS[object_type], api_key, params=params, context=context + ) + if soft_delete: + yield from fetch_data( + CRM_OBJECT_ENDPOINTS[object_type], + api_key, + params={**params, **ARCHIVED_PARAM}, + context={SOFT_DELETE_KEY: True}, + ) + + +def crm_objects( + object_type: str, + api_key: str = dlt.secrets.value, + props: Sequence[str] = None, + include_custom_props: bool = True, + archived: bool = False, +) -> Iterator[TDataItems]: + """ + Fetch CRM object data (e.g., companies, contacts) from the HubSpot API. + + Args: + object_type (str): Type of HubSpot object (e.g., 'company', 'contact'). + api_key (str, optional): API key for HubSpot authentication. + props (Sequence[str], optional): List of properties to retrieve. Defaults to None. + include_custom_props (bool, optional): Include custom properties in the result. Defaults to True. + archived (bool, optional): Fetch archived (soft-deleted) objects. Defaults to False. + + Yields: + Iterator[TDataItems]: Data items retrieved from the API. + """ + props = fetch_props(object_type, api_key, props, include_custom_props) + yield from fetch_data_for_properties(props, api_key, object_type, archived) + + +def crm_object_history( + object_type: THubspotObjectType, + api_key: str = dlt.secrets.value, + include_custom_props: bool = True, +): + """ + Fetch the history of property changes for a given CRM object type. + + Args: + object_type (THubspotObjectType): Type of HubSpot object (e.g., 'company', 'contact'). + api_key (str, optional): API key for HubSpot authentication. + include_custom_props (bool, optional): Include custom properties in the result. Defaults to True. + + Yields: + Iterator[TDataItems]: Historical property data. 
+ """ + + # Fetch the properties from ENTITY_PROPERTIES or default to "All" + props = ENTITY_PROPERTIES.get(object_type, "All") + + # Fetch the properties with the option to include custom properties + props = fetch_props(object_type, api_key, props, include_custom_props) + + # Yield the property history + yield from fetch_property_history( + CRM_OBJECT_ENDPOINTS[object_type], + api_key, + props, + ) + +def resource_template( + entity: THubspotObjectType, + api_key: str = dlt.config.value, + props: Sequence[str] = None, # Add props as an argument + include_custom_props: bool = False, + soft_delete: bool = False, +): + """ + Template function to yield CRM resources for a specific HubSpot entity. + + Args: + entity (THubspotObjectType): Type of HubSpot object (e.g., 'company', 'contact'). + api_key (str, optional): HubSpot API key for authentication. + props (Sequence[str], optional): List of properties to retrieve. Defaults to None. + include_custom_props (bool, optional): Include custom properties in the result. Defaults to False. + soft_delete (bool, optional): Fetch soft-deleted (archived) records. Defaults to False. + + Yields: + Iterator[TDataItems]: CRM object data retrieved from the API. + """ + + # Use provided props or fetch from ENTITY_PROPERTIES if not provided + properties = ENTITY_PROPERTIES.get(entity, props or []) + + # Use these properties to yield the crm_objects + yield from crm_objects( + entity, + api_key, + props=properties, # Pass the properties to the crm_objects function + include_custom_props=include_custom_props, + archived=soft_delete, + ) + + + +def resource_history_template( + entity: THubspotObjectType, + api_key: str = dlt.config.value, + include_custom_props: bool = False, +): + """ + Template function to yield historical CRM resource data for a specific HubSpot entity. + + Args: + entity (THubspotObjectType): Type of HubSpot object (e.g., 'company', 'contact'). + api_key (str, optional): HubSpot API key for authentication. 
+ include_custom_props (bool, optional): Include custom properties in the result. Defaults to False. + + Yields: + Iterator[TDataItems]: Historical data for the CRM object. + """ + yield from crm_object_history( + entity, api_key, include_custom_props=include_custom_props + ) + + +@dlt.resource(name="properties", write_disposition="replace") +def hubspot_properties( + properties_list: List[Dict[str, Any]] = None, + api_key: str = dlt.secrets.value, +) -> DltResource: + """ + A DLT resource that retrieves HubSpot properties for a given list of objects. + + Args: + properties_list (List[Dict[str, Any]], optional): List of properties to retrieve. + api_key (str, optional): HubSpot API key for authentication. + + Yields: + DltResource: A DLT resource containing properties for HubSpot objects. + """ + + def get_properties_description(properties_list): + """Fetch properties.""" + for property_info in properties_list: + yield get_properties_labels( + api_key=api_key, + object_type=property_info["object_type"], + property_name=property_info["property_name"], + ) + + # Ensure properties_list is defined + properties_list = properties_list or [] + yield from get_properties_description(properties_list) + + +def pivot_stages_properties(data, property_prefix=STAGE_PROPERTY_PREFIX, id_prop="id"): + """ + Transform the data by pivoting stage properties. + + Args: + data (List[Dict[str, Any]]): Data containing stage properties. + property_prefix (str, optional): Prefix for stage properties. Defaults to STAGE_PROPERTY_PREFIX. + id_prop (str, optional): Name of the ID property. Defaults to "id". + + Returns: + List[Dict[str, Any]]: Transformed data with pivoted stage properties. 
+ """ + new_data = [] + for record in data: + record_not_null = {k: v for k, v in record.items() if v is not None} + if id_prop not in record_not_null: + continue + id_val = record_not_null.pop(id_prop) + new_data += [ + {id_prop: id_val, property_prefix: v, "stage": k.split(property_prefix)[1]} + for k, v in record_not_null.items() + if k.startswith(property_prefix) + ] + return new_data + + +def stages_timing( + object_type: str, + api_key: str = dlt.config.value, + soft_delete: bool = False, + limit: int = None +) -> Iterator[TDataItems]: + """ + Fetch stage timing data for a specific object type from the HubSpot API. + + Args: + object_type (str): Type of HubSpot object (e.g., 'deal', 'ticket'). + api_key (str, optional): HubSpot API key for authentication. + soft_delete (bool, optional): Fetch soft-deleted (archived) records. Defaults to False. + limit (int, optional): Limit the number of properties to fetch. Defaults to None. + + Yields: + Iterator[TDataItems]: Stage timing data. + """ + all_properties = list(_get_property_names(api_key, object_type)) + date_entered_properties = [ + prop for prop in all_properties if prop.startswith(STAGE_PROPERTY_PREFIX) + ] + props = ",".join(date_entered_properties) + idx = 0 + if limit is None: + limit = len(date_entered_properties) + while idx < limit: + if len(props) - idx < MAX_PROPS_LENGTH: + props_part = ",".join(props[idx: idx + MAX_PROPS_LENGTH].split(",")[:-1]) + else: + props_part = props[idx: idx + MAX_PROPS_LENGTH] + idx += len(props_part) + for data in fetch_data_for_properties( + props_part, api_key, object_type, soft_delete + ): + yield pivot_stages_properties(data) + + +def owners( + api_key: str, # Define api_key as a required argument + soft_delete: bool = False # Add soft_delete as a parameter +) -> Iterator[TDataItems]: + """ + Fetch HubSpot owners data. + + Args: + api_key (str): HubSpot API key for authentication. + soft_delete (bool, optional): Fetch soft-deleted (archived) owners. Defaults to False. 
+ + Yields: + Iterator[TDataItems]: Owner data. + """ + + # Fetch data for owners + for page in fetch_data(endpoint=CRM_OBJECT_ENDPOINTS["owner"], api_key=api_key): + yield page + + # Fetch soft-deleted owners if requested + if soft_delete: + for page in fetch_data( + endpoint=CRM_OBJECT_ENDPOINTS["owner"], + params=ARCHIVED_PARAM, + api_key=api_key, + context={SOFT_DELETE_KEY: True}, + ): + yield page + + @dlt.source(name="hubspot") def hubspot( api_key: str = dlt.secrets.value, include_history: bool = False, + soft_delete: bool = False, include_custom_props: bool = True, + props: Sequence[str] = None, # Add props argument here ) -> Sequence[DltResource]: """ A DLT source that retrieves data from the HubSpot API using the @@ -72,6 +359,10 @@ def hubspot( include_history (Optional[bool]): Whether to load history of property changes along with entities. The history entries are loaded to separate tables. + soft_delete (bool): + Whether to also fetch soft-deleted (archived) records and mark them with `is_deleted`. + include_custom_props (bool): + Whether to include custom properties. Returns: Sequence[DltResource]: Dlt resources, one for each HubSpot API endpoint. @@ -82,115 +373,109 @@ def hubspot( `api_key` argument. """ - @dlt.resource(name="companies", write_disposition="replace") - def companies( - api_key: str = api_key, - include_history: bool = include_history, - props: Sequence[str] = DEFAULT_COMPANY_PROPS, - include_custom_props: bool = include_custom_props, - ) -> Iterator[TDataItems]: - """Hubspot companies resource""" - yield from crm_objects( - "company", - api_key, - include_history=include_history, - props=props, - include_custom_props=include_custom_props, - ) + def hubspot_pipelines_for_objects( + api_key: str = dlt.secrets.value, + ) -> DltResource: + """ + Yields dlt resources that retrieve pipelines and stage timings for each object type in PIPELINES_OBJECTS.
- @dlt.resource(name="contacts", write_disposition="replace") - def contacts( - api_key: str = api_key, - include_history: bool = include_history, - props: Sequence[str] = DEFAULT_CONTACT_PROPS, - include_custom_props: bool = include_custom_props, - ) -> Iterator[TDataItems]: - """Hubspot contacts resource""" - yield from crm_objects( - "contact", - api_key, - include_history, - props, - include_custom_props, - ) + Args: + object_type(List[THubspotObjectType], required): List of the hubspot object types see definition of THubspotObjectType literal. + api_key (str, optional): The API key used to authenticate with the HubSpot API. Defaults to dlt.secrets.value. - @dlt.resource(name="deals", write_disposition="replace") - def deals( - api_key: str = api_key, - include_history: bool = include_history, - props: Sequence[str] = DEFAULT_DEAL_PROPS, - include_custom_props: bool = include_custom_props, - ) -> Iterator[TDataItems]: - """Hubspot deals resource""" - yield from crm_objects( - "deal", - api_key, - include_history, - props, - include_custom_props, - ) + Returns: + Incremental dlt resource to track properties for objects from the list + """ - @dlt.resource(name="tickets", write_disposition="replace") - def tickets( - api_key: str = api_key, - include_history: bool = include_history, - props: Sequence[str] = DEFAULT_TICKET_PROPS, - include_custom_props: bool = include_custom_props, - ) -> Iterator[TDataItems]: - """Hubspot tickets resource""" - yield from crm_objects( - "ticket", - api_key, - include_history, - props, - include_custom_props, - ) + def get_pipelines(object_type: THubspotObjectType): + yield from fetch_data( + CRM_PIPELINES_ENDPOINT.format(objectType=object_type), + api_key=api_key, + ) - @dlt.resource(name="products", write_disposition="replace") - def products( - api_key: str = api_key, - include_history: bool = include_history, - props: Sequence[str] = DEFAULT_PRODUCT_PROPS, - include_custom_props: bool = include_custom_props, - ) -> 
Iterator[TDataItems]: - """Hubspot products resource""" - yield from crm_objects( - "product", - api_key, - include_history, - props, - include_custom_props, + for obj_type in PIPELINES_OBJECTS: + name = f"pipelines_{obj_type}" + yield dlt.resource( + get_pipelines, + name=name, + write_disposition="merge", + merge_key="id", + table_name=name, + primary_key="id", + )(obj_type) + + name = f"stages_timing_{obj_type}" + if obj_type in OBJECT_TYPE_SINGULAR: + yield dlt.resource( + stages_timing, + name=name, + write_disposition="merge", + primary_key=["id", "stage"], + )(OBJECT_TYPE_SINGULAR[obj_type], soft_delete=soft_delete) + + yield dlt.resource( + owners, + name="owners", + write_disposition="merge", + primary_key="id", + )( + api_key=api_key, # Pass the API key here + soft_delete=soft_delete # Pass the soft_delete flag here + ) + + for obj in ALL_OBJECTS: + yield dlt.resource( + resource_template, + name=OBJECT_TYPE_PLURAL[obj], + write_disposition="merge", + primary_key="id", + )( + entity=obj, + props=props, # Pass the props argument here + include_custom_props=include_custom_props, + soft_delete=soft_delete, ) - @dlt.resource(name="quotes", write_disposition="replace") - def quotes( - api_key: str = api_key, - include_history: bool = include_history, - props: Sequence[str] = DEFAULT_QUOTE_PROPS, - include_custom_props: bool = include_custom_props, - ) -> Iterator[TDataItems]: - """Hubspot quotes resource""" - yield from crm_objects( - "quote", - api_key, - include_history, - props, - include_custom_props, - ) + if include_history: + for obj in ALL_OBJECTS: + yield dlt.resource( + resource_history_template, + name=f"{OBJECT_TYPE_PLURAL[obj]}_property_history", + write_disposition="merge", + primary_key="object_id", + )(entity=obj, include_custom_props=include_custom_props) - return companies, contacts, deals, tickets, products, quotes + yield from hubspot_pipelines_for_objects(api_key) + yield hubspot_properties -def crm_objects( +def fetch_props( object_type: 
str, - api_key: str = dlt.secrets.value, - include_history: bool = False, + api_key: str, props: Sequence[str] = None, include_custom_props: bool = True, -) -> Iterator[TDataItems]: - """Building blocks for CRM resources.""" +) -> str: + """ + Fetch the list of properties for a HubSpot object type. + + Args: + object_type (str): Type of HubSpot object (e.g., 'company', 'contact'). + api_key (str): HubSpot API key for authentication. + props (Sequence[str], optional): List of properties to fetch. Defaults to None. + include_custom_props (bool, optional): Include custom properties in the result. Defaults to True. + + Returns: + str: Comma-separated list of properties. + """ if props == ALL: + # Fetch all property names props = list(_get_property_names(api_key, object_type)) + elif isinstance(props, str): + # If props are passed as a single string, convert it to a list + props = [props] + else: + # Ensure it's a list of strings, if not already + props = extract_properties_list(props) if include_custom_props: all_props = _get_property_names(api_key, object_type) @@ -199,30 +484,16 @@ def crm_objects( props = ",".join(sorted(list(set(props)))) - if len(props) > 2000: + if len(props) > MAX_PROPS_LENGTH: raise ValueError( "Your request to Hubspot is too long to process. " - "Maximum allowed query length is 2000 symbols, while " + f"Maximum allowed query length is {MAX_PROPS_LENGTH} symbols, while " f"your list of properties `{props[:200]}`... is {len(props)} " "symbols long. Use the `props` argument of the resource to " "set the list of properties to extract from the endpoint." 
) + return props - params = {"properties": props, "limit": 100} - - yield from fetch_data(CRM_OBJECT_ENDPOINTS[object_type], api_key, params=params) - if include_history: - # Get history separately, as requesting both all properties and history together - # is likely to hit hubspot's URL length limit - for history_entries in fetch_property_history( - CRM_OBJECT_ENDPOINTS[object_type], - api_key, - props, - ): - yield dlt.mark.with_table_name( - history_entries, - OBJECT_TYPE_PLURAL[object_type] + "_property_history", - ) @dlt.resource @@ -278,4 +549,4 @@ def get_web_analytics_events( write_disposition="append", selected=True, table_name=lambda e: name + "_" + str(e["eventType"]), - )(dlt.sources.incremental("occurredAt", initial_value=start_date.isoformat())) + )(dlt.sources.incremental("occurredAt", initial_value=start_date.isoformat())) \ No newline at end of file diff --git a/sources/hubspot/helpers.py b/sources/hubspot/helpers.py index 3ab42f70e..2c2c5d415 100644 --- a/sources/hubspot/helpers.py +++ b/sources/hubspot/helpers.py @@ -31,6 +31,18 @@ def _get_headers(api_key: str) -> Dict[str, str]: return dict(authorization=f"Bearer {api_key}") +def pagination(_data: Dict[str, Any], headers: Dict[str, Any]): + _next = _data.get("paging", {}).get("next", None) + # _next = False + if _next: + next_url = _next["link"] + # Get the next page response + r = requests.get(next_url, headers=headers) + return r.json() + else: + return None + + def extract_property_history(objects: List[Dict[str, Any]]) -> Iterator[Dict[str, Any]]: for item in objects: history = item.get("propertiesWithHistory") @@ -90,7 +102,10 @@ def fetch_property_history( def fetch_data( - endpoint: str, api_key: str, params: Optional[Dict[str, Any]] = None + endpoint: str, + api_key: str, + params: Optional[Dict[str, Any]] = None, + context: Optional[Dict[str, Any]] = None, ) -> Iterator[List[Dict[str, Any]]]: """ Fetch data from HUBSPOT endpoint using a specified API key and yield the properties of 
each result. @@ -99,7 +114,8 @@ def fetch_data( Args: endpoint (str): The endpoint to fetch data from, as a string. api_key (str): The API key to use for authentication, as a string. - params: Optional dict of query params to include in the request + params: Optional dict of query params to include in the request. + context (Optional[Dict[str, Any]]): Additional data which need to be added in the resulting page. Yields: A List of CRM object dicts @@ -132,6 +148,8 @@ def fetch_data( if "results" in _data: _objects: List[Dict[str, Any]] = [] for _result in _data["results"]: + # if _result["properties"]["hs_merged_object_ids"] is not None: + # _result["properties"]["hs_merged_object_ids"] = _result["properties"]["hs_merged_object_ids"].split(";") _obj = _result.get("properties", _result) if "id" not in _obj and "id" in _result: # Move id from properties to top level @@ -152,18 +170,13 @@ def fetch_data( ] _obj[association] = __values + if context: + _obj.update(context) _objects.append(_obj) yield _objects # Follow pagination links if they exist - _next = _data.get("paging", {}).get("next", None) - if _next: - next_url = _next["link"] - # Get the next page response - r = requests.get(next_url, headers=headers) - _data = r.json() - else: - _data = None + _data = pagination(_data, headers) def _get_property_names(api_key: str, object_type: str) -> List[str]: @@ -186,3 +199,14 @@ def _get_property_names(api_key: str, object_type: str) -> List[str]: properties.extend([prop["name"] for prop in page]) return properties + + +def get_properties_labels(api_key: str, object_type: str, property_name: str): + endpoint = f"/crm/v3/properties/{object_type}/{property_name}" + url = get_url(endpoint) + headers = _get_headers(api_key) + r = requests.get(url, headers=headers) + _data = r.json() + while _data is not None: + yield _data + _data = pagination(_data, headers) \ No newline at end of file diff --git a/sources/hubspot/settings.py b/sources/hubspot/settings.py index 
05fe4d9d0..e5ab05588 100644 --- a/sources/hubspot/settings.py +++ b/sources/hubspot/settings.py @@ -1,8 +1,7 @@ """Hubspot source settings and constants""" - from dlt.common import pendulum -STARTDATE = pendulum.datetime(year=2000, month=1, day=1) +STARTDATE = pendulum.datetime(year=2024, month=2, day=10) CRM_CONTACTS_ENDPOINT = ( "/crm/v3/objects/contacts?associations=deals,products,tickets,quotes" @@ -14,6 +13,9 @@ CRM_PRODUCTS_ENDPOINT = "/crm/v3/objects/products" CRM_TICKETS_ENDPOINT = "/crm/v3/objects/tickets" CRM_QUOTES_ENDPOINT = "/crm/v3/objects/quotes" +CRM_OWNERS_ENDPOINT = "/crm/v3/owners/" +CRM_PROPERTIES_ENDPOINT = "/crm/v3/properties/{objectType}/{property_name}" +CRM_PIPELINES_ENDPOINT = "/crm/v3/pipelines/{objectType}" CRM_OBJECT_ENDPOINTS = { "contact": CRM_CONTACTS_ENDPOINT, @@ -22,6 +24,7 @@ "product": CRM_PRODUCTS_ENDPOINT, "ticket": CRM_TICKETS_ENDPOINT, "quote": CRM_QUOTES_ENDPOINT, + "owner": CRM_OWNERS_ENDPOINT, } WEB_ANALYTICS_EVENTS_ENDPOINT = "/events/v3/events?objectType={objectType}&objectId={objectId}&occurredAfter={occurredAfter}&occurredBefore={occurredBefore}&sort=-occurredAt" @@ -36,17 +39,8 @@ } OBJECT_TYPE_PLURAL = {v: k for k, v in OBJECT_TYPE_SINGULAR.items()} +ALL_OBJECTS = OBJECT_TYPE_PLURAL.keys() -DEFAULT_DEAL_PROPS = [ - "amount", - "closedate", - "createdate", - "dealname", - "dealstage", - "hs_lastmodifieddate", - "hs_object_id", - "pipeline", -] DEFAULT_COMPANY_PROPS = [ "createdate", @@ -65,6 +59,17 @@ "lastname", ] +DEFAULT_DEAL_PROPS = [ + #"amount", + #"closedate", + #"createdate", + "dealname", + "dealstage", + #"hs_lastmodifieddate", + #"hs_object_id", + #"pipeline", +] + DEFAULT_TICKET_PROPS = [ "createdate", "content", @@ -96,4 +101,22 @@ "hs_title", ] -ALL = ("ALL",) +ENTITY_PROPERTIES = { + "company": DEFAULT_COMPANY_PROPS, + "contact": DEFAULT_CONTACT_PROPS, + "deal": DEFAULT_DEAL_PROPS, + "ticket": DEFAULT_TICKET_PROPS, + "product": DEFAULT_PRODUCT_PROPS, + "quote": DEFAULT_QUOTE_PROPS, +} + + +# 'ALL' 
represents a list of all available properties for all types +ALL = [{"properties": "All"}] + +PIPELINES_OBJECTS = ["deals"] +SOFT_DELETE_KEY = "is_deleted" +ARCHIVED_PARAM = {"archived": True} +PREPROCESSING = {"split": ["hs_merged_object_ids"]} +STAGE_PROPERTY_PREFIX = "hs_date_entered_" +MAX_PROPS_LENGTH = 2000 \ No newline at end of file diff --git a/sources/hubspot/utils.py b/sources/hubspot/utils.py new file mode 100644 index 000000000..598afbfcf --- /dev/null +++ b/sources/hubspot/utils.py @@ -0,0 +1,7 @@ +from .settings import PREPROCESSING + +def split_data(doc): + for prop in PREPROCESSING["split"]: + if (prop in doc) and (doc[prop] is not None): + doc[prop] = doc[prop].split(";") + return doc \ No newline at end of file diff --git a/sources/hubspot_pipeline.py b/sources/hubspot_pipeline.py index 27d9f7352..ba16d027b 100644 --- a/sources/hubspot_pipeline.py +++ b/sources/hubspot_pipeline.py @@ -1,4 +1,5 @@ from typing import List + import dlt from hubspot import hubspot, hubspot_events_for_objects, THubspotObjectType @@ -13,11 +14,11 @@ def load_crm_data() -> None: """ # Create a DLT pipeline object with the pipeline name, dataset name, and destination database type - # Add dev_mode=(True or False) if you need your pipeline to create the dataset in your destination + # Add full_refresh=(True or False) if you need your pipeline to create the dataset in your destination p = dlt.pipeline( pipeline_name="hubspot", dataset_name="hubspot_dataset", - destination="duckdb", + destination="bigquery", ) # Run the pipeline with the HubSpot source connector @@ -26,7 +27,6 @@ def load_crm_data() -> None: # Print information about the pipeline run print(info) - def load_crm_data_with_history() -> None: """ Loads all HubSpot CRM resources and property change history for each entity. 
@@ -37,11 +37,11 @@ def load_crm_data_with_history() -> None: """ # Create a DLT pipeline object with the pipeline name, dataset name, and destination database type - # Add dev_mode=(True or False) if you need your pipeline to create the dataset in your destination + # Add full_refresh=(True or False) if you need your pipeline to create the dataset in your destination p = dlt.pipeline( pipeline_name="hubspot", dataset_name="hubspot_dataset", - destination="duckdb", + destination="bigquery", ) # Configure the source with `include_history` to enable property history load, history is disabled by default @@ -54,6 +54,33 @@ def load_crm_data_with_history() -> None: print(info) +def load_crm_data_with_soft_delete() -> None: + """ + Loads all HubSpot CRM resources, including soft-deleted (archived) records for each entity. + By default, only the current state of the records is loaded; property change history is not included unless explicitly enabled. + + Soft-deleted records are retrieved and marked appropriately, allowing both active and archived data to be processed. + """ + + # Create a DLT pipeline object with the pipeline name, dataset name, and destination database type. + # You can add `full_refresh=True` if the pipeline should recreate the dataset at the destination. + p = dlt.pipeline( + pipeline_name="hubspot", + dataset_name="hubspot_dataset", + destination="bigquery", + ) + + # Configure the source to load soft-deleted (archived) records. + # Property change history is disabled by default unless configured separately. + data = hubspot(soft_delete=True) + + # Run the pipeline with the HubSpot source connector. + info = p.run(data) + + # Print information about the pipeline run. + print(info) + + def load_crm_objects_with_custom_properties() -> None: """ Loads CRM objects, reading only properties defined by the user. 
@@ -61,31 +88,39 @@ def load_crm_objects_with_custom_properties() -> None: # Create a DLT pipeline object with the pipeline name, # dataset name, properties to read and destination database - # type Add dev_mode=(True or False) if you need your + # type Add full_refresh=(True or False) if you need your # pipeline to create the dataset in your destination - p = dlt.pipeline( + pipeline = dlt.pipeline( pipeline_name="hubspot", dataset_name="hubspot_dataset", - destination="duckdb", + destination="bigquery", ) - source = hubspot() - - # By default, all the custom properties of a CRM object are extracted, - # ignoring those driven by Hubspot (prefixed with `hs_`). - - # To read fields in addition to the custom ones: - # source.contacts.bind(props=["date_of_birth", "degree"]) + load_data = hubspot() + #load_data.contacts.bind(props=["date_of_birth", "degree"], include_custom_props=True) + load_info = pipeline.run(load_data) + print(load_info) - # To read only two particular fields: - source.contacts.bind(props=["date_of_birth", "degree"], include_custom_props=False) - # Run the pipeline with the HubSpot source connector - info = p.run(source) +def load_pipelines() -> None: + """ + This function loads the HubSpot deals pipelines and their stage timing data (the `pipelines_deals` and `stages_timing_deals` resources) - # Print information about the pipeline run - print(info) + Returns: + None + """ + # Create a DLT pipeline object with the pipeline name, dataset name, and destination database type + p = dlt.pipeline( + pipeline_name="hubspot", + dataset_name="hubspot_dataset", + destination="bigquery", + dev_mode=False, + ) + # To load data from pipelines in "deals" endpoint + load_data = hubspot().with_resources("pipelines_deals", "stages_timing_deals") + load_info = p.run(load_data) + print(load_info) def load_web_analytics_events( object_type: THubspotObjectType, object_ids: List[str] @@ -101,7 +136,7 @@ def load_web_analytics_events( p = dlt.pipeline( pipeline_name="hubspot",
dataset_name="hubspot_dataset", - destination="duckdb", + destination="bigquery", dev_mode=False, ) @@ -115,8 +150,9 @@ def load_web_analytics_events( if __name__ == "__main__": - # Call the functions to load HubSpot data into the database with and without company events enabled load_crm_data() load_crm_data_with_history() - load_web_analytics_events("company", ["7086461639", "7086464459"]) load_crm_objects_with_custom_properties() + load_pipelines() + load_crm_data_with_soft_delete() + load_web_analytics_events("company", ["7086461639", "7086464459"]) \ No newline at end of file