From 8bf430d39a2e8c82cfa02440e055087d8d183376 Mon Sep 17 00:00:00 2001 From: Michael Date: Sun, 24 Nov 2024 14:09:41 +0200 Subject: [PATCH 1/8] added workspace monitoring functions --- src/sempy_labs/_authentication.py | 3 +- src/sempy_labs/_kusto.py | 209 ++++++++++++++++++++++++++++++ 2 files changed, 211 insertions(+), 1 deletion(-) create mode 100644 src/sempy_labs/_kusto.py diff --git a/src/sempy_labs/_authentication.py b/src/sempy_labs/_authentication.py index 5c33dc17..4d0cc31a 100644 --- a/src/sempy_labs/_authentication.py +++ b/src/sempy_labs/_authentication.py @@ -1,6 +1,5 @@ from typing import Literal from sempy.fabric._token_provider import TokenProvider -import notebookutils from azure.identity import ClientSecretCredential @@ -72,6 +71,8 @@ def with_azure_key_vault( sempy.fabric.TokenProvider Token provider to be used with FabricRestClient or PowerBIRestClient. """ + import notebookutils + tenant_id = notebookutils.credentials.getSecret( key_vault_uri, key_vault_tenant_id ) diff --git a/src/sempy_labs/_kusto.py b/src/sempy_labs/_kusto.py new file mode 100644 index 00000000..cfb283d4 --- /dev/null +++ b/src/sempy_labs/_kusto.py @@ -0,0 +1,209 @@ +import sempy.fabric as fabric +import requests +import pandas as pd +from sempy.fabric.exceptions import FabricHTTPException +from sempy._utils._log import log +import sempy_labs._icons as icons +from typing import Optional, List +from uuid import UUID + + +@log +def query_kusto(cluster_uri: str, query: str, database: str) -> pd.DataFrame: + """ + Shows the KQL querysets within a workspace. + + Parameters + ---------- + cluster_uri : str + The Query URI for the KQL database. Example: "https://guid.kusto.fabric.microsoft.com" + query : str + The KQL query. + database : str + The KQL database name. + + Returns + ------- + pandas.DataFrame + A pandas dataframe showing the result of the KQL query. + """ + + import notebookutils + + token = notebookutils.credentials.getToken(cluster_uri) + + headers = { + "Authorization": f"Bearer {token}", + "Content-Type": "application/json", + "Accept": "application/json", + } + + payload = {"db": database, "csl": query} + + response = requests.post( + f"{cluster_uri}/v1/rest/query", + headers=headers, + json=payload, + ) + + if response.status_code != 200: + raise FabricHTTPException(response) + + results = response.json() + columns_info = results["Tables"][0]["Columns"] + rows = results["Tables"][0]["Rows"] + + df = pd.DataFrame(rows, columns=[col["ColumnName"] for col in columns_info]) + + for col_info in columns_info: + col_name = col_info["ColumnName"] + data_type = col_info["DataType"] + + try: + if data_type == "DateTime": + df[col_name] = pd.to_datetime(df[col_name]) + elif data_type in ["Int64", "Int32", "Long"]: + df[col_name] = ( + pd.to_numeric(df[col_name], errors="coerce") + .fillna(0) + .astype("int64") + ) + elif data_type == "Real" or data_type == "Double": + df[col_name] = pd.to_numeric(df[col_name], errors="coerce") + else: + # Convert any other type to string, change as needed + df[col_name] = df[col_name].astype(str) + except Exception as e: + print( + f"{icons.yellow_dot} Could not convert column {col_name} to {data_type}, defaulting to string: {str(e)}" + ) + df[col_name] = df[col_name].astype(str) + + return df + + +def semantic_model_logs( + cluster_uri: str, + dataset: Optional[str | List[str]] = None, + workspace: Optional[str | List[str]] = None, + report: Optional[str | UUID | List[str] | List[UUID]] = None, + capacity: Optional[str | List[str]] = None, + operation_name: Optional[str | List[str]] = None, + operation_detail_name: Optional[str | List[str]] = None, + application_name: Optional[str | List[str]] = None, + executing_user: Optional[str | List[str]] = None, + duration_ms: Optional[int] = None, + cpu_time_ms: Optional[int] = None, + timespan: Optional[int] = 1, + timespan_format: Optional[str] = "hour" +) -> pd.DataFrame: + """ + Shows the semantic model logs based on `Workspace Monitoring `_. + + Requirement: Workspace Monitoring must be enabled for the workspace. See the link above for how to enable it. + + Parameters + ---------- + cluster_uri : str + The Query URI for the KQL database. Example: "https://guid.kusto.fabric.microsoft.com" + dataset : str | List[str], default=None + Filter to be applied to the DatasetName column. + workspace : str | List[str], default=None + Filter to be applied to the WorkspaceName column. + report : str | UUID | List[str] | List[UUID], default=None + Filters the output to a report or list of reports. Must specify a single workspace if specifying a report or list of reports. + capacity : str | List[str], default=None + Filters the output to a capacity or list of capacities. + operation_name : str | List[str], default=None + Filters the output to an operation or list of operations. + operation_detail_name : str | List[str], default=None + Filters the output to a detail operation or list of detail operations. + application_name : str | List[str], default=None + Filters the output to an application name. + executing_user : str | List[str], default=None + Filters the ouptut to a user or list of users (email addresses). + duration_ms : int, default=None + Filter to be applied to the Duration (milliseconds) column. + cpu_time_ms : int, default=None + Filter to be applied to the CPU Time (milliseconds) column. + timespan : int, default=1, + The timespan (use in conjunction with the timespan_format). + timespan_format : str, default="hour" + The timespan format. Valid options: "day", "hour", "minute". + + Returns + ------- + pandas.DataFrame + A pandas dataframe showing the semantic model logs based on the filters provided. + """ + + timespan_format = timespan_format.lower() + if timespan_format.startswith('h'): + timespan_format = 'h' + elif timespan_format.startswith('m'): + timespan_format = 'm' + elif timespan_format.startswith('d'): + timespan_format = 'd' + else: + raise ValueError(f"{icons.red_dot} The '{timespan_format} timespan_format is not supported. Only 'day, 'hour', and 'minute' are supported.") + + if report is not None and (workspace is None or not isinstance(workspace, str)): + raise ValueError(f"{icons.red_dot} A report or list of reports may only be specified if a single workspace is specified.") + + query = "SemanticModelLogs" + query += f"\n| where Timestamp > ago({timespan}{timespan_format})" + + report_json_filter = "tostring(parse_json(dynamic_to_json(ApplicationContext)).Sources[0].ReportId)" + visual_json_filter = "tostring(parse_json(dynamic_to_json(ApplicationContext)).Sources[0].VisualId)" + return_columns = ['Timestamp', 'OperationName', 'OperationDetailName', 'ItemName', 'WorkspaceId', 'WorkspaceName', 'CapacityId', 'DurationMs', 'CpuTimeMs', 'EventText', 'Status', 'ReportId', 'VisualId', 'ApplicationName', 'ExecutingUser',] + + param_dict = { + "dataset": "ItemName", + "workspace": "WorkspaceName", + "operation_name": "OperationName", + "operation_detail_name": "OperationDetailName", + "application_name": "ApplicationName", + "duration_ms": "DurationMs", + "cpu_time_ms": "CpuTimeMs", + "executing_user": "ExecutingUser", + } + + if capacity is not None: + dfC = fabric.list_capacities() + if isinstance(capacity, str): + capacity = [capacity] + capacities = dfC[dfC['Display Name'].isin(capacity)]['Id'].tolist() + if len(capacities) > 0: + comma_delimited_string = ', '.join(f'"{item}"' for item in capacities) + query += f'\nand CapacityId in ({comma_delimited_string})' + + if report is not None: + dfR = fabric.list_reports(workspace=workspace) + if isinstance(report, str): + report = [report] + reports = dfR[dfR['Name'].isin(report)]['Id'].tolist() + reports = reports + dfR[dfR['Id'].isin(report)]['Id'].tolist() + if len(reports) > 0: + comma_delimited_string = ', '.join(f'"{item}"' for item in reports) + query += f'\nand {report_json_filter} in ({comma_delimited_string})' + + def _add_to_filter(parameter, filter_name, query): + if parameter is not None: + if isinstance(parameter, str): + parameter = [parameter] + comma_delimited_string = ', '.join(f'"{item}"' for item in parameter) + query += f'\nand {filter_name} in ({comma_delimited_string})' + return query + + for param, filter_name in param_dict.items(): + query = _add_to_filter(parameter=locals()[param], filter_name=filter_name, query=query) + + query += f"\n| extend ReportId = {report_json_filter}, VisualId = {visual_json_filter}" + + # Add columns to return + return_cols = ", ".join(return_columns) + query += f"\n| project {return_cols}" + + return query_kusto( + cluster_uri=cluster_uri, query=query, database="Monitoring Eventhouse" + ) From 7e0609b60de0e408e07920167b5c357bca8111a8 Mon Sep 17 00:00:00 2001 From: Michael Date: Mon, 25 Nov 2024 10:26:44 +0200 Subject: [PATCH 2/8] added admin.list-reports --- src/sempy_labs/_kusto.py | 74 +++++++++++++++------- src/sempy_labs/admin/__init__.py | 2 + src/sempy_labs/admin/_basic_functions.py | 79 ++++++++++++++++++++++++ 3 files changed, 132 insertions(+), 23 deletions(-) diff --git a/src/sempy_labs/_kusto.py b/src/sempy_labs/_kusto.py index cfb283d4..a2550f45 100644 --- a/src/sempy_labs/_kusto.py +++ b/src/sempy_labs/_kusto.py @@ -95,7 +95,7 @@ def semantic_model_logs( duration_ms: Optional[int] = None, cpu_time_ms: Optional[int] = None, timespan: Optional[int] = 1, - timespan_format: Optional[str] = "hour" + timespan_format: Optional[str] = "hour", ) -> pd.DataFrame: """ Shows the semantic model logs based on `Workspace Monitoring `_. @@ -138,24 +138,48 @@ def semantic_model_logs( """ timespan_format = timespan_format.lower() - if timespan_format.startswith('h'): - timespan_format = 'h' - elif timespan_format.startswith('m'): - timespan_format = 'm' - elif timespan_format.startswith('d'): - timespan_format = 'd' + if timespan_format.startswith("h"): + timespan_format = "h" + elif timespan_format.startswith("m"): + timespan_format = "m" + elif timespan_format.startswith("d"): + timespan_format = "d" else: - raise ValueError(f"{icons.red_dot} The '{timespan_format} timespan_format is not supported. Only 'day, 'hour', and 'minute' are supported.") + raise ValueError( + f"{icons.red_dot} The '{timespan_format} timespan_format is not supported. Only 'day, 'hour', and 'minute' are supported." + ) if report is not None and (workspace is None or not isinstance(workspace, str)): - raise ValueError(f"{icons.red_dot} A report or list of reports may only be specified if a single workspace is specified.") + raise ValueError( + f"{icons.red_dot} A report or list of reports may only be specified if a single workspace is specified." + ) query = "SemanticModelLogs" query += f"\n| where Timestamp > ago({timespan}{timespan_format})" - report_json_filter = "tostring(parse_json(dynamic_to_json(ApplicationContext)).Sources[0].ReportId)" - visual_json_filter = "tostring(parse_json(dynamic_to_json(ApplicationContext)).Sources[0].VisualId)" - return_columns = ['Timestamp', 'OperationName', 'OperationDetailName', 'ItemName', 'WorkspaceId', 'WorkspaceName', 'CapacityId', 'DurationMs', 'CpuTimeMs', 'EventText', 'Status', 'ReportId', 'VisualId', 'ApplicationName', 'ExecutingUser',] + report_json_filter = ( + "tostring(parse_json(dynamic_to_json(ApplicationContext)).Sources[0].ReportId)" + ) + visual_json_filter = ( + "tostring(parse_json(dynamic_to_json(ApplicationContext)).Sources[0].VisualId)" + ) + return_columns = [ + "Timestamp", + "OperationName", + "OperationDetailName", + "ItemName", + "WorkspaceId", + "WorkspaceName", + "CapacityId", + "DurationMs", + "CpuTimeMs", + "EventText", + "Status", + "ReportId", + "VisualId", + "ApplicationName", + "ExecutingUser", + ] param_dict = { "dataset": "ItemName", @@ -172,33 +196,37 @@ def semantic_model_logs( dfC = fabric.list_capacities() if isinstance(capacity, str): capacity = [capacity] - capacities = dfC[dfC['Display Name'].isin(capacity)]['Id'].tolist() + capacities = dfC[dfC["Display Name"].isin(capacity)]["Id"].tolist() if len(capacities) > 0: - comma_delimited_string = ', '.join(f'"{item}"' for item in capacities) - query += f'\nand CapacityId in ({comma_delimited_string})' + comma_delimited_string = ", ".join(f'"{item}"' for item in capacities) + query += f"\nand CapacityId in ({comma_delimited_string})" if report is not None: dfR = fabric.list_reports(workspace=workspace) if isinstance(report, str): report = [report] - reports = dfR[dfR['Name'].isin(report)]['Id'].tolist() - reports = reports + dfR[dfR['Id'].isin(report)]['Id'].tolist() + reports = dfR[dfR["Name"].isin(report)]["Id"].tolist() + reports = reports + dfR[dfR["Id"].isin(report)]["Id"].tolist() if len(reports) > 0: - comma_delimited_string = ', '.join(f'"{item}"' for item in reports) - query += f'\nand {report_json_filter} in ({comma_delimited_string})' + comma_delimited_string = ", ".join(f'"{item}"' for item in reports) + query += f"\nand {report_json_filter} in ({comma_delimited_string})" def _add_to_filter(parameter, filter_name, query): if parameter is not None: if isinstance(parameter, str): parameter = [parameter] - comma_delimited_string = ', '.join(f'"{item}"' for item in parameter) - query += f'\nand {filter_name} in ({comma_delimited_string})' + comma_delimited_string = ", ".join(f'"{item}"' for item in parameter) + query += f"\nand {filter_name} in ({comma_delimited_string})" return query for param, filter_name in param_dict.items(): - query = _add_to_filter(parameter=locals()[param], filter_name=filter_name, query=query) + query = _add_to_filter( + parameter=locals()[param], filter_name=filter_name, query=query + ) - query += f"\n| extend ReportId = {report_json_filter}, VisualId = {visual_json_filter}" + query += ( + f"\n| extend ReportId = {report_json_filter}, VisualId = {visual_json_filter}" + ) # Add columns to return return_cols = ", ".join(return_columns) diff --git a/src/sempy_labs/admin/__init__.py b/src/sempy_labs/admin/__init__.py index 10e65016..e54b6811 100644 --- a/src/sempy_labs/admin/__init__.py +++ b/src/sempy_labs/admin/__init__.py @@ -14,6 +14,7 @@ list_items, list_activity_events, list_modified_workspaces, + list_reports, ) from sempy_labs.admin._domains import ( list_domains, @@ -54,4 +55,5 @@ "revoke_external_data_share", "list_activity_events", "list_modified_workspaces", + "list_reports", ] diff --git a/src/sempy_labs/admin/_basic_functions.py b/src/sempy_labs/admin/_basic_functions.py index 6cabc1df..fdf7de04 100644 --- a/src/sempy_labs/admin/_basic_functions.py +++ b/src/sempy_labs/admin/_basic_functions.py @@ -1077,3 +1077,82 @@ def list_modified_workspaces( df = pd.DataFrame(response.json()).rename(columns={"id": "Workspace Id"}) return df + + +def list_reports( + top: Optional[int] = None, skip: Optional[int] = None, filter: Optional[str] = None +) -> pd.DataFrame: + """ + Shows a list of reports for the organization. + + This is a wrapper function for the following API: `Admin - Reports GetReportsAsAdmin `_. + + Returns + ------- + pandas.DataFrame + A pandas dataframe showing a list of reports for the organization. + """ + + df = pd.DataFrame( + columns=[ + "Report Id", + "Report Name", + "Type", + "Web URL", + "Embed URL", + "Dataset Id", + "Created Date", + "Modified Date", + "Created By", + "Modified By", + "Sensitivity Label Id", + "Users", + "Subscriptions", + "Workspace Id", + "Report Flags", + ] + ) + + url = "/v1.0/myorg/admin/reports?" + if top is not None: + url += f"$top={top}&" + if skip is not None: + url += f"$skip={skip}&" + if filter is not None: + url += f"$filter={filter}&" + + url.rstrip("$").rstrip("?") + + client = fabric.PowerBIRestClient() + response = client.get(url) + + if response.status_code != 200: + raise FabricHTTPException(response) + + for v in response.json().get("value", []): + new_data = { + "Report Id": v.get("id"), + "Report Name": v.get("name"), + "Type": v.get("reportType"), + "Web URL": v.get("webUrl"), + "Embed URL": v.get("embedUrl"), + "Dataset Id": v.get("datasetId"), + "Created Date": v.get("createdDateTime"), + "Modified Date": v.get("modifiedDateTime"), + "Created By": v.get("createdBy"), + "Modified By": v.get("modifiedBy"), + "Sensitivity Label Id": v.get("sensitivityLabel", {}).get("labelId"), + "Users": v.get("users"), + "Subscriptions": v.get("subscriptions"), + "Workspace Id": v.get("workspaceId"), + "Report Flags": v.get("reportFlags"), + } + df = pd.concat([df, pd.DataFrame([new_data])], ignore_index=True) + + int_cols = ["Report Flags"] + df[int_cols] = df[int_cols].astype(int) + + df["Created Date"] = pd.to_datetime(df["Created Date"], errors="coerce") + df["Modified Date"] = pd.to_datetime(df["Modified Date"], errors="coerce") + + return df From 3e753940832b884315a89ece98c620a2c835fbec Mon Sep 17 00:00:00 2001 From: Michael Date: Mon, 25 Nov 2024 13:15:19 +0200 Subject: [PATCH 3/8] added model_mode to TOM, get cluster_uri by default --- src/sempy_labs/_kusto.py | 16 ++++++++++++++-- src/sempy_labs/tom/_model.py | 26 ++++++++++++++++++++++++++ 2 files changed, 40 insertions(+), 2 deletions(-) diff --git a/src/sempy_labs/_kusto.py b/src/sempy_labs/_kusto.py index a2550f45..07fa5296 100644 --- a/src/sempy_labs/_kusto.py +++ b/src/sempy_labs/_kusto.py @@ -6,6 +6,7 @@ import sempy_labs._icons as icons from typing import Optional, List from uuid import UUID +from sempy_labs._helper_functions import resolve_workspace_name_and_id @log @@ -83,7 +84,7 @@ def query_kusto(cluster_uri: str, query: str, database: str) -> pd.DataFrame: def semantic_model_logs( - cluster_uri: str, + cluster_uri: Optional[str] = None, dataset: Optional[str | List[str]] = None, workspace: Optional[str | List[str]] = None, report: Optional[str | UUID | List[str] | List[UUID]] = None, @@ -104,7 +105,7 @@ def semantic_model_logs( Parameters ---------- - cluster_uri : str + cluster_uri : str, default=None The Query URI for the KQL database. Example: "https://guid.kusto.fabric.microsoft.com" dataset : str | List[str], default=None Filter to be applied to the DatasetName column. @@ -137,6 +138,17 @@ def semantic_model_logs( A pandas dataframe showing the semantic model logs based on the filters provided. """ + from sempy_labs._kql_databases import list_kql_databases + + (workspace_name, workspace_id) = resolve_workspace_name_and_id(workspace) + + if cluster_uri is None: + dfK = list_kql_databases(workspace=workspace) + dfK_filt = dfK[dfK['KQL Database Name'] == 'Monitoring KQL database'] + if len(dfK_filt) == 0: + raise ValueError(f"{icons.red_dot} Workspace monitoring is not set up for the '{workspace_name}' workspace.") + cluster_uri = dfK_filt['Query Service URI'].iloc[0] + timespan_format = timespan_format.lower() if timespan_format.startswith("h"): timespan_format = "h" diff --git a/src/sempy_labs/tom/_model.py b/src/sempy_labs/tom/_model.py index 3ea68ed5..80a88e1d 100644 --- a/src/sempy_labs/tom/_model.py +++ b/src/sempy_labs/tom/_model.py @@ -4515,6 +4515,32 @@ def set_value_filter_behavior(self, value_filter_behavior: str = "Automatic"): TOM.ValueFilterBehaviorType, value_filter_behavior ) + def model_mode(self) -> str: + """ + Obtains the semantic model mode. + + Returns + ------- + str + The semantic model mode. + """ + + import Microsoft.AnalysisServices.Tabular as TOM + + if self.is_direct_lake(): + return 'Direct Lake' + + partitions = list(self.all_partitions()) + modes = {p.Mode for p in partitions} + + if all(mode == TOM.ModeType.Import for mode in modes): + return 'Import' + elif all(mode in {TOM.ModeType.DirectQuery, TOM.ModeType.Dual} for mode in modes): + return 'DirectQuery' + else: + return 'Composite' + # TOM.ModeType.DirectQuery in modes and TOM.ModeType.Import in modes: + def close(self): if not self._readonly and self.model is not None: From bad476197e6ecdf41b48287e732310261bec11ce Mon Sep 17 00:00:00 2001 From: Michael Date: Mon, 25 Nov 2024 13:19:41 +0200 Subject: [PATCH 4/8] timespan_literal --- src/sempy_labs/_kusto.py | 25 ++++++++++++------------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/src/sempy_labs/_kusto.py b/src/sempy_labs/_kusto.py index 07fa5296..af548cd0 100644 --- a/src/sempy_labs/_kusto.py +++ b/src/sempy_labs/_kusto.py @@ -96,7 +96,7 @@ def semantic_model_logs( duration_ms: Optional[int] = None, cpu_time_ms: Optional[int] = None, timespan: Optional[int] = 1, - timespan_format: Optional[str] = "hour", + timespan_literal: Optional[str] = "hour", ) -> pd.DataFrame: """ Shows the semantic model logs based on `Workspace Monitoring `_. @@ -129,8 +129,8 @@ def semantic_model_logs( Filter to be applied to the CPU Time (milliseconds) column. timespan : int, default=1, The timespan (use in conjunction with the timespan_format). - timespan_format : str, default="hour" - The timespan format. Valid options: "day", "hour", "minute". + timespan_literal : str, default="hour" + The timespan literal format. Valid options: "day", "hour", "minute". Returns ------- @@ -149,16 +149,16 @@ def semantic_model_logs( raise ValueError(f"{icons.red_dot} Workspace monitoring is not set up for the '{workspace_name}' workspace.") cluster_uri = dfK_filt['Query Service URI'].iloc[0] - timespan_format = timespan_format.lower() - if timespan_format.startswith("h"): - timespan_format = "h" - elif timespan_format.startswith("m"): - timespan_format = "m" - elif timespan_format.startswith("d"): - timespan_format = "d" + timespan_literal = timespan_literal.lower() + if timespan_literal.startswith("h"): + timespan_literal = "h" + elif timespan_literal.startswith("m"): + timespan_literal = "m" + elif timespan_literal.startswith("d"): + timespan_literal = "d" else: raise ValueError( - f"{icons.red_dot} The '{timespan_format} timespan_format is not supported. Only 'day, 'hour', and 'minute' are supported." + f"{icons.red_dot} The '{timespan_literal} timespan_format is not supported. Only 'day, 'hour', and 'minute' are supported." ) if report is not None and (workspace is None or not isinstance(workspace, str)): @@ -166,8 +166,7 @@ def semantic_model_logs( f"{icons.red_dot} A report or list of reports may only be specified if a single workspace is specified." ) - query = "SemanticModelLogs" - query += f"\n| where Timestamp > ago({timespan}{timespan_format})" + query = f"SemanticModelLogs\n| where Timestamp > ago({timespan}{timespan_literal})" report_json_filter = ( "tostring(parse_json(dynamic_to_json(ApplicationContext)).Sources[0].ReportId)" From 5161153f367b05ba9d3ff20eb82935228103986f Mon Sep 17 00:00:00 2001 From: Michael Date: Mon, 25 Nov 2024 13:23:08 +0200 Subject: [PATCH 5/8] float --- src/sempy_labs/_kusto.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sempy_labs/_kusto.py b/src/sempy_labs/_kusto.py index af548cd0..4f40161a 100644 --- a/src/sempy_labs/_kusto.py +++ b/src/sempy_labs/_kusto.py @@ -95,7 +95,7 @@ def semantic_model_logs( executing_user: Optional[str | List[str]] = None, duration_ms: Optional[int] = None, cpu_time_ms: Optional[int] = None, - timespan: Optional[int] = 1, + timespan: Optional[int | float] = 1, timespan_literal: Optional[str] = "hour", ) -> pd.DataFrame: """ From 3f861229c8c17ac58a27ffc16451e780b207e06e Mon Sep 17 00:00:00 2001 From: Michael Date: Thu, 28 Nov 2024 10:28:25 +0200 Subject: [PATCH 6/8] literal --- src/sempy_labs/_kusto.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sempy_labs/_kusto.py b/src/sempy_labs/_kusto.py index 4f40161a..bc55310f 100644 --- a/src/sempy_labs/_kusto.py +++ b/src/sempy_labs/_kusto.py @@ -158,7 +158,7 @@ def semantic_model_logs( timespan_literal = "d" else: raise ValueError( - f"{icons.red_dot} The '{timespan_literal} timespan_format is not supported. Only 'day, 'hour', and 'minute' are supported." + f"{icons.red_dot} The '{timespan_literal} timespan_literal is not supported. Only 'day, 'hour', and 'minute' are supported." ) if report is not None and (workspace is None or not isinstance(workspace, str)): From 9f5bc248ebc80dc333955269e6a5bb6be40afb82 Mon Sep 17 00:00:00 2001 From: Michael Date: Thu, 28 Nov 2024 12:00:55 +0200 Subject: [PATCH 7/8] added save_semantic_model_logs --- src/sempy_labs/_kusto.py | 93 ++++++++++++++++++++++++++++++++++-- src/sempy_labs/tom/_model.py | 12 +++-- 2 files changed, 96 insertions(+), 9 deletions(-) diff --git a/src/sempy_labs/_kusto.py b/src/sempy_labs/_kusto.py index bc55310f..c004aef8 100644 --- a/src/sempy_labs/_kusto.py +++ b/src/sempy_labs/_kusto.py @@ -6,7 +6,11 @@ import sempy_labs._icons as icons from typing import Optional, List from uuid import UUID -from sempy_labs._helper_functions import resolve_workspace_name_and_id +from sempy_labs._helper_functions import ( + resolve_workspace_name_and_id, + create_abfss_path, + save_as_delta_table, +) @log @@ -144,10 +148,12 @@ def semantic_model_logs( if cluster_uri is None: dfK = list_kql_databases(workspace=workspace) - dfK_filt = dfK[dfK['KQL Database Name'] == 'Monitoring KQL database'] + dfK_filt = dfK[dfK["KQL Database Name"] == "Monitoring KQL database"] if len(dfK_filt) == 0: - raise ValueError(f"{icons.red_dot} Workspace monitoring is not set up for the '{workspace_name}' workspace.") - cluster_uri = dfK_filt['Query Service URI'].iloc[0] + raise ValueError( + f"{icons.red_dot} Workspace monitoring is not set up for the '{workspace_name}' workspace." + ) + cluster_uri = dfK_filt["Query Service URI"].iloc[0] timespan_literal = timespan_literal.lower() if timespan_literal.startswith("h"): @@ -246,3 +252,82 @@ def _add_to_filter(parameter, filter_name, query): return query_kusto( cluster_uri=cluster_uri, query=query, database="Monitoring Eventhouse" ) + + +def _resolve_cluster_uri(workspace: Optional[str] = None) -> str: + + from sempy_labs._kql_databases import list_kql_databases + + (workspace_name, workspace_id) = resolve_workspace_name_and_id(workspace) + dfK = list_kql_databases(workspace=workspace) + dfK_filt = dfK[dfK["KQL Database Name"] == "Monitoring KQL database"] + if len(dfK_filt) == 0: + raise ValueError( + f"{icons.red_dot} Workspace monitoring is not set up for the '{workspace_name}' workspace." + ) + cluster_uri = dfK_filt["Query Service URI"].iloc[0] + + return cluster_uri + + +def save_semantic_model_logs( + workspace: Optional[str] = None, + frequency: int = 1, + frequency_literal: str = "hour", +): + + from sempy_labs.lakehouse import get_lakehouse_tables + from pyspark.sql import SparkSession + + delta_table_name = "SLL_SemanticModelLogs" + cluster_uri = _resolve_cluster_uri(workspace=workspace) + + query = f""" + let StartDate = datetime_add('{frequency_literal}', -{frequency}, bin(now(), 1h)); + let EndDate = bin(now(), 1h); + //let StartDate = datetime_add('day', -4, bin(now(), 1h)); + //let EndDate = bin(now(), 1h); + SemanticModelLogs + | where Timestamp between (StartDate .. EndDate) + | extend ReportId = tostring(parse_json(dynamic_to_json(ApplicationContext)).Sources[0].ReportId) + | extend VisualId = tostring(parse_json(dynamic_to_json(ApplicationContext)).Sources[0].VisualId) + | extend UniqueId = hash(strcat((Timestamp), "_", OperationName, "_", OperationDetailName)) + """ + + df = query_kusto( + cluster_uri=cluster_uri, database="Monitoring Eventhouse", query=query + ) + if df.empty: + print(f"{icons.yellow_dot} No logs to capture in this time period.") + return + + lakehouse_id = fabric.get_lakehouse_id() + lakehouse_workspace_id = fabric.get_workspace_id() + path = create_abfss_path( + lakehouse_id, lakehouse_workspace_id, delta_table_name=delta_table_name + ) + + dfLT = get_lakehouse_tables() + dfLT_filt = dfLT[dfLT["Table Name"] == delta_table_name] + + if len(dfLT_filt) == 1: + spark = SparkSession.builder.getOrCreate() + existing_df = spark.read.format("delta").load(path) + df_spark = spark.createDataFrame(df) + + # Filter out rows that already exist in the Delta table + incremental_df = df_spark.join(existing_df, "UniqueId", "left_anti") + else: + incremental_df = df_spark + + incremental_df_pandas = incremental_df.toPandas() + + # If the resulting DataFrame is not empty, save to Delta table + if not incremental_df_pandas.empty: + save_as_delta_table( + dataframe=incremental_df_pandas, # Use the filtered DataFrame here + write_mode="append", + delta_table_name=delta_table_name, + ) + else: + print(f"{icons.yellow_dot} No new logs to capture in this time period.") diff --git a/src/sempy_labs/tom/_model.py b/src/sempy_labs/tom/_model.py index 80a88e1d..908e6274 100644 --- a/src/sempy_labs/tom/_model.py +++ b/src/sempy_labs/tom/_model.py @@ -4528,17 +4528,19 @@ def model_mode(self) -> str: import Microsoft.AnalysisServices.Tabular as TOM if self.is_direct_lake(): - return 'Direct Lake' + return "Direct Lake" partitions = list(self.all_partitions()) modes = {p.Mode for p in partitions} if all(mode == TOM.ModeType.Import for mode in modes): - return 'Import' - elif all(mode in {TOM.ModeType.DirectQuery, TOM.ModeType.Dual} for mode in modes): - return 'DirectQuery' + return "Import" + elif all( + mode in {TOM.ModeType.DirectQuery, TOM.ModeType.Dual} for mode in modes + ): + return "DirectQuery" else: - return 'Composite' + return "Composite" # TOM.ModeType.DirectQuery in modes and TOM.ModeType.Import in modes: def close(self): From 1d9cdda19961d21dd50160d2e243a302681cb011 Mon Sep 17 00:00:00 2001 From: Michael Date: Mon, 23 Dec 2024 14:45:03 +0200 Subject: [PATCH 8/8] updated resolve cluster uri --- src/sempy_labs/_kql_databases.py | 15 +++++++++++++++ src/sempy_labs/_kusto.py | 27 ++++++--------------------- 2 files changed, 21 insertions(+), 21 deletions(-) diff --git a/src/sempy_labs/_kql_databases.py b/src/sempy_labs/_kql_databases.py index 3999b501..4a65dcd9 100644 --- a/src/sempy_labs/_kql_databases.py +++ b/src/sempy_labs/_kql_databases.py @@ -8,6 +8,7 @@ pagination, ) from sempy.fabric.exceptions import FabricHTTPException +from uuid import UUID def list_kql_databases(workspace: Optional[str] = None) -> pd.DataFrame: @@ -138,3 +139,17 @@ def delete_kql_database(name: str, workspace: Optional[str] = None): print( f"{icons.green_dot} The '{name}' KQL database within the '{workspace}' workspace has been deleted." ) + + +def _resolve_cluster_uri(workspace: Optional[str | UUID] = None) -> str: + + (workspace_name, workspace_id) = resolve_workspace_name_and_id(workspace) + dfK = list_kql_databases(workspace=workspace_id) + dfK_filt = dfK[dfK["KQL Database Name"] == "Monitoring KQL database"] + if len(dfK_filt) == 0: + raise ValueError( + f"{icons.red_dot} Workspace monitoring is not set up for the '{workspace_name}' workspace." + ) + cluster_uri = dfK_filt["Query Service URI"].iloc[0] + + return cluster_uri diff --git a/src/sempy_labs/_kusto.py b/src/sempy_labs/_kusto.py index c004aef8..95c2694d 100644 --- a/src/sempy_labs/_kusto.py +++ b/src/sempy_labs/_kusto.py @@ -11,6 +11,7 @@ create_abfss_path, save_as_delta_table, ) +from sempy_labs._kql_databases import _resolve_cluster_uri @log @@ -90,7 +91,7 @@ def query_kusto(cluster_uri: str, query: str, database: str) -> pd.DataFrame: def semantic_model_logs( cluster_uri: Optional[str] = None, dataset: Optional[str | List[str]] = None, - workspace: Optional[str | List[str]] = None, + workspace: Optional[str | UUID] = None, report: Optional[str | UUID | List[str] | List[UUID]] = None, capacity: Optional[str | List[str]] = None, operation_name: Optional[str | List[str]] = None, @@ -113,7 +114,7 @@ def semantic_model_logs( The Query URI for the KQL database. Example: "https://guid.kusto.fabric.microsoft.com" dataset : str | List[str], default=None Filter to be applied to the DatasetName column. - workspace : str | List[str], default=None + workspace : str | UUID, default=None Filter to be applied to the WorkspaceName column. report : str | UUID | List[str] | List[UUID], default=None Filters the output to a report or list of reports. Must specify a single workspace if specifying a report or list of reports. @@ -147,7 +148,7 @@ def semantic_model_logs( (workspace_name, workspace_id) = resolve_workspace_name_and_id(workspace) if cluster_uri is None: - dfK = list_kql_databases(workspace=workspace) + dfK = list_kql_databases(workspace=workspace_id) dfK_filt = dfK[dfK["KQL Database Name"] == "Monitoring KQL database"] if len(dfK_filt) == 0: raise ValueError( @@ -200,7 +201,7 @@ def semantic_model_logs( param_dict = { "dataset": "ItemName", - "workspace": "WorkspaceName", + "workspace_name": "WorkspaceName", "operation_name": "OperationName", "operation_detail_name": "OperationDetailName", "application_name": "ApplicationName", @@ -254,24 +255,8 @@ def _add_to_filter(parameter, filter_name, query): ) -def _resolve_cluster_uri(workspace: Optional[str] = None) -> str: - - from sempy_labs._kql_databases import list_kql_databases - - (workspace_name, workspace_id) = resolve_workspace_name_and_id(workspace) - dfK = list_kql_databases(workspace=workspace) - dfK_filt = dfK[dfK["KQL Database Name"] == "Monitoring KQL database"] - if len(dfK_filt) == 0: - raise ValueError( - f"{icons.red_dot} Workspace monitoring is not set up for the '{workspace_name}' workspace." - ) - cluster_uri = dfK_filt["Query Service URI"].iloc[0] - - return cluster_uri - - def save_semantic_model_logs( - workspace: Optional[str] = None, + workspace: Optional[str | UUID] = None, frequency: int = 1, frequency_literal: str = "hour", ):