From 699cd305ec7d54dc0a08222c45d41f974f110992 Mon Sep 17 00:00:00 2001
From: Michael
Date: Tue, 17 Sep 2024 10:18:26 +0300
Subject: [PATCH 1/2] Added functionality for ConnectLakehouse

---
 notebooks/SQL.ipynb                |  1 +
 notebooks/Warehouse.ipynb          |  1 -
 src/sempy_labs/__init__.py         |  2 ++
 src/sempy_labs/_connections.py     | 23 +++++++------
 src/sempy_labs/_model_bpa.py       |  2 +-
 src/sempy_labs/_query_scale_out.py |  6 ++--
 src/sempy_labs/_sql.py             | 55 ++++++++++++++++++------------
 7 files changed, 55 insertions(+), 35 deletions(-)
 create mode 100644 notebooks/SQL.ipynb
 delete mode 100644 notebooks/Warehouse.ipynb

diff --git a/notebooks/SQL.ipynb b/notebooks/SQL.ipynb
new file mode 100644
index 00000000..3f83a325
--- /dev/null
+++ b/notebooks/SQL.ipynb
@@ -0,0 +1 @@
+{"cells":[{"cell_type":"markdown","id":"5c27dfd1-4fe0-4a97-92e6-ddf78889aa93","metadata":{"nteract":{"transient":{"deleting":false}}},"source":["### Install the latest .whl package\n","\n","Check [here](https://pypi.org/project/semantic-link-labs/) to see the latest version."]},{"cell_type":"code","execution_count":null,"id":"d5cae9db-cef9-48a8-a351-9c5fcc99645c","metadata":{"jupyter":{"outputs_hidden":true,"source_hidden":false},"nteract":{"transient":{"deleting":false}}},"outputs":[],"source":["%pip install semantic-link-labs"]},{"cell_type":"markdown","id":"b195eae8","metadata":{},"source":["### Import the library and necessary packages"]},{"cell_type":"code","execution_count":null,"id":"1344e286","metadata":{},"outputs":[],"source":["import sempy_labs as labs\n","from sempy_labs import ConnectWarehouse\n","from sempy_labs import ConnectLakehouse\n","\n","lakehouse_name = ''\n","lakehouse_workspace_name = ''\n","warehouse_name = ''\n","warehouse_workspace_name = ''"]},{"cell_type":"markdown","id":"55e5ca67","metadata":{},"source":["### Run a SQL query (or queries) against a Fabric warehouse"]},{"cell_type":"code","execution_count":null,"id":"a9f984e9","metadata":{},"outputs":[],"source":["with ConnectWarehouse(warehouse=warehouse_name, workspace=warehouse_workspace_name) as sql:\n","    df = sql.query(\"SELECT * FROM Product\")\n","    display(df)"]},{"cell_type":"code","execution_count":null,"id":"865ac4a1","metadata":{},"outputs":[],"source":["with ConnectWarehouse(warehouse=warehouse_name, workspace=warehouse_workspace_name) as sql:\n","    dfs = sql.query([\"SELECT * FROM Product\", \"SELECT DISTINCT [Category] FROM Product\"])\n","\n","for df in dfs:\n","    display(df)"]},{"cell_type":"markdown","id":"634700c3","metadata":{},"source":["### Run a T-SQL query (or queries) against a Fabric warehouse"]},{"cell_type":"code","execution_count":null,"id":"5dbf34f3","metadata":{},"outputs":[],"source":["with ConnectWarehouse(warehouse=warehouse_name, workspace=warehouse_workspace_name) as sql:\n","    sql.query(\"CREATE SCHEMA [Business]\")"]},{"cell_type":"code","execution_count":null,"id":"ec8ddb81","metadata":{},"outputs":[],"source":["with ConnectWarehouse(warehouse=warehouse_name, workspace=warehouse_workspace_name) as sql:\n","    sql.query([\"CREATE SCHEMA [Business]\", \"CREATE SCHEMA [Marketing]\"])"]},{"cell_type":"markdown","id":"d5b090da","metadata":{},"source":["### Run a SQL query (or queries) against a Fabric lakehouse"]},{"cell_type":"code","execution_count":null,"id":"4dca7f4a","metadata":{},"outputs":[],"source":["with ConnectLakehouse(lakehouse=lakehouse_name, workspace=lakehouse_workspace_name) as sql:\n","    df = sql.query(\"SELECT * FROM Product\")\n","    display(df)"]},{"cell_type":"code","execution_count":null,"id":"b9606ae8","metadata":{},"outputs":[],"source":["with ConnectLakehouse(lakehouse=lakehouse_name, workspace=lakehouse_workspace_name) as sql:\n","    dfs = sql.query([\"SELECT * FROM Product\", \"SELECT DISTINCT [Category] FROM Product\"])\n","\n","for df in dfs:\n","    display(df)"]}],"metadata":{"kernel_info":{"name":"synapse_pyspark"},"kernelspec":{"display_name":"Synapse PySpark","language":"Python","name":"synapse_pyspark"},"language_info":{"name":"python"},"microsoft":{"language":"python"},"nteract":{"version":"nteract-front-end@1.0.0"},"spark_compute":{"compute_id":"/trident/default"},"synapse_widget":{"state":{},"version":"0.1"},"widgets":{}},"nbformat":4,"nbformat_minor":5}
diff --git a/notebooks/Warehouse.ipynb b/notebooks/Warehouse.ipynb
deleted file mode 100644
index f60f6f7d..00000000
--- a/notebooks/Warehouse.ipynb
+++ /dev/null
@@ -1 +0,0 @@
-{"cells":[{"cell_type":"markdown","id":"5c27dfd1-4fe0-4a97-92e6-ddf78889aa93","metadata":{"nteract":{"transient":{"deleting":false}}},"source":["### Install the latest .whl package\n","\n","Check [here](https://pypi.org/project/semantic-link-labs/) to see the latest version."]},{"cell_type":"code","execution_count":null,"id":"d5cae9db-cef9-48a8-a351-9c5fcc99645c","metadata":{"jupyter":{"outputs_hidden":true,"source_hidden":false},"nteract":{"transient":{"deleting":false}}},"outputs":[],"source":["%pip install semantic-link-labs"]},{"cell_type":"markdown","id":"b195eae8","metadata":{},"source":["### Import the library and necessary packages"]},{"cell_type":"code","execution_count":null,"id":"1344e286","metadata":{},"outputs":[],"source":["import sempy_labs as labs\n","from sempy_labs import ConnectWarehouse\n","warehouse_name = ''\n","workspace_name = ''"]},{"cell_type":"markdown","id":"55e5ca67","metadata":{},"source":["### Run a SQL query against a Fabric warehouse"]},{"cell_type":"code","execution_count":null,"id":"a9f984e9","metadata":{},"outputs":[],"source":["with ConnectWarehouse(warehouse=warehouse_name, workspace=workspace_name) as sql:\n","    df = sql.query(\"SELECT * FROM Product\")"]},{"cell_type":"markdown","id":"d5b090da","metadata":{},"source":["#### Display the results of the query"]},{"cell_type":"code","execution_count":null,"id":"05953d6d","metadata":{},"outputs":[],"source":["df"]},{"cell_type":"code","execution_count":null,"id":"4dca7f4a","metadata":{},"outputs":[],"source":["display(df)"]}],"metadata":{"kernel_info":{"name":"synapse_pyspark"},"kernelspec":{"display_name":"Synapse PySpark","language":"Python","name":"synapse_pyspark"},"language_info":{"name":"python"},"microsoft":{"language":"python"},"nteract":{"version":"nteract-front-end@1.0.0"},"spark_compute":{"compute_id":"/trident/default"},"synapse_widget":{"state":{},"version":"0.1"},"widgets":{}},"nbformat":4,"nbformat_minor":5}
diff --git a/src/sempy_labs/__init__.py b/src/sempy_labs/__init__.py
index 45ea5d26..f1c40330 100644
--- a/src/sempy_labs/__init__.py
+++ b/src/sempy_labs/__init__.py
@@ -28,6 +28,7 @@
 )
 from sempy_labs._sql import (
     ConnectWarehouse,
+    ConnectLakehouse,
 )
 from sempy_labs._capacities import (
     check_fabric_capacity_name_availablility,
@@ -175,6 +176,7 @@
 __all__ = [
     "resolve_warehouse_id",
     "ConnectWarehouse",
+    "ConnectLakehouse",
     "update_semantic_model_from_bim",
     "list_connections",
     "get_semantic_model_size",
diff --git a/src/sempy_labs/_connections.py b/src/sempy_labs/_connections.py
index c4761cdb..447a4826 100644
--- a/src/sempy_labs/_connections.py
+++ b/src/sempy_labs/_connections.py
@@ -77,8 +77,9 @@ def list_connections() -> pd.DataFrame:
     return df
 
 
-def list_item_connections(item_name: str, item_type: str, workspace: Optional[str] = None) -> pd.DataFrame:
-
+def list_item_connections(
+    item_name: str, item_type: str, workspace: Optional[str] = None
+) -> pd.DataFrame:
     """
     Shows the list of connections that the specified item is connected to.
 
@@ -104,7 +105,9 @@ def list_item_connections(item_name: str, item_type: str, workspace: Optional[st
     workspace = fabric.resolve_workspace_name(workspace)
     workspace_id = fabric.resolve_workspace_id(workspace)
     item_type = item_type[0].upper() + item_type[1:]
-    item_id = fabric.resolve_item_id(item_name=item_name, type=item_type, workspace=workspace)
+    item_id = fabric.resolve_item_id(
+        item_name=item_name, type=item_type, workspace=workspace
+    )
 
     client = fabric.FabricRestClient()
     response = client.post(f"/v1/workspaces/{workspace_id}/items/{item_id}/connections")
@@ -126,14 +129,14 @@ def list_item_connections(item_name: str, item_type: str, workspace: Optional[st
     responses = pagination(client, response)
 
     for r in responses:
-        for v in r.get('value', []):
+        for v in r.get("value", []):
             new_data = {
-                "Connection Name": v.get('displayName'),
-                "Connection Id": v.get('id'),
-                "Connectivity Type": v.get('connectivityType'),
-                "Connection Type": v.get('connectionDetails', {}).get('type'),
-                "Connection Path": v.get('connectionDetails', {}).get('path'),
-                "Gateway Id": v.get('gatewayId'),
+                "Connection Name": v.get("displayName"),
+                "Connection Id": v.get("id"),
+                "Connectivity Type": v.get("connectivityType"),
+                "Connection Type": v.get("connectionDetails", {}).get("type"),
+                "Connection Path": v.get("connectionDetails", {}).get("path"),
+                "Gateway Id": v.get("gatewayId"),
             }
 
             df = pd.concat([df, pd.DataFrame(new_data, index=[0])], ignore_index=True)
diff --git a/src/sempy_labs/_model_bpa.py b/src/sempy_labs/_model_bpa.py
index d2aaf869..4f75327d 100644
--- a/src/sempy_labs/_model_bpa.py
+++ b/src/sempy_labs/_model_bpa.py
@@ -506,7 +506,7 @@ def translate_using_spark(rule_file):
             content_html += f'{row["Object Type"]}'
             content_html += f'{row["Object Name"]}'
             content_html += f'{row["Severity"]}'
-            #content_html += f'{row["Severity"]}'
+            # content_html += f'{row["Severity"]}'
             content_html += ""
         content_html += ""
diff --git a/src/sempy_labs/_query_scale_out.py b/src/sempy_labs/_query_scale_out.py
index eae407cd..80e7fc4c 100644
--- a/src/sempy_labs/_query_scale_out.py
+++ b/src/sempy_labs/_query_scale_out.py
@@ -413,10 +413,12 @@ def set_workspace_default_storage_format(
     dfW = fabric.list_workspaces(filter=f"name eq '{workspace}'")
     if len(dfW) == 0:
         raise ValueError()
-    current_storage_format = dfW['Default Dataset Storage Format'].iloc[0]
+    current_storage_format = dfW["Default Dataset Storage Format"].iloc[0]
 
     if current_storage_format == storage_format:
-        print(f"{icons.info} The '{workspace}' is already set to a default storage format of '{current_storage_format}'.")
+        print(
+            f"{icons.info} The '{workspace}' is already set to a default storage format of '{current_storage_format}'."
+        )
         return
 
     request_body = {
diff --git a/src/sempy_labs/_sql.py b/src/sempy_labs/_sql.py
index 7c41ca06..12812e87 100644
--- a/src/sempy_labs/_sql.py
+++ b/src/sempy_labs/_sql.py
@@ -6,7 +6,7 @@
 import uuid
 from itertools import chain, repeat
 from sempy.fabric.exceptions import FabricHTTPException
-from sempy_labs._helper_functions import resolve_warehouse_id
+from sempy_labs._helper_functions import resolve_warehouse_id, resolve_lakehouse_id
 
 
 def bytes2mswin_bstr(value: bytes) -> bytes:
@@ -28,30 +28,41 @@ def bytes2mswin_bstr(value: bytes) -> bytes:
     return struct.pack("<i", len(encoded_bytes)) + encoded_bytes
 
-    def query(
-        self, sql: Union[str, List[str]]
-    ) -> Union[List[pd.DataFrame], pd.DataFrame, None]:
+    def query(self, sql: Union[str, List[str]]) -> Union[List[pd.DataFrame], pd.DataFrame, None]:
         """
-        Runs a SQL or T-SQL query (or multiple queries) against a Fabric Warehouse.
+        Runs a SQL or T-SQL query (or multiple queries) against a Fabric Warehouse/Lakehouse.
 
         Parameters
         ----------
@@ -76,10 +85,10 @@
             A list of pandas DataFrames if multiple SQL queries return results, a single DataFrame if one query is executed and returns results, or None.
         """
+        cursor = None
-        results = []  # To store results from multiple queries if needed
+        results = []
 
-        # If the input is a single string, convert it to a list for consistency
         if isinstance(sql, str):
             sql = [sql]
 
@@ -89,22 +98,16 @@
             for sql_query in sql:
                 cursor.execute(sql_query)
 
-                # Commit for non-select queries (like CREATE, INSERT, etc.)
                 if not cursor.description:
                     self.connection.commit()
                 else:
-                    # Fetch and append results for queries that return a result set
                     result = pd.DataFrame.from_records(
                         cursor.fetchall(),
                         columns=[col[0] for col in cursor.description],
                     )
                     results.append(result)
 
-            # Return results if any queries returned a result set
-            if results:
-                return results if len(results) > 1 else results[0]
-            else:
-                return None
+            return results if len(results) > 1 else (results[0] if results else None)
 
         finally:
             if cursor:
@@ -118,3 +121,13 @@ def __exit__(self, type, value, traceback):
 
     def close(self):
         self.connection.close()
+
+
+class ConnectWarehouse(ConnectBase):
+    def __init__(self, warehouse: str, workspace: Optional[Union[str, uuid.UUID]] = None, timeout: Optional[int] = None):
+        super().__init__(name=warehouse, workspace=workspace, timeout=timeout, endpoint_type="warehouse")
+
+
+class ConnectLakehouse(ConnectBase):
+    def __init__(self, lakehouse: str, workspace: Optional[Union[str, uuid.UUID]] = None, timeout: Optional[int] = None):
+        super().__init__(name=lakehouse, workspace=workspace, timeout=timeout, endpoint_type="lakehouse")

From 3b3184fbf5275e41b2efd30ce66cd13df5059667 Mon Sep 17 00:00:00 2001
From: Michael
Date: Tue, 17 Sep 2024 20:08:43 +0300
Subject: [PATCH 2/2] Added comments back in

---
 src/sempy_labs/_sql.py | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/src/sempy_labs/_sql.py b/src/sempy_labs/_sql.py
index 12812e87..64a4fddc 100644
--- a/src/sempy_labs/_sql.py
+++ b/src/sempy_labs/_sql.py
@@ -98,15 +98,18 @@ def query(self, sql: Union[str, List[str]]) -> Union[List[pd.DataFrame], pd.Data
             for sql_query in sql:
                 cursor.execute(sql_query)
 
+                # Commit for non-select queries (like CREATE, INSERT, etc.)
                 if not cursor.description:
                     self.connection.commit()
                 else:
+                    # Fetch and append results for queries that return a result set
                     result = pd.DataFrame.from_records(
                         cursor.fetchall(),
                         columns=[col[0] for col in cursor.description],
                     )
                     results.append(result)
 
+            # Return results if any queries returned a result set
             return results if len(results) > 1 else (results[0] if results else None)
 
         finally:
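
Note for reviewers: below is a minimal, self-contained sketch of the pattern these two patches introduce — thin ConnectWarehouse/ConnectLakehouse wrappers over a shared ConnectBase, whose query() returns a list of DataFrames for several result sets, a single DataFrame for exactly one, and None when no statement returns rows. The real ConnectBase resolves the warehouse/lakehouse ID (see the resolve_*_id imports above) and opens a database connection; here a FakeCursor stand-in replaces that plumbing, so everything in this sketch is illustrative and not the library's actual connection code.

from typing import List, Optional, Union

import pandas as pd


class FakeCursor:
    """Stand-in for a real cursor (hypothetical): SELECTs yield rows, DDL yields no result set."""

    def execute(self, sql: str):
        if sql.strip().upper().startswith("SELECT"):
            # Pretend the statement returned a two-column, two-row result set.
            self.description = [("Product",), ("Category",)]
            self._rows = [("Bike", "Sport"), ("Tent", "Outdoor")]
        else:
            self.description = None  # non-SELECT: nothing to fetch

    def fetchall(self):
        return self._rows

    def close(self):
        pass


class ConnectBase:
    def __init__(self, name: str, workspace: Optional[str] = None, endpoint_type: str = "warehouse"):
        # The library resolves IDs and opens an ODBC connection here; omitted in this sketch.
        self.name, self.workspace, self.endpoint_type = name, workspace, endpoint_type

    def query(self, sql: Union[str, List[str]]) -> Union[List[pd.DataFrame], pd.DataFrame, None]:
        if isinstance(sql, str):
            sql = [sql]  # normalize: a single statement becomes a one-item batch
        cursor, results = FakeCursor(), []
        try:
            for statement in sql:
                cursor.execute(statement)
                # Only statements that return rows produce a DataFrame.
                if cursor.description:
                    results.append(
                        pd.DataFrame.from_records(
                            cursor.fetchall(),
                            columns=[col[0] for col in cursor.description],
                        )
                    )
            # Several result sets -> list, exactly one -> DataFrame, none -> None.
            return results if len(results) > 1 else (results[0] if results else None)
        finally:
            cursor.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        pass


class ConnectWarehouse(ConnectBase):
    def __init__(self, warehouse: str, workspace: Optional[str] = None):
        super().__init__(name=warehouse, workspace=workspace, endpoint_type="warehouse")


class ConnectLakehouse(ConnectBase):
    def __init__(self, lakehouse: str, workspace: Optional[str] = None):
        super().__init__(name=lakehouse, workspace=workspace, endpoint_type="lakehouse")


with ConnectLakehouse(lakehouse="MyLakehouse") as sql:
    print(sql.query("SELECT * FROM Product"))                     # single DataFrame
    print(sql.query(["CREATE SCHEMA [a]", "CREATE SCHEMA [b]"]))  # None (no result sets)

Running the sketch prints a two-row DataFrame for the SELECT and None for the DDL batch, mirroring the commit/fetch split that PATCH 2 re-documents in query().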