From 8f6adc5fc3511f2efed546107bc945281871d9fa Mon Sep 17 00:00:00 2001
From: Michael
Date: Tue, 5 Nov 2024 14:22:53 +0200
Subject: [PATCH 1/3] added mirroring functions

---
 src/sempy_labs/_mirrored_databases.py | 170 ++++++++++++++++++++++++++
 src/sempy_labs/_ml_models.py          |   2 +-
 2 files changed, 171 insertions(+), 1 deletion(-)
 create mode 100644 src/sempy_labs/_mirrored_databases.py

diff --git a/src/sempy_labs/_mirrored_databases.py b/src/sempy_labs/_mirrored_databases.py
new file mode 100644
index 00000000..743c7219
--- /dev/null
+++ b/src/sempy_labs/_mirrored_databases.py
@@ -0,0 +1,170 @@
+import sempy.fabric as fabric
+import pandas as pd
+from typing import Optional
+from sempy_labs._helper_functions import (
+    resolve_workspace_name_and_id,
+    pagination,
+    lro,
+)
+from sempy.fabric.exceptions import FabricHTTPException
+import sempy_labs._icons as icons
+
+
+def list_mirrored_databases(workspace: Optional[str] = None) -> pd.DataFrame:
+    """
+    Shows the mirrored databases within a workspace.
+
+    This is a wrapper function for the following API: `Items - List Mirrored Databases `_.
+
+    Parameters
+    ----------
+    workspace : str, default=None
+        The Fabric workspace name.
+        Defaults to None which resolves to the workspace of the attached lakehouse
+        or if no lakehouse attached, resolves to the workspace of the notebook.
+
+    Returns
+    -------
+    pandas.DataFrame
+        A pandas dataframe showing the mirrored databases within a workspace.
+    """
+
+    df = pd.DataFrame(
+        columns=["Mirrored Database Name", "Mirrored Database Id", "Description", "OneLake Tables Path", "SQL Endpoint Connection String", "SQL Endpoint Id", "Provisioning Status", "Default Schema"]
+    )
+
+    (workspace, workspace_id) = resolve_workspace_name_and_id(workspace)
+
+    client = fabric.FabricRestClient()
+    response = client.get(f"/v1/workspaces/{workspace_id}/mirroredDatabases")
+    if response.status_code != 200:
+        raise FabricHTTPException(response)
+    responses = pagination(client, response)
+
+    for r in responses:
+        for v in r.get("value", []):
+            prop = v.get('properties', {})
+            sql = prop.get('sqlEndpointProperties', {})
+            new_data = {
+                "Mirrored Database Name": v.get("displayName"),
+                "Mirrored Database Id": v.get("id"),
+                "Description": v.get("description"),
+                "OneLake Tables Path": prop.get('oneLakeTablesPath') if prop is not None else None,
+                "SQL Endpoint Connection String": sql.get('connectionString') if sql is not None else None,
+                "SQL Endpoint Id": sql.get('id') if sql is not None else None,
+                "Provisioning Status": sql.get('provisioningStatus') if sql is not None else None,
+                "Default Schema": prop.get('defaultSchema') if prop is not None else None,
+            }
+            df = pd.concat([df, pd.DataFrame(new_data, index=[0])], ignore_index=True)
+
+    return df
+
+
+def create_mirrored_database(
+    name: str, description: Optional[str] = None, workspace: Optional[str] = None
+):
+    """
+    Creates a Fabric mirrored database.
+
+    This is a wrapper function for the following API: `Items - Create Mirrored Database `_.
+
+    Parameters
+    ----------
+    name: str
+        Name of the mirrored database.
+    description : str, default=None
+        A description of the mirrored database.
+    workspace : str, default=None
+        The Fabric workspace name.
+        Defaults to None which resolves to the workspace of the attached lakehouse
+        or if no lakehouse attached, resolves to the workspace of the notebook.
+    """
+
+    (workspace, workspace_id) = resolve_workspace_name_and_id(workspace)
+
+    request_body = {"displayName": name}
+
+    if description:
+        request_body["description"] = description
+
+    client = fabric.FabricRestClient()
+    response = client.post(f"/v1/workspaces/{workspace_id}/mirroredDatabases", json=request_body)
+
+    if response.status_code != 201:
+        raise FabricHTTPException(response)
+
+    print(
+        f"{icons.green_dot} The '{name}' mirrored database has been created within the '{workspace}' workspace."
+    )
+
+
+def delete_mirrored_database(name: str, workspace: Optional[str] = None):
+    """
+    Deletes a mirrored database.
+
+    This is a wrapper function for the following API: `Items - Delete Mirrored Database `_.
+
+    Parameters
+    ----------
+    name: str
+        Name of the mirrored database.
+    workspace : str, default=None
+        The Fabric workspace name.
+        Defaults to None which resolves to the workspace of the attached lakehouse
+        or if no lakehouse attached, resolves to the workspace of the notebook.
+    """
+
+    (workspace, workspace_id) = resolve_workspace_name_and_id(workspace)
+
+    item_id = fabric.resolve_item_id(
+        item_name=name, type="MirroredDatabase", workspace=workspace
+    )
+
+    client = fabric.FabricRestClient()
+    response = client.delete(f"/v1/workspaces/{workspace_id}/mirroredDatabases/{item_id}")
+
+    if response.status_code != 200:
+        raise FabricHTTPException(response)
+
+    print(
+        f"{icons.green_dot} The '{name}' mirrored database within the '{workspace}' workspace has been deleted."
+    )
+
+
+def get_mirroring_status(mirrored_database: str, workspace: Optional[str] = None) -> str:
+
+    (workspace, workspace_id) = resolve_workspace_name_and_id(workspace)
+
+    item_id = fabric.resolve_item_id(item_name=mirrored_database, type='MirroredDatabase', workspace=workspace)
+
+    client = fabric.FabricRestClient()
+    response = client.post(f"/v1/workspaces/{workspace_id}/mirroredDatabases/{item_id}/getMirroringStatus")
+
+    if response.status_code != 200:
+        raise FabricHTTPException(response)
+
+    return response.json().get('status', {})
+
+
+def get_tables_mirroring_status(mirrored_database: str, workspace: Optional[str] = None) -> str:
+
+    (workspace, workspace_id) = resolve_workspace_name_and_id(workspace)
+
+    item_id = fabric.resolve_item_id(item_name=mirrored_database, type='MirroredDatabase', workspace=workspace)
+
+    client = fabric.FabricRestClient()
+    response = client.post(f"/v1/workspaces/{workspace_id}/mirroredDatabases/{item_id}/getTablesMirroringStatus")
+
+    if response.status_code != 200:
+        raise FabricHTTPException(response)
+
+    return response.json().get('status', {})
+
+
+
+
+def start_mirroring():
+
+
+
+def stop_mirroring():
\ No newline at end of file
diff --git a/src/sempy_labs/_ml_models.py b/src/sempy_labs/_ml_models.py
index 7f02b1ef..f295f67b 100644
--- a/src/sempy_labs/_ml_models.py
+++ b/src/sempy_labs/_ml_models.py
@@ -69,7 +69,7 @@ def create_ml_model(
     name: str
         Name of the ML model.
     description : str, default=None
-        A description of the environment.
+        A description of the ML model.
     workspace : str, default=None
         The Fabric workspace name.
         Defaults to None which resolves to the workspace of the attached lakehouse

From 7b2e8b708c6f8c8c1e98129ad78a4f1206f3fd01 Mon Sep 17 00:00:00 2001
From: Michael
Date: Wed, 6 Nov 2024 20:53:03 +0200
Subject: [PATCH 2/3] added more mirrored database functions

---
 src/sempy_labs/__init__.py            |  20 ++
 src/sempy_labs/_mirrored_databases.py | 322 +++++++++++++++++++++++---
 src/sempy_labs/_notebooks.py          |   2 +-
 3 files changed, 315 insertions(+), 29 deletions(-)

diff --git a/src/sempy_labs/__init__.py b/src/sempy_labs/__init__.py
index 576ad1ce..3f55188f 100644
--- a/src/sempy_labs/__init__.py
+++ b/src/sempy_labs/__init__.py
@@ -1,3 +1,14 @@
+from sempy_labs._mirrored_databases import (
+    get_mirrored_database_definition,
+    get_mirroring_status,
+    list_mirrored_databases,
+    stop_mirroring,
+    start_mirroring,
+    create_mirrored_database,
+    delete_mirrored_database,
+    update_mirrored_database_definition,
+    get_tables_mirroring_status,
+)
 from sempy_labs._managed_private_endpoints import (
     list_managed_private_endpoints,
     create_managed_private_endpoint,
@@ -403,4 +414,13 @@
     "delete_managed_private_endpoint",
     "get_dax_query_dependencies",
     "get_dax_query_memory_size",
+    "get_mirrored_database_definition",
+    "get_mirroring_status",
+    "list_mirrored_databases",
+    "stop_mirroring",
+    "start_mirroring",
+    "create_mirrored_database",
+    "delete_mirrored_database",
+    "update_mirrored_database_definition",
+    "get_tables_mirroring_status",
 ]
diff --git a/src/sempy_labs/_mirrored_databases.py b/src/sempy_labs/_mirrored_databases.py
index 743c7219..c5b87453 100644
--- a/src/sempy_labs/_mirrored_databases.py
+++ b/src/sempy_labs/_mirrored_databases.py
@@ -5,9 +5,12 @@
     resolve_workspace_name_and_id,
     pagination,
     lro,
+    _decode_b64,
 )
 from sempy.fabric.exceptions import FabricHTTPException
 import sempy_labs._icons as icons
+import base64
+import json
 
 
 def list_mirrored_databases(workspace: Optional[str] = None) -> pd.DataFrame:
@@ -30,7 +32,16 @@ def list_mirrored_databases(workspace: Optional[str] = None) -> pd.DataFrame:
     """
 
     df = pd.DataFrame(
-        columns=["Mirrored Database Name", "Mirrored Database Id", "Description", "OneLake Tables Path", "SQL Endpoint Connection String", "SQL Endpoint Id", "Provisioning Status", "Default Schema"]
+        columns=[
+            "Mirrored Database Name",
+            "Mirrored Database Id",
+            "Description",
+            "OneLake Tables Path",
+            "SQL Endpoint Connection String",
+            "SQL Endpoint Id",
+            "Provisioning Status",
+            "Default Schema",
+        ]
     )
 
     (workspace, workspace_id) = resolve_workspace_name_and_id(workspace)
@@ -43,17 +54,25 @@ def list_mirrored_databases(workspace: Optional[str] = None) -> pd.DataFrame:
 
     for r in responses:
         for v in r.get("value", []):
-            prop = v.get('properties', {})
-            sql = prop.get('sqlEndpointProperties', {})
+            prop = v.get("properties", {})
+            sql = prop.get("sqlEndpointProperties", {})
             new_data = {
                 "Mirrored Database Name": v.get("displayName"),
                 "Mirrored Database Id": v.get("id"),
                 "Description": v.get("description"),
-                "OneLake Tables Path": prop.get('oneLakeTablesPath') if prop is not None else None,
-                "SQL Endpoint Connection String": sql.get('connectionString') if sql is not None else None,
-                "SQL Endpoint Id": sql.get('id') if sql is not None else None,
-                "Provisioning Status": sql.get('provisioningStatus') if sql is not None else None,
-                "Default Schema": prop.get('defaultSchema') if prop is not None else None,
+                "OneLake Tables Path": (
+                    prop.get("oneLakeTablesPath") if prop is not None else None
+                ),
+                "SQL Endpoint Connection String": (
+                    sql.get("connectionString") if sql is not None else None
+                ),
+                "SQL Endpoint Id": sql.get("id") if sql is not None else None,
+                "Provisioning Status": (
+                    sql.get("provisioningStatus") if sql is not None else None
+                ),
+                "Default Schema": (
+                    prop.get("defaultSchema") if prop is not None else None
+                ),
             }
             df = pd.concat([df, pd.DataFrame(new_data, index=[0])], ignore_index=True)
 
@@ -66,7 +85,7 @@ def create_mirrored_database(
     """
     Creates a Fabric mirrored database.
 
-    This is a wrapper function for the following API: `Items - Create Mirrored Database `_.
+    This is a wrapper function for the following API: `Items - Create Mirrored Database `_.
 
     Parameters
     ----------
@@ -88,7 +107,9 @@ def create_mirrored_database(
         request_body["description"] = description
 
     client = fabric.FabricRestClient()
-    response = client.post(f"/v1/workspaces/{workspace_id}/mirroredDatabases", json=request_body)
+    response = client.post(
+        f"/v1/workspaces/{workspace_id}/mirroredDatabases", json=request_body
+    )
 
     if response.status_code != 201:
         raise FabricHTTPException(response)
@@ -98,15 +119,15 @@ def create_mirrored_database(
     )
 
 
-def delete_mirrored_database(name: str, workspace: Optional[str] = None):
+def delete_mirrored_database(mirrored_database: str, workspace: Optional[str] = None):
     """
     Deletes a mirrored database.
 
-    This is a wrapper function for the following API: `Items - Delete Mirrored Database `_.
+    This is a wrapper function for the following API: `Items - Delete Mirrored Database `_.
 
     Parameters
     ----------
-    name: str
+    mirrored_database: str
         Name of the mirrored database.
     workspace : str, default=None
         The Fabric workspace name.
@@ -117,54 +138,299 @@ def delete_mirrored_database(name: str, workspace: Optional[str] = None):
     (workspace, workspace_id) = resolve_workspace_name_and_id(workspace)
 
     item_id = fabric.resolve_item_id(
-        item_name=name, type="MirroredDatabase", workspace=workspace
+        item_name=mirrored_database, type="MirroredDatabase", workspace=workspace
     )
 
     client = fabric.FabricRestClient()
-    response = client.delete(f"/v1/workspaces/{workspace_id}/mirroredDatabases/{item_id}")
+    response = client.delete(
+        f"/v1/workspaces/{workspace_id}/mirroredDatabases/{item_id}"
+    )
 
     if response.status_code != 200:
         raise FabricHTTPException(response)
 
     print(
-        f"{icons.green_dot} The '{name}' mirrored database within the '{workspace}' workspace has been deleted."
+        f"{icons.green_dot} The '{mirrored_database}' mirrored database within the '{workspace}' workspace has been deleted."
     )
 
 
-def get_mirroring_status(mirrored_database: str, workspace: Optional[str] = None) -> str:
+def get_mirroring_status(
+    mirrored_database: str, workspace: Optional[str] = None
+) -> str:
+    """
+    Gets the status of the mirrored database.
+
+    This is a wrapper function for the following API: `Mirroring - Get Mirroring Status `_.
+
+    Parameters
+    ----------
+    mirrored_database: str
+        Name of the mirrored database.
+    workspace : str, default=None
+        The Fabric workspace name.
+        Defaults to None which resolves to the workspace of the attached lakehouse
+        or if no lakehouse attached, resolves to the workspace of the notebook.
+
+    Returns
+    -------
+    str
+        The status of a mirrored database.
+    """
 
     (workspace, workspace_id) = resolve_workspace_name_and_id(workspace)
 
-    item_id = fabric.resolve_item_id(item_name=mirrored_database, type='MirroredDatabase', workspace=workspace)
+    item_id = fabric.resolve_item_id(
+        item_name=mirrored_database, type="MirroredDatabase", workspace=workspace
+    )
 
     client = fabric.FabricRestClient()
-    response = client.post(f"/v1/workspaces/{workspace_id}/mirroredDatabases/{item_id}/getMirroringStatus")
+    response = client.post(
+        f"/v1/workspaces/{workspace_id}/mirroredDatabases/{item_id}/getMirroringStatus"
+    )
 
     if response.status_code != 200:
         raise FabricHTTPException(response)
-
-    return response.json().get('status', {})
+    return response.json().get("status", {})
+
+
+def get_tables_mirroring_status(
+    mirrored_database: str, workspace: Optional[str] = None
+) -> pd.DataFrame:
+    """
+    Gets the mirroring status of the tables.
+
+    This is a wrapper function for the following API: `Mirroring - Get Tables Mirroring Status `_.
+
+    Parameters
+    ----------
+    mirrored_database: str
+        Name of the mirrored database.
+    workspace : str, default=None
+        The Fabric workspace name.
+        Defaults to None which resolves to the workspace of the attached lakehouse
+        or if no lakehouse attached, resolves to the workspace of the notebook.
 
-def get_tables_mirroring_status(mirrored_database: str, workspace: Optional[str] = None) -> str:
+    Returns
+    -------
+    pandas.DataFrame
+        A pandas dataframe showing the mirroring status of the tables.
+    """
 
     (workspace, workspace_id) = resolve_workspace_name_and_id(workspace)
 
-    item_id = fabric.resolve_item_id(item_name=mirrored_database, type='MirroredDatabase', workspace=workspace)
+    item_id = fabric.resolve_item_id(
+        item_name=mirrored_database, type="MirroredDatabase", workspace=workspace
+    )
 
     client = fabric.FabricRestClient()
-    response = client.post(f"/v1/workspaces/{workspace_id}/mirroredDatabases/{item_id}/getTablesMirroringStatus")
+    response = client.post(
+        f"/v1/workspaces/{workspace_id}/mirroredDatabases/{item_id}/getTablesMirroringStatus"
+    )
 
     if response.status_code != 200:
         raise FabricHTTPException(response)
-
-    return response.json().get('status', {})
+    responses = pagination(client, response)
+
+    df = pd.DataFrame(
+        columns=[
+            "Source Schema Name",
+            "Source Table Name",
+            "Status",
+            "Processed Bytes",
+            "Processed Rows",
+            "Last Sync Date",
+        ]
+    )
+
+    for r in responses:
+        for v in r.get("data", []):
+            m = v.get("metrics", {})
+            new_data = {
+                "Source Schema Name": v.get("sourceSchemaName"),
+                "Source Table Name": v.get("sourceTableName"),
+                "Status": v.get("status"),
+                "Processed Bytes": m.get("processedBytes") if m is not None else None,
+                "Processed Rows": m.get("processedRows") if m is not None else None,
+                "Last Sync Date": m.get("lastSyncDateTime") if m is not None else None,
+            }
+
+            df = pd.concat([df, pd.DataFrame(new_data, index=[0])], ignore_index=True)
+
+    int_cols = ["Processed Bytes", "Processed Rows"]
+    df[int_cols] = df[int_cols].astype(int)
+    df["Last Sync Date"] = pd.to_datetime(df["Last Sync Date"])
+
+    return df
+
+
+def start_mirroring(mirrored_database: str, workspace: Optional[str] = None):
+    """
+    Starts the mirroring for a database.
+
+    This is a wrapper function for the following API: `Mirroring - Start Mirroring `_.
+
+    Parameters
+    ----------
+    mirrored_database: str
+        Name of the mirrored database.
+    workspace : str, default=None
+        The Fabric workspace name.
+        Defaults to None which resolves to the workspace of the attached lakehouse
+        or if no lakehouse attached, resolves to the workspace of the notebook.
+    """
+
+    (workspace, workspace_id) = resolve_workspace_name_and_id(workspace)
 
-def start_mirroring():
+    item_id = fabric.resolve_item_id(
+        item_name=mirrored_database, type="MirroredDatabase", workspace=workspace
+    )
 
+    client = fabric.FabricRestClient()
+    response = client.post(
+        f"/v1/workspaces/{workspace_id}/mirroredDatabases/{item_id}/startMirroring"
+    )
 
+    if response.status_code != 200:
+        raise FabricHTTPException(response)
 
-def stop_mirroring():
\ No newline at end of file
+    print(
+        f"{icons.green_dot} Mirroring has started for the '{mirrored_database}' database within the '{workspace}' workspace."
+    )
+
+
+def stop_mirroring(mirrored_database: str, workspace: Optional[str] = None):
+    """
+    Stops the mirroring for a database.
+
+    This is a wrapper function for the following API: `Mirroring - Stop Mirroring `_.
+
+    Parameters
+    ----------
+    mirrored_database: str
+        Name of the mirrored database.
+    workspace : str, default=None
+        The Fabric workspace name.
+        Defaults to None which resolves to the workspace of the attached lakehouse
+        or if no lakehouse attached, resolves to the workspace of the notebook.
+    """
+
+    (workspace, workspace_id) = resolve_workspace_name_and_id(workspace)
+
+    item_id = fabric.resolve_item_id(
+        item_name=mirrored_database, type="MirroredDatabase", workspace=workspace
+    )
+
+    client = fabric.FabricRestClient()
+    response = client.post(
+        f"/v1/workspaces/{workspace_id}/mirroredDatabases/{item_id}/stopMirroring"
+    )
+
+    if response.status_code != 200:
+        raise FabricHTTPException(response)
+
+    print(
+        f"{icons.green_dot} Mirroring has stopped for the '{mirrored_database}' database within the '{workspace}' workspace."
+    )
+
+
+def get_mirrored_database_definition(
+    mirrored_database: str, workspace: Optional[str] = None, decode: bool = True
+) -> str:
+    """
+    Obtains the mirrored database definition.
+
+    This is a wrapper function for the following API: `Items - Get Mirrored Database Definition `_.
+
+    Parameters
+    ----------
+    mirrored_database : str
+        The name of the mirrored database.
+    workspace : str, default=None
+        The name of the workspace.
+        Defaults to None which resolves to the workspace of the attached lakehouse
+        or if no lakehouse attached, resolves to the workspace of the notebook.
+    decode : bool, default=True
+        If True, decodes the mirrored database definition file into .json format.
+        If False, obtains the mirrored database definition file in base64 format.
+
+    Returns
+    -------
+    str
+        The mirrored database definition.
+    """
+
+    (workspace, workspace_id) = resolve_workspace_name_and_id(workspace)
+    item_id = fabric.resolve_item_id(
+        item_name=mirrored_database, type="MirroredDatabase", workspace=workspace
+    )
+    client = fabric.FabricRestClient()
+    response = client.post(
+        f"v1/workspaces/{workspace_id}/mirroredDatabases/{item_id}/getDefinition",
+    )
+
+    result = lro(client, response).json()
+    df_items = pd.json_normalize(result["definition"]["parts"])
+    df_items_filt = df_items[df_items["path"] == "mirroredDatabase.json"]
+    payload = df_items_filt["payload"].iloc[0]
+
+    if decode:
+        result = _decode_b64(payload)
+    else:
+        result = payload
+
+    return result
+
+
+def update_mirrored_database_definition(
+    mirrored_database: str,
+    mirrored_database_content: dict,
+    workspace: Optional[str] = None,
+):
+    """
+    Updates an existing mirrored database with a new definition.
+
+    Parameters
+    ----------
+    mirrored_database : str
+        The name of the mirrored database to be updated.
+    mirrored_database_content : dict
+        The mirrored database definition (not in Base64 format).
+    workspace : str, default=None
+        The name of the workspace.
+        Defaults to None which resolves to the workspace of the attached lakehouse
+        or if no lakehouse attached, resolves to the workspace of the notebook.
+    """
+
+    (workspace, workspace_id) = resolve_workspace_name_and_id(workspace)
+    client = fabric.FabricRestClient()
+    payload = base64.b64encode(
+        json.dumps(mirrored_database_content).encode("utf-8")
+    ).decode("utf-8")
+    item_id = fabric.resolve_item_id(
+        item_name=mirrored_database, type="MirroredDatabase", workspace=workspace
+    )
+
+    request_body = {
+        "displayName": mirrored_database,
+        "definition": {
+            "parts": [
+                {
+                    "path": "mirroredDatabase.json",
+                    "payload": payload,
+                    "payloadType": "InlineBase64",
+                }
+            ],
+        },
+    }
+
+    response = client.post(
+        f"v1/workspaces/{workspace_id}/mirroredDatabases/{item_id}/updateDefinition",
+        json=request_body,
+    )
+
+    lro(client, response, return_status_code=True)
+
+    print(
+        f"{icons.green_dot} The '{mirrored_database}' mirrored database was updated within the '{workspace}' workspace."
+    )
diff --git a/src/sempy_labs/_notebooks.py b/src/sempy_labs/_notebooks.py
index 09dabe99..b9d24222 100644
--- a/src/sempy_labs/_notebooks.py
+++ b/src/sempy_labs/_notebooks.py
@@ -186,7 +186,7 @@ def update_notebook_definition(
     Parameters
     ----------
     name : str
-        The name of the notebook to be created.
+        The name of the notebook to be updated.
     notebook_content : str
         The Jupyter notebook content (not in Base64 format).
     workspace : str, default=None

From 7ffd7887ae76e0608b89ce68a92df8332f7ac4d5 Mon Sep 17 00:00:00 2001
From: Michael
Date: Mon, 11 Nov 2024 13:33:26 +0200
Subject: [PATCH 3/3] fixes per markus

---
 src/sempy_labs/_mirrored_databases.py | 24 ++++++++----------------
 1 file changed, 8 insertions(+), 16 deletions(-)

diff --git a/src/sempy_labs/_mirrored_databases.py b/src/sempy_labs/_mirrored_databases.py
index c5b87453..7e271821 100644
--- a/src/sempy_labs/_mirrored_databases.py
+++ b/src/sempy_labs/_mirrored_databases.py
@@ -60,19 +60,11 @@ def list_mirrored_databases(workspace: Optional[str] = None) -> pd.DataFrame:
                 "Mirrored Database Name": v.get("displayName"),
                 "Mirrored Database Id": v.get("id"),
                 "Description": v.get("description"),
-                "OneLake Tables Path": (
-                    prop.get("oneLakeTablesPath") if prop is not None else None
-                ),
-                "SQL Endpoint Connection String": (
-                    sql.get("connectionString") if sql is not None else None
-                ),
-                "SQL Endpoint Id": sql.get("id") if sql is not None else None,
-                "Provisioning Status": (
-                    sql.get("provisioningStatus") if sql is not None else None
-                ),
-                "Default Schema": (
-                    prop.get("defaultSchema") if prop is not None else None
-                ),
+                "OneLake Tables Path": prop.get("oneLakeTablesPath"),
+                "SQL Endpoint Connection String": sql.get("connectionString"),
+                "SQL Endpoint Id": sql.get("id"),
+                "Provisioning Status": sql.get("provisioningStatus"),
+                "Default Schema": prop.get("defaultSchema"),
             }
             df = pd.concat([df, pd.DataFrame(new_data, index=[0])], ignore_index=True)
 
@@ -251,9 +243,9 @@ def get_tables_mirroring_status(
                 "Source Schema Name": v.get("sourceSchemaName"),
                 "Source Table Name": v.get("sourceTableName"),
                 "Status": v.get("status"),
-                "Processed Bytes": m.get("processedBytes") if m is not None else None,
-                "Processed Rows": m.get("processedRows") if m is not None else None,
-                "Last Sync Date": m.get("lastSyncDateTime") if m is not None else None,
+                "Processed Bytes": m.get("processedBytes"),
+                "Processed Rows": m.get("processedRows"),
+                "Last Sync Date": m.get("lastSyncDateTime"),
             }
 
             df = pd.concat([df, pd.DataFrame(new_data, index=[0])], ignore_index=True)
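
Usage sketch (not part of the patch series): the calls below exercise the functions added above, in the order one might use them. This is a minimal sketch, assuming a Fabric notebook where the sempy_labs package is installed with these functions exported from the package root as done in PATCH 2/3; "Sales Workspace" and "SalesDB" are placeholder names.

import json

import sempy_labs as labs

# Create a mirrored database, then confirm it shows up in the workspace listing.
labs.create_mirrored_database(name="SalesDB", workspace="Sales Workspace")
print(labs.list_mirrored_databases(workspace="Sales Workspace"))

# Start mirroring, poll the overall status and the per-table status, then stop it.
labs.start_mirroring(mirrored_database="SalesDB", workspace="Sales Workspace")
print(labs.get_mirroring_status(mirrored_database="SalesDB", workspace="Sales Workspace"))
print(labs.get_tables_mirroring_status(mirrored_database="SalesDB", workspace="Sales Workspace"))
labs.stop_mirroring(mirrored_database="SalesDB", workspace="Sales Workspace")

# Round-trip the item definition: fetch it decoded (JSON text by default),
# then push an updated copy back.
definition = labs.get_mirrored_database_definition(
    mirrored_database="SalesDB", workspace="Sales Workspace"
)
labs.update_mirrored_database_definition(
    mirrored_database="SalesDB",
    mirrored_database_content=json.loads(definition),  # the update function expects a dict
    workspace="Sales Workspace",
)

# Clean up.
labs.delete_mirrored_database(mirrored_database="SalesDB", workspace="Sales Workspace")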