Merge branch 'm-kovalsky/dlconnection'
m-kovalsky committed Sep 19, 2024
2 parents 3161498 + 9662642 commit 6a84ef7
Showing 3 changed files with 52 additions and 33 deletions.
src/sempy_labs/_sql.py (43 changes: 36 additions & 7 deletions)
@@ -50,14 +50,21 @@ def __init__(

         # Get the TDS endpoint
         client = fabric.FabricRestClient()
-        response = client.get(f"v1/workspaces/{workspace_id}/{endpoint_type}s/{resource_id}")
+        response = client.get(
+            f"v1/workspaces/{workspace_id}/{endpoint_type}s/{resource_id}"
+        )
         if response.status_code != 200:
             raise FabricHTTPException(response)
 
         if endpoint_type == "warehouse":
             tds_endpoint = response.json().get("properties", {}).get("connectionString")
         else:
-            tds_endpoint = response.json().get("properties", {}).get('sqlEndpointProperties', {}).get("connectionString")
+            tds_endpoint = (
+                response.json()
+                .get("properties", {})
+                .get("sqlEndpointProperties", {})
+                .get("connectionString")
+            )
 
         # Set up the connection string
         access_token = SynapseTokenProvider()()
@@ -70,7 +77,9 @@ def __init__
         self.connection = pyodbc.connect(conn_str, attrs_before={1256: tokenstruct})
 
     @log
-    def query(self, sql: Union[str, List[str]]) -> Union[List[pd.DataFrame], pd.DataFrame, None]:
+    def query(
+        self, sql: Union[str, List[str]]
+    ) -> Union[List[pd.DataFrame], pd.DataFrame, None]:
         """
         Runs a SQL or T-SQL query (or multiple queries) against a Fabric Warehouse/Lakehouse.

@@ -127,10 +136,30 @@ def close(self):


 class ConnectWarehouse(ConnectBase):
-    def __init__(self, warehouse: str, workspace: Optional[Union[str, uuid.UUID]] = None, timeout: Optional[int] = None):
-        super().__init__(name=warehouse, workspace=workspace, timeout=timeout, endpoint_type="warehouse")
+    def __init__(
+        self,
+        warehouse: str,
+        workspace: Optional[Union[str, uuid.UUID]] = None,
+        timeout: Optional[int] = None,
+    ):
+        super().__init__(
+            name=warehouse,
+            workspace=workspace,
+            timeout=timeout,
+            endpoint_type="warehouse",
+        )
 
 
 class ConnectLakehouse(ConnectBase):
-    def __init__(self, lakehouse: str, workspace: Optional[Union[str, uuid.UUID]] = None, timeout: Optional[int] = None):
-        super().__init__(name=lakehouse, workspace=workspace, timeout=timeout, endpoint_type="lakehouse")
+    def __init__(
+        self,
+        lakehouse: str,
+        workspace: Optional[Union[str, uuid.UUID]] = None,
+        timeout: Optional[int] = None,
+    ):
+        super().__init__(
+            name=lakehouse,
+            workspace=workspace,
+            timeout=timeout,
+            endpoint_type="lakehouse",
+        )
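
For reference, these are formatting-only changes to the public connection API. A minimal usage sketch of that API as it appears in this diff (the top-level import path and the example warehouse/workspace names are assumptions, not shown in the commit):

# Hypothetical usage of the classes touched above; ConnectWarehouse, query(),
# and close() appear in the diff, everything else here is illustrative.
from sempy_labs import ConnectWarehouse

conn = ConnectWarehouse(warehouse="MyWarehouse", workspace="MyWorkspace", timeout=60)
try:
    # query() accepts a str or a list of str and returns a DataFrame,
    # a list of DataFrames, or None, matching the reflowed signature.
    df = conn.query("SELECT TOP 10 * FROM dbo.MyTable")
finally:
    conn.close()  # close() is named in the '@@ ... def close(self):' hunk header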
src/sempy_labs/_vertipaq.py (4 changes: 2 additions & 2 deletions)
@@ -566,8 +566,8 @@ def vertipaq_analyzer(
             for key, value in vertipaq_map[key_name].items()
         }
     )
-    schema['RunId'] = data_type_long
-    schema['Timestamp'] = data_type_timestamp
+    schema["RunId"] = data_type_long
+    schema["Timestamp"] = data_type_timestamp
 
     delta_table_name = f"VertipaqAnalyzer_{obj}".lower()
     save_as_delta_table(
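For reference, the quote normalization above touches the schema dict that vertipaq_analyzer assembles before saving results to Delta. A rough sketch of that pattern (only the two schema keys and the lowercased table name come from this diff; the type labels and remaining columns are assumptions):

# Illustrative only: the vertipaq_map-derived column and type values are assumed.
schema = {"Column Name": "string"}
schema["RunId"] = "long"            # stands in for data_type_long
schema["Timestamp"] = "timestamp"   # stands in for data_type_timestamp

obj = "Columns"
delta_table_name = f"VertipaqAnalyzer_{obj}".lower()  # "vertipaqanalyzer_columns"
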
Third changed file (38 changes: 14 additions & 24 deletions)
@@ -32,10 +32,6 @@ def update_direct_lake_model_lakehouse_connection(
         The Fabric workspace used by the lakehouse.
         Defaults to None which resolves to the workspace of the attached lakehouse
         or if no lakehouse attached, resolves to the workspace of the notebook.
-
-    Returns
-    -------
-
     """
 
     workspace = fabric.resolve_workspace_name(workspace)
@@ -57,25 +53,19 @@
f"Therefore it cannot be used to support the '{dataset}' semantic model within the '{workspace}' workspace."
)

dfP = fabric.list_partitions(dataset=dataset, workspace=workspace)
dfP_filt = dfP[dfP["Mode"] == "DirectLake"]
shEx = get_shared_expression(lakehouse, lakehouse_workspace)

if len(dfP_filt) == 0:
raise ValueError(
f"{icons.red_dot} The '{dataset}' semantic model is not in Direct Lake. This function is only applicable to Direct Lake semantic models."
)
else:
with connect_semantic_model(
dataset=dataset, readonly=False, workspace=workspace
) as tom:
with connect_semantic_model(
dataset=dataset, readonly=False, workspace=workspace
) as tom:

if not tom.is_direct_lake():
raise ValueError(
f"{icons.red_dot} The '{dataset}' semantic model is not in Direct Lake. This function is only applicable to Direct Lake semantic models."
)

tom.model.Expressions["DatabaseQuery"].Expression = shEx

shEx = get_shared_expression(lakehouse, lakehouse_workspace)
try:
tom.model.Expressions["DatabaseQuery"].Expression = shEx
print(
f"{icons.green_dot} The expression in the '{dataset}' semantic model has been updated to point to the '{lakehouse}' lakehouse in the '{lakehouse_workspace}' workspace."
)
except Exception as e:
raise ValueError(
f"{icons.red_dot} The expression in the '{dataset}' semantic model was not updated."
) from e
print(
f"{icons.green_dot} The expression in the '{dataset}' semantic model has been updated to point to the '{lakehouse}' lakehouse in the '{lakehouse_workspace}' workspace."
)
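
For reference, the rewrite above moves all of the work into the connect_semantic_model context and surfaces a failed expression update as a ValueError instead of checking partition modes up front. A hedged call sketch (the parameter names are visible in the diff; the module path is an assumption about the package layout):

# Hypothetical call; per the new code path it raises ValueError if the
# 'DatabaseQuery' expression cannot be updated, and prints a confirmation
# on success.
from sempy_labs.directlake import update_direct_lake_model_lakehouse_connection

update_direct_lake_model_lakehouse_connection(
    dataset="MySemanticModel",
    lakehouse="MyLakehouse",
    workspace="MyWorkspace",
    lakehouse_workspace="MyLakehouseWorkspace",
)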
