Commit
Merge branch 'm-kovalsky/connectlakehouse'
m-kovalsky committed Sep 18, 2024
2 parents d26133c + 3b3184f commit a4a9f22
Showing 7 changed files with 55 additions and 32 deletions.
1 change: 1 addition & 0 deletions notebooks/SQL.ipynb
@@ -0,0 +1 @@
{"cells":[{"cell_type":"markdown","id":"5c27dfd1-4fe0-4a97-92e6-ddf78889aa93","metadata":{"nteract":{"transient":{"deleting":false}}},"source":["### Install the latest .whl package\n","\n","Check [here](https://pypi.org/project/semantic-link-labs/) to see the latest version."]},{"cell_type":"code","execution_count":null,"id":"d5cae9db-cef9-48a8-a351-9c5fcc99645c","metadata":{"jupyter":{"outputs_hidden":true,"source_hidden":false},"nteract":{"transient":{"deleting":false}}},"outputs":[],"source":["%pip install semantic-link-labs"]},{"cell_type":"markdown","id":"b195eae8","metadata":{},"source":["### Import the library and necessary packages"]},{"cell_type":"code","execution_count":null,"id":"1344e286","metadata":{},"outputs":[],"source":["import sempy_labs as labs\n","from sempy_labs import ConnectWarehouse\n","from sempy_labs import ConnectLakehouse\n","\n","lakehouse_name = ''\n","lakehouse_workspace_name = ''\n","warehouse_name = ''\n","warehouse_workspace_name = ''"]},{"cell_type":"markdown","id":"55e5ca67","metadata":{},"source":["### Run a SQL query (or queries) against a Fabric warehouse"]},{"cell_type":"code","execution_count":null,"id":"a9f984e9","metadata":{},"outputs":[],"source":["with ConnectWarehouse(warehouse=warehouse_name, workspace=warehouse_workspace_name) as sql:\n"," df = sql.query(\"SELECT * FROM Product\")\n"," display(df)"]},{"cell_type":"code","execution_count":null,"id":"865ac4a1","metadata":{},"outputs":[],"source":["with ConnectWarehouse(warehouse=warehouse_name, workspace=warehouse_workspace_name) as sql:\n"," dfs = sql.query([\"SELECT * FROM Product\", \"SELECT DISTINCT [Category] FROM Product\"])\n","\n","for df in dfs:\n"," display(df)"]},{"cell_type":"markdown","id":"634700c3","metadata":{},"source":["### Run a T-SQL query (or queries) against a Fabric warehouse"]},{"cell_type":"code","execution_count":null,"id":"5dbf34f3","metadata":{},"outputs":[],"source":["with ConnectWarehouse(warehouse=warehouse_name, workspace=warehouse_workspace_name) as sql:\n"," sql.query(\"CREATE SCHEMA [Business]\")"]},{"cell_type":"code","execution_count":null,"id":"ec8ddb81","metadata":{},"outputs":[],"source":["with ConnectWarehouse(warehouse=warehouse_name, workspace=warehouse_workspace_name) as sql:\n"," sql.query([\"CREATE SCHEMA [Business]\", \"CREATE SCHEMA [Marketing]\"])"]},{"cell_type":"markdown","id":"d5b090da","metadata":{},"source":["### Run a SQL query (or queries) against a Fabric lakehouse"]},{"cell_type":"code","execution_count":null,"id":"4dca7f4a","metadata":{},"outputs":[],"source":["with ConnectLakehouse(lakehouse=lakehouse_name, workspace=lakehouse_workspace_name) as sql:\n"," df = sql.query(\"SELECT * FROM Product\")\n"," display(df)"]},{"cell_type":"code","execution_count":null,"id":"b9606ae8","metadata":{},"outputs":[],"source":["with ConnectLakehouse(lakehouse=lakehouse_name, workspace=lakehouse_workspace_name) as sql:\n"," dfs = sql.query([\"SELECT * FROM Product\", \"SELECT DISTINCT [Category] FROM Product\"])\n","\n","for df in dfs:\n"," display(df)"]}],"metadata":{"kernel_info":{"name":"synapse_pyspark"},"kernelspec":{"display_name":"Synapse PySpark","language":"Python","name":"synapse_pyspark"},"language_info":{"name":"python"},"microsoft":{"language":"python"},"nteract":{"version":"[email protected]"},"spark_compute":{"compute_id":"/trident/default"},"synapse_widget":{"state":{},"version":"0.1"},"widgets":{}},"nbformat":4,"nbformat_minor":5}
1 change: 0 additions & 1 deletion notebooks/Warehouse.ipynb

This file was deleted.

2 changes: 2 additions & 0 deletions src/sempy_labs/__init__.py
@@ -28,6 +28,7 @@
)
from sempy_labs._sql import (
ConnectWarehouse,
ConnectLakehouse,
)
from sempy_labs._capacities import (
check_fabric_capacity_name_availablility,
@@ -175,6 +176,7 @@
__all__ = [
"resolve_warehouse_id",
"ConnectWarehouse",
"ConnectLakehouse",
"update_semantic_model_from_bim",
"list_connections",
"get_semantic_model_size",
23 changes: 13 additions & 10 deletions src/sempy_labs/_connections.py
@@ -77,8 +77,9 @@ def list_connections() -> pd.DataFrame:
return df


def list_item_connections(item_name: str, item_type: str, workspace: Optional[str] = None) -> pd.DataFrame:

def list_item_connections(
item_name: str, item_type: str, workspace: Optional[str] = None
) -> pd.DataFrame:
"""
Shows the list of connections that the specified item is connected to.
@@ -104,7 +105,9 @@ def list_item_connections(item_name: str, item_type: str, workspace: Optional[str] = None) -> pd.DataFrame:
workspace = fabric.resolve_workspace_name(workspace)
workspace_id = fabric.resolve_workspace_id(workspace)
item_type = item_type[0].upper() + item_type[1:]
item_id = fabric.resolve_item_id(item_name=item_name, type=item_type, workspace=workspace)
item_id = fabric.resolve_item_id(
item_name=item_name, type=item_type, workspace=workspace
)

client = fabric.FabricRestClient()
response = client.post(f"/v1/workspaces/{workspace_id}/items/{item_id}/connections")
@@ -126,14 +129,14 @@ def list_item_connections(item_name: str, item_type: str, workspace: Optional[str] = None) -> pd.DataFrame:
responses = pagination(client, response)

for r in responses:
for v in r.get('value', []):
for v in r.get("value", []):
new_data = {
"Connection Name": v.get('displayName'),
"Connection Id": v.get('id'),
"Connectivity Type": v.get('connectivityType'),
"Connection Type": v.get('connectionDetails', {}).get('type'),
"Connection Path": v.get('connectionDetails', {}).get('path'),
"Gateway Id": v.get('gatewayId'),
"Connection Name": v.get("displayName"),
"Connection Id": v.get("id"),
"Connectivity Type": v.get("connectivityType"),
"Connection Type": v.get("connectionDetails", {}).get("type"),
"Connection Path": v.get("connectionDetails", {}).get("path"),
"Gateway Id": v.get("gatewayId"),
}

df = pd.concat([df, pd.DataFrame(new_data, index=[0])], ignore_index=True)
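A brief usage sketch of the reformatted function (the item and workspace names here are hypothetical placeholders):

from sempy_labs._connections import list_item_connections

# Returns a pandas DataFrame with one row per connection, carrying the
# "Connection Name", "Connection Id", "Connectivity Type", "Connection Type",
# "Connection Path", and "Gateway Id" columns assembled above.
df = list_item_connections(
    item_name="Sales Model", item_type="SemanticModel", workspace="My Workspace"
)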
2 changes: 1 addition & 1 deletion src/sempy_labs/_model_bpa.py
@@ -506,7 +506,7 @@ def translate_using_spark(rule_file):
content_html += f'<td>{row["Object Type"]}</td>'
content_html += f'<td>{row["Object Name"]}</td>'
content_html += f'<td style="text-align: center;">{row["Severity"]}</td>'
#content_html += f'<td>{row["Severity"]}</td>'
# content_html += f'<td>{row["Severity"]}</td>'
content_html += "</tr>"
content_html += "</table>"

6 changes: 4 additions & 2 deletions src/sempy_labs/_query_scale_out.py
@@ -413,10 +413,12 @@ def set_workspace_default_storage_format(
dfW = fabric.list_workspaces(filter=f"name eq '{workspace}'")
if len(dfW) == 0:
raise ValueError()
current_storage_format = dfW['Default Dataset Storage Format'].iloc[0]
current_storage_format = dfW["Default Dataset Storage Format"].iloc[0]

if current_storage_format == storage_format:
print(f"{icons.info} The '{workspace}' is already set to a default storage format of '{current_storage_format}'.")
print(
f"{icons.info} The '{workspace}' is already set to a default storage format of '{current_storage_format}'."
)
return

request_body = {
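A minimal usage sketch, assuming the function takes the storage_format and workspace parameters its body references ("Large" and "Small" are the two semantic model storage formats; the workspace name is a placeholder):

from sempy_labs._query_scale_out import set_workspace_default_storage_format

# Prints an informational message and returns early when the workspace already
# uses the requested default; otherwise proceeds to build the update request.
set_workspace_default_storage_format(storage_format="Large", workspace="My Workspace")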
52 changes: 34 additions & 18 deletions src/sempy_labs/_sql.py
@@ -6,7 +6,7 @@
import uuid
from itertools import chain, repeat
from sempy.fabric.exceptions import FabricHTTPException
from sempy_labs._helper_functions import resolve_warehouse_id
from sempy_labs._helper_functions import resolve_warehouse_id, resolve_lakehouse_id


def bytes2mswin_bstr(value: bytes) -> bytes:
Expand All @@ -28,42 +28,51 @@ def bytes2mswin_bstr(value: bytes) -> bytes:
return struct.pack("<i", len(encoded_bytes)) + encoded_bytes


class ConnectWarehouse:
class ConnectBase:
def __init__(
self,
warehouse: str,
name: str,
workspace: Optional[Union[str, uuid.UUID]] = None,
timeout: Optional[int] = None,
endpoint_type: str = "warehouse",
):
from sempy.fabric._token_provider import SynapseTokenProvider
import pyodbc

workspace = fabric.resolve_workspace_name(workspace)
workspace_id = fabric.resolve_workspace_id(workspace)
warehouse_id = resolve_warehouse_id(warehouse=warehouse, workspace=workspace)

# get the TDS endpoint
# Resolve the appropriate ID (warehouse or lakehouse)
if endpoint_type == "warehouse":
resource_id = resolve_warehouse_id(warehouse=name, workspace=workspace)
else:
resource_id = resolve_lakehouse_id(lakehouse=name, workspace=workspace)

# Get the TDS endpoint
client = fabric.FabricRestClient()
response = client.get(f"v1/workspaces/{workspace_id}/warehouses/{warehouse_id}")
response = client.get(f"v1/workspaces/{workspace_id}/{endpoint_type}s/{resource_id}")
if response.status_code != 200:
raise FabricHTTPException(response)
tds_endpoint = response.json().get("properties", {}).get("connectionString")

if endpoint_type == "warehouse":
tds_endpoint = response.json().get("properties", {}).get("connectionString")
else:
tds_endpoint = response.json().get("properties", {}).get("sqlEndpointProperties", {}).get("connectionString")

# Set up the connection string
access_token = SynapseTokenProvider()()
tokenstruct = bytes2mswin_bstr(access_token.encode())
conn_str = f"DRIVER={{ODBC Driver 18 for SQL Server}};SERVER={tds_endpoint};DATABASE={warehouse};Encrypt=Yes;"
conn_str = f"DRIVER={{ODBC Driver 18 for SQL Server}};SERVER={tds_endpoint};DATABASE={name};Encrypt=Yes;"

if timeout is not None:
conn_str += f"Connect Timeout={timeout};"

self.connection = pyodbc.connect(conn_str, attrs_before={1256: tokenstruct})

@log
def query(
self, sql: Union[str, List[str]]
) -> Union[List[pd.DataFrame], pd.DataFrame, None]:
def query(self, sql: Union[str, List[str]]) -> Union[List[pd.DataFrame], pd.DataFrame, None]:
"""
Runs a SQL or T-SQL query (or multiple queries) against a Fabric Warehouse.
Runs a SQL or T-SQL query (or multiple queries) against a Fabric Warehouse/Lakehouse.
Parameters
----------
Expand All @@ -76,10 +85,10 @@ def query(
A list of pandas DataFrames if multiple SQL queries return results,
a single DataFrame if one query is executed and returns results, or None.
"""

cursor = None
results = [] # To store results from multiple queries if needed
results = []

# If the input is a single string, convert it to a list for consistency
if isinstance(sql, str):
sql = [sql]

Expand All @@ -101,10 +110,7 @@ def query(
results.append(result)

# Return results if any queries returned a result set
if results:
return results if len(results) > 1 else results[0]
else:
return None
return results if len(results) > 1 else (results[0] if results else None)

finally:
if cursor:
Expand All @@ -118,3 +124,13 @@ def __exit__(self, type, value, traceback):

def close(self):
self.connection.close()


class ConnectWarehouse(ConnectBase):
def __init__(self, warehouse: str, workspace: Optional[Union[str, uuid.UUID]] = None, timeout: Optional[int] = None):
super().__init__(name=warehouse, workspace=workspace, timeout=timeout, endpoint_type="warehouse")


class ConnectLakehouse(ConnectBase):
def __init__(self, lakehouse: str, workspace: Optional[Union[str, uuid.UUID]] = None, timeout: Optional[int] = None):
super().__init__(name=lakehouse, workspace=workspace, timeout=timeout, endpoint_type="lakehouse")
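A design note on the connection handshake above: pyodbc pre-connect attribute 1256 is SQL_COPT_SS_ACCESS_TOKEN, which expects the Microsoft Entra access token packed as a length-prefixed UTF-16-LE byte string, and that packing is what bytes2mswin_bstr produces. A self-contained sketch of the encoding, reconstructed on the assumption that the chain/repeat imports above interleave zero bytes (valid for ASCII token text):

import struct
from itertools import chain, repeat

def bytes2mswin_bstr(value: bytes) -> bytes:
    # Interleave each token byte with a zero byte (UTF-16-LE for ASCII input),
    # then prefix the result with its 4-byte little-endian length.
    encoded_bytes = bytes(chain.from_iterable(zip(value, repeat(0))))
    return struct.pack("<i", len(encoded_bytes)) + encoded_bytes

# Inside ConnectBase the packed token is handed to the driver before connecting:
#   pyodbc.connect(conn_str, attrs_before={1256: bytes2mswin_bstr(token.encode())})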
