diff --git a/agent/database.py b/agent/database.py index 018e069f..c8596227 100644 --- a/agent/database.py +++ b/agent/database.py @@ -153,6 +153,273 @@ def modify_user_permissions(self, username: str, mode: str, permissions: dict | self._run_sql(queries_str, commit=True, allow_all_stmt_types=True) + def fetch_database_table_sizes(self) -> dict: + data = self._run_sql( + f"SELECT table_name, data_length, index_length FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA" + f"='{self.database_name}'", + as_dict=True, + ) + if len(data) == 0: + return [] + data = data[0]["output"] + tables = {} + for d in data: + tables[d["table_name"]] = { + "data_length": int(d["data_length"]), + "index_length": int(d["index_length"]), + "total_size": int(d["data_length"]) + int(d["index_length"]), + } + d["data_length"] = int(d["data_length"]) + d["index_length"] = int(d["index_length"]) + d["total_size"] = d["data_length"] + d["index_length"] + return tables + + def fetch_database_table_schema(self, include_index_info: bool = True): + index_info = {} + index_usage_info = {} + data = self._run_sql( + f"""SELECT + TABLE_NAME AS `table`, + COLUMN_NAME AS `column`, + DATA_TYPE AS `data_type`, + IS_NULLABLE AS `is_nullable`, + COLUMN_DEFAULT AS `default` + FROM + INFORMATION_SCHEMA.COLUMNS + WHERE + TABLE_SCHEMA='{self.database_name}'; + """, + as_dict=True, + ) + if len(data) == 0: + return {} + data = data[0]["output"] + tables = {} # : [, , ...] + + if include_index_info: + index_info = self.fetch_database_table_indexes() + index_usage_info = self.fetch_database_table_index_usage() + + for record in data: + if record["table"] not in tables: + tables[record["table"]] = [] + indexes = index_info.get(record["table"], {}).get(record["column"], []) + column_index_usage = {} + for index in indexes: + column_index_usage[index] = index_usage_info.get(record["table"], {}).get(index, 0) + + tables[record["table"]].append( + { + "column": record["column"], + "data_type": record["data_type"], + "is_nullable": record["is_nullable"] == "YES", + "default": record["default"], + "index_info": { + "is_indexed": len(indexes) > 0, + "indexes": indexes, + "index_usage": column_index_usage, + }, + } + ) + return tables + + def fetch_database_table_indexes(self): + data = self._run_sql( + f""" + SELECT + TABLE_NAME AS `table`, + COLUMN_NAME AS `column`, + INDEX_NAME AS `index` + FROM + INFORMATION_SCHEMA.STATISTICS + WHERE + TABLE_SCHEMA='{self.database_name}' + """, + as_dict=True, + ) + if len(data) == 0: + return {} + data = data[0]["output"] + tables = {} # : { : [, , ...] 
} + for record in data: + if record["table"] not in tables: + tables[record["table"]] = {} + if record["column"] not in tables[record["table"]]: + tables[record["table"]][record["column"]] = [] + tables[record["table"]][record["column"]].append(record["index"]) + return tables + + def fetch_database_table_index_usage(self): + data = self._run_sql( + f""" + SELECT + TABLE_NAME AS `table`, + INDEX_NAME AS `index`, + ROWS_READ AS `rows_read` + FROM + INFORMATION_SCHEMA.INDEX_STATISTICS + WHERE + TABLE_SCHEMA='{self.database_name}' + """, + as_dict=True, + ) + if len(data) == 0: + return {} + data = data[0]["output"] + tables = {} # : { : } + for record in data: + if record["table"] not in tables: + tables[record["table"]] = {} + tables[record["table"]][record["index"]] = int(record["rows_read"]) + return tables + + def explain_query(self, query) -> list: + result = self._run_sql(query=query, as_dict=True) + return result[0]["output"] + + def explain_queries(self, queries: list) -> list: + sql_query = "" + for query in queries: + sql_query += f"EXPLAIN {query};\n" + result = self._run_sql(query=sql_query, as_dict=True) + data = {} + for record in result: + data[record["query"]] = record["output"] + return data + + def fetch_database_column_statistics(self, table): + """Get various stats about columns in a table. + + Refer: + - https://mariadb.com/kb/en/engine-independent-table-statistics/ + - https://mariadb.com/kb/en/mysqlcolumn_stats-table/ + """ + self._run_sql( + f"ANALYZE TABLE `{self.database_name}`.`{table}` PERSISTENT FOR ALL", + ) + + results = self._run_sql( + f""" + SELECT + column_name, nulls_ratio, avg_length, avg_frequency, + decode_histogram(hist_type,histogram) as histogram + FROM + mysql.column_stats + WHERE + db_name='{self.database_name}' + and table_name='{table}' + """, + as_dict=True, + ) + if len(results) == 0: + raise Exception("Failed to fetch column stats") + result = results[0]["output"] + + for row in results: + for column in ["nulls_ratio", "avg_length", "avg_frequency"]: + row[column] = float(row[column]) if row[column] else None + + return result + + def fetch_summarized_performance_report(self): + queries = f""" +-- Top 10 time consuming queries; +SELECT + (SUM_TIMER_WAIT / SUM(SUM_TIMER_WAIT) OVER() * 100) AS percent, + round(SUM_TIMER_WAIT/1000000000, 1) AS total_time_ms, + COUNT_STAR AS calls, + round(AVG_TIMER_WAIT/1000000000, 1) AS avg_time_ms, + DIGEST_TEXT AS query +FROM performance_schema.events_statements_summary_by_digest + WHERE SCHEMA_NAME='{self.database_name}' + ORDER BY SUM_TIMER_WAIT DESC + LIMIT 10; + +-- Top 10 queries with full table scans; +SELECT t1.exec_count AS calls, + t1.rows_examined AS rows_examined, + t1.rows_sent AS rows_sent, + t1.query AS query, + t2.DIGEST_TEXT AS example +FROM sys.statements_with_full_table_scans AS t1 +LEFT JOIN ( + SELECT DIGEST, FIRST_VALUE(DIGEST_TEXT) OVER (PARTITION BY DIGEST ORDER BY RAND()) AS DIGEST_TEXT + FROM performance_schema.events_statements_summary_by_digest +) AS t2 ON t1.digest = t2.DIGEST +ORDER BY rows_examined DESC +LIMIT 10; + +-- Unused Indexes; +SELECT + index_name, + object_name AS table_name +FROM + sys.schema_unused_indexes +WHERE + object_schema='{self.database_name}'; + +-- Redundant Indexes; +SELECT + table_name, + redundant_index_name, + redundant_index_columns, + dominant_index_name, + dominant_index_columns +FROM + sys.schema_redundant_indexes +WHERE + table_schema='{self.database_name}'; +""" + + result = self._run_sql(queries, as_dict=True) + return { + 
"top_10_time_consuming_queries": result[0]["output"], + "top_10_queries_with_full_table_scan": result[1]["output"], + "unused_indexes": result[2]["output"], + "redundant_indexes": result[3]["output"], + } + + def fetch_process_list(self): + result = self._run_sql("SHOW FULL PROCESSLIST", as_dict=True) + if len(result) == 0: + return [] + return [ + { + "id": str(record["Id"]), + "command": record["Command"], + "query": record["Info"], + "state": record["State"], + "time": record["Time"], + "db_user": record["User"], + "db_user_host": record["Host"].split(":")[0], + } + for record in result[0]["output"] + if record["db"] == self.database_name + ] + + def kill_process(self, pid: str): + with contextlib.suppress(Exception): + processes = self.fetch_process_list() + """ + It's important to validate whether the pid belongs to the current database + As we are running it as `root` user, it can be possible to kill processes from other databases + by forging the request + """ + isFound = False + for process in processes: + if process.get("id") == pid: + isFound = True + break + if not isFound: + return + """ + The query can fail if the process is already killed. + Anyway we need to reload the process list after killing to verify if the process is killed. + + We can safely ignore the exception + """ + self._run_sql(f"KILL {pid}") + # Private helper methods def _run_sql( # noqa C901 self, query: str, commit: bool = False, as_dict: bool = False, allow_all_stmt_types: bool = False @@ -164,7 +431,7 @@ def _run_sql( # noqa C901 Args: query: SQL query string commit: True if you want to commit the changes. If commit is false, it will rollback the changes and - also wouldnt allow to run ddl, dcl or tcl queries + also wouldn't allow to run ddl, dcl or tcl queries as_dict: True if you want to return the result as a dictionary (like frappe.db.sql). Otherwise it will return a dict of columns and data allow_all_stmt_types: True if you want to allow all type of sql statements diff --git a/agent/database_optimizer.py b/agent/database_optimizer.py new file mode 100644 index 00000000..8e8695e4 --- /dev/null +++ b/agent/database_optimizer.py @@ -0,0 +1,422 @@ +"""Basic DB optimizer for Frappe Framework based app. + +This is largely based on heuristics and known good practices for indexing. +""" + +from __future__ import annotations + +import re +from collections import defaultdict +from dataclasses import dataclass +from typing import TYPE_CHECKING, Literal + +from sql_metadata import Parser + +if TYPE_CHECKING: + from agent.site import Site + +# Any index that reads more than 30% table on average is not "useful" +INDEX_SCORE_THRESHOLD = 0.3 +# Anything reading less than this percent of table is considered optimal +OPTIMIZATION_THRESHOLD = 0.1 + + +def cstr(data: str | None) -> str: + if data is None: + return "" + return str(data) + + +def cint(data: int | None) -> int: + if data is None: + return 0 + return int(data) + + +def flt(data: float | None) -> float: + if data is None: + return 0.0 + return float(data) + + +@dataclass +class DBExplain: + # refer: https://mariadb.com/kb/en/explain/ + # Anything not explicitly encoded here is likely not supported. 
+ select_type: Literal["SIMPLE", "PRIMARY", "SUBQUERY", "UNION", "DERIVED"] + table: str + scan_type: Literal[ # What type of scan will be performed + "ALL", # Full table scan + "CONST", # Single row will be read + "EQ_REF", # A single row is found from *unique* index + "REF", # Index is used, but MIGHT hit more than 1 rows as it's non-unique + "RANGE", # The table will be accessed with a key over one or more value ranges. + "INDEX_MERGE", # multiple indexes are used and merged smartly. Equivalent to RANGE + "INDEX_SUBQUERY", + "INDEX", # Full index scan is performed. Similar to full table scan in case of large number of rows. + "REF_OR_NULL", + "UNIQUE_SUBQUERY", + "FULLTEXT", # Full text index is used, + ] + possible_keys: list[str] | None = None # possible indexes that can be used + key: str | None = None # This index is being used + key_len: int | None = None # How many prefix bytes from index are being used + ref: str | None = None # is reference constant or some other column + rows: int = 0 # roughly how many rows will be examined + extra: str | None = None + + @classmethod + def from_frappe_output(cls, data) -> DBExplain: + return cls( + select_type=cstr(data["select_type"]).upper(), + table=data["table"], + scan_type=cstr(data["type"]).upper(), + possible_keys=data["possible_keys"], + key=data["key"], + key_len=cint(data["key_len"]) if data["key_len"] else None, + ref=data["ref"], + rows=cint(data["rows"]), + extra=data.get("Extra"), + ) + + +@dataclass +class DBColumn: + name: str + cardinality: int | None + is_nullable: bool + default: str + data_type: str + + @classmethod + def from_frappe_output(cls, data) -> DBColumn: + "Parse DBColumn from output of describe-database-table command in Frappe" + return cls( + name=data["column"], + cardinality=data.get("cardinality"), + is_nullable=data["is_nullable"], + default=data["default"], + data_type=data["type"], + ) + + +@dataclass +class DBIndex: + name: str + column: str + table: str + unique: bool | None = None + cardinality: int | None = None + sequence: int = 1 + nullable: bool = True + _score: float = 0.0 + + def __eq__(self, other: DBIndex) -> bool: + return self.column == other.column and self.sequence == other.sequence and self.table == other.table + + def __repr__(self): + return f"DBIndex(`{self.table}`.`{self.column}`)" + + @classmethod + def from_frappe_output(cls, data, table) -> DBIndex: + "Parse DBIndex from output of describe-database-table command in Frappe" + return cls( + name=data["name"], + table=table, + unique=data["unique"], + cardinality=data["cardinality"], + sequence=data["sequence"], + nullable=data["nullable"], + column=data["column"], + ) + + def to_dict(self, fields: list[str] | None = None) -> dict: + if not fields: + fields = ["name", "column", "table", "unique", "cardinality", "sequence", "nullable"] + return {field: getattr(self, field) for field in fields} + + +@dataclass +class ColumnStat: + column_name: str + avg_frequency: float + avg_length: float + nulls_ratio: float | None = None + histogram: list[float] = None + + def __post_init__(self): + if not self.histogram: + self.histogram = [] + + @classmethod + def from_frappe_output(cls, data) -> ColumnStat: + return cls( + column_name=data["column_name"], + avg_frequency=data["avg_frequency"], + avg_length=data["avg_length"], + nulls_ratio=data["nulls_ratio"], + histogram=[flt(bin) for bin in data["histogram"].split(",")] if data["histogram"] else [], + ) + + +@dataclass +class DBTable: + name: str + total_rows: int + schema: list[DBColumn] | None 
= None + indexes: list[DBIndex] | None = None + + def __post_init__(self): + if not self.schema: + self.schema = [] + if not self.indexes: + self.indexes = [] + + def update_cardinality(self, column_stats: list[ColumnStat]) -> None: + """Estimate cardinality using mysql.column_stat""" + for column_stat in column_stats: + for col in self.schema: + if col.name == column_stat.column_name and not col.cardinality and column_stat.avg_frequency: + # "hack" or "math" - average frequency is on average how frequently a row value appears. + # Avg = total_rows / cardinality, so... + col.cardinality = self.total_rows / column_stat.avg_frequency + + @classmethod + def from_frappe_output(cls, data) -> DBTable: + "Parse DBTable from output of describe-database-table command in Frappe" + table_name = data["table_name"] + return cls( + name=table_name, + total_rows=data["total_rows"], + schema=[DBColumn.from_frappe_output(c) for c in data["schema"]], + indexes=[DBIndex.from_frappe_output(i, table_name) for i in data["indexes"]], + ) + + def has_column(self, column: str) -> bool: + return any(col.name == column for col in self.schema) + + +@dataclass +class DBOptimizer: + query: str # raw query in string format + explain_plan: list[DBExplain] = None + tables: dict[str, DBTable] = None + parsed_query: Parser = None + + def __post_init__(self): + if not self.explain_plan: + self.explain_plan = [] + if not self.tables: + self.tables = {} + for explain_entry in self.explain_plan: + explain_entry.select_type = explain_entry.select_type.upper() + explain_entry.scan_type = explain_entry.scan_type.upper() + self.parsed_query = Parser(re.sub(r'"(\S+)"', r"'\1'", self.query)) + + @property + def tables_examined(self) -> list[str]: + return self.parsed_query.tables + + def update_table_data(self, table: DBTable): + self.tables[table.name] = table + + def potential_indexes(self) -> list[DBIndex]: + """Get all columns that can potentially be indexed to speed up this query.""" + + possible_indexes = [] + + # Where claus columns using these operators benefit from index + # 1. = (equality) + # 2. >, <, >=, <= + # 3. LIKE 'xyz%' (Prefix search) + # 4. BETWEEN (for date[time] fields) + # 5. IN (similar to equality) + if where_columns := self.parsed_query.columns_dict.get("where"): + # TODO: Apply some heuristics here, not all columns in where clause are actually useful + possible_indexes.extend(where_columns) + + # Join clauses - Both sides of join should ideally be indexed. One will *usually* be primary key. + if join_columns := self.parsed_query.columns_dict.get("join"): + possible_indexes.extend(join_columns) + + # Top N query variant - Order by column can possibly speed up the query + if ( + order_by_columns := self.parsed_query.columns_dict.get("order_by") + ) and self.parsed_query.limit_and_offset: + possible_indexes.extend(order_by_columns) + + possible_db_indexes = [self._convert_to_db_index(i) for i in possible_indexes] + possible_db_indexes = [i for i in possible_db_indexes if i.column not in ("*", "name")] + possible_db_indexes.sort(key=lambda i: (i.table, i.column)) + + return self._remove_existing_indexes(possible_db_indexes) + + def _convert_to_db_index(self, column: str) -> DBIndex: + column_name, table = None, None + + if "." 
in column: + table, column_name = column.split(".") + else: + column_name = column + for table_name, db_table in self.tables.items(): + if db_table.has_column(column): + table = table_name + break + return DBIndex(column=column_name, name=column_name, table=table) + + def _remove_existing_indexes(self, potential_indexes: list[DBIndex]) -> list[DBIndex]: # noqa: C901 + """Given list of potential index candidates remove the ones that already exist. + + This also removes multi-column indexes for parts that are applicable to query. + Example: If multi-col index A+B+C exists and query utilizes A+B then + A+B are removed from potential indexes. + """ + + def remove_maximum_indexes(idx: list[DBIndex]): + """Try to remove entire index from potential indexes + If not possible, reduce one part and try again until no parts are left. + """ + if not idx: + return None + matched_sub_index = [] + for idx_part in list(idx): + matching_part = [ + i for i in potential_indexes if i.column == idx_part.column and i.table == idx_part.table + ] + if not matching_part: + # pop and recurse + idx.pop() + return remove_maximum_indexes(idx) + matched_sub_index.extend(matching_part) + + # Every part matched now, lets remove those parts + for i in matched_sub_index: + potential_indexes.remove(i) + return None + + # Reconstruct multi-col index + for table in self.tables.values(): + merged_indexes = defaultdict(list) + for index in table.indexes: + merged_indexes[index.name].append(index) + + for idx in merged_indexes.values(): + idx.sort(key=lambda x: x.sequence) + + for idx in merged_indexes.values(): + remove_maximum_indexes(idx) + return potential_indexes + + def suggest_index(self) -> DBIndex | None: + """Suggest best possible column to index given query and table stats.""" + if missing_tables := (set(self.tables_examined) - set(self.tables.keys())): + raise Exception("DBTable information missing for: " + ", ".join(missing_tables)) + + potential_indexes = self.potential_indexes() + + for index in list(potential_indexes): + table = self.tables[index.table] + + # Data type is not easily indexable - skip + column = next(c for c in table.schema if c.name == index.column) + if "text" in column.data_type.lower() or "json" in column.data_type.lower(): + potential_indexes.remove(index) + # Update cardinality from column so scoring can be done + index.cardinality = column.cardinality + + for index in potential_indexes: + index._score = self.index_score(index) + + potential_indexes.sort(key=lambda i: i._score) + if ( + potential_indexes + and (best_index := potential_indexes[0]) + and best_index._score < INDEX_SCORE_THRESHOLD + ): + return best_index + return None + + def index_score(self, index: DBIndex) -> float: + """Score an index from 0 to 1 based on usefulness. + + A score of 0.5 indicates on average this index will read 50% of the table. (e.g. checkboxes)""" + table = self.tables[index.table] + + cardinality = index.cardinality or 2 + total_rows = table.total_rows or cardinality or 1 + + # We assume most unique values are evenly distributed, this is + # definitely not the case IRL but it should be good enough assumptions + # Score is roughly what percentage of table we will end up reading on typical query + rows_fetched_on_average = (table.total_rows or cardinality) / cardinality + return rows_fetched_on_average / total_rows + + def can_be_optimized(self) -> bool: + """Return true if it's worth optimizing. + + Few cases can not be optimized any further. E.g. ref/eq_ref/cost type + of queries. 
Assume that anything that reads <10% of table already is + not possible to truly optimize with these heuristics.""" + for explain in self.explain_plan: + for table in self.tables.values(): + if table.name != explain.table: + continue + if (explain.rows / table.total_rows) > OPTIMIZATION_THRESHOLD: + return True + return False + + +class OptimizeDatabaseQueries: + def __init__(self, site: Site, queries: list[str], database_root_password: str): + self.site = site + self.database_root_password = database_root_password + self.queries = queries + self.table_cache: dict[str, DBTable] = {} + self.column_statistics_cache: dict[str, list[ColumnStat]] = {} + + def analyze(self) -> dict[str, list[DBIndex]] | None: + # generate explain output for all the queries at once + explain_output_of_queries_result = self.site.db_instance().explain_queries(self.queries) + explain_output_of_queries = {} + for query, explain_output in explain_output_of_queries_result.items(): + explain_output_of_queries[query] = [DBExplain.from_frappe_output(e) for e in explain_output] + + suggested_indexes_of_queries = {} + + for query in self.queries: + if query not in explain_output_of_queries: + continue + explain_output = explain_output_of_queries[query] + optimizer = DBOptimizer(query=query, explain_plan=explain_output) + tables = optimizer.tables_examined + + for table in tables: + db_table = self.describe_database_table(table) + column_stats = self.fetch_column_stats(table) + db_table.update_cardinality(column_stats) + optimizer.update_table_data(db_table) + + index = optimizer.suggest_index() + if index: + if query not in suggested_indexes_of_queries: + suggested_indexes_of_queries[query] = [] + suggested_indexes_of_queries[query].append(index) + + return suggested_indexes_of_queries + + def describe_database_table(self, table_name: str) -> DBTable | None: + if table_name in self.table_cache: + return self.table_cache[table_name] + result = self.site.describe_database_table(table_name) + if result is None: + self.table_cache[table_name] = None + return None + table = DBTable.from_frappe_output(result) + self.table_cache[table_name] = table + return table + + def fetch_column_stats(self, table_name: str) -> list[ColumnStat] | None: + if table_name in self.column_statistics_cache: + return self.column_statistics_cache[table_name] + db = self.site.db_instance("root", self.database_root_password) + return db.fetch_database_column_statistics(table_name) diff --git a/agent/database_server.py b/agent/database_server.py index 395f0f43..d503927a 100644 --- a/agent/database_server.py +++ b/agent/database_server.py @@ -8,6 +8,7 @@ from peewee import MySQLDatabase +from agent.database import Database from agent.job import job, step from agent.server import Server @@ -177,50 +178,17 @@ def sql(db, query, params=()): return list(map(lambda x: dict(zip(columns, x)), rows)) @job("Column Statistics") - def fetch_column_stats(self, schema, table, private_ip, mariadb_root_password, doc_name): - self._fetch_column_stats(schema, table, private_ip, mariadb_root_password) + def fetch_column_stats_job(self, schema, table, private_ip, mariadb_root_password, doc_name): + self._fetch_column_stats_step(schema, table, private_ip, mariadb_root_password) return {"doc_name": doc_name} @step("Fetch Column Statistics") - def _fetch_column_stats(self, schema, table, private_ip, mariadb_root_password): - """Get various stats about columns in a table. 
- - Refer: - - https://mariadb.com/kb/en/engine-independent-table-statistics/ - - https://mariadb.com/kb/en/mysqlcolumn_stats-table/ - """ - mariadb = MySQLDatabase( - "mysql", - user="root", - password=mariadb_root_password, - host=private_ip, - port=3306, - ) - - try: - self.sql( - mariadb, - f"ANALYZE TABLE `{schema}`.`{table}` PERSISTENT FOR ALL", - ) - - results = self.sql( - mariadb, - """ - SELECT - column_name, nulls_ratio, avg_length, avg_frequency, - decode_histogram(hist_type,histogram) as histogram - from mysql.column_stats - WHERE db_name = %s - and table_name = %s """, - (schema, table), - ) - - for row in results: - for column in ["nulls_ratio", "avg_length", "avg_frequency"]: - row[column] = float(row[column]) if row[column] else None - except Exception as e: - print(e) + def _fetch_column_stats_step(self, schema, table, private_ip, mariadb_root_password): + return self.fetch_column_stats(schema, table, private_ip, mariadb_root_password) + def fetch_column_stats(self, schema, table, private_ip, mariadb_root_password): + db = Database(private_ip, 3306, "root", mariadb_root_password, schema) + results = db.fetch_database_column_statistics(table) return {"output": json.dumps(results)} def explain_query(self, schema, query, private_ip, mariadb_root_password): diff --git a/agent/site.py b/agent/site.py index d60cf41e..3f75ac55 100644 --- a/agent/site.py +++ b/agent/site.py @@ -13,6 +13,7 @@ from agent.base import AgentException, Base from agent.database import Database +from agent.database_optimizer import OptimizeDatabaseQueries from agent.job import job, step from agent.utils import b2mb, compute_file_hash, get_size @@ -852,75 +853,31 @@ def get_database_free_tables(self): return [] @job("Fetch Database Table Schema") - def fetch_database_table_schema(self): - return self._fetch_database_table_schema() + def fetch_database_table_schema(self, include_table_size: bool = True, include_index_info: bool = True): + database = self.db_instance() + tables = {} + table_schemas = self._fetch_database_table_schema(database, include_index_info=include_index_info) + for table_name in table_schemas: + tables[table_name] = { + "columns": table_schemas[table_name], + } + + if include_table_size: + table_sizes = self._fetch_database_table_sizes(database) + for table_name in table_sizes: + if table_name not in tables: + continue + tables[table_name]["size"] = table_sizes[table_name] - @step("Fetch Database Table Schema") - def _fetch_database_table_schema(self): - index_info = self.get_database_table_indexes() - command = f"""SELECT - TABLE_NAME AS `table`, - COLUMN_NAME AS `column`, - DATA_TYPE AS `data_type`, - IS_NULLABLE AS `is_nullable`, - COLUMN_DEFAULT AS `default` - FROM - INFORMATION_SCHEMA.COLUMNS - WHERE - TABLE_SCHEMA='{self.database}'; - """ - command = quote(command) - data = self.execute( - f"mysql -sN -h {self.host} -u{self.user} -p{self.password} -e {command} --batch" - ).get("output") - data = data.split("\n") - data = [line.split("\t") for line in data] - tables = {} # : [, , ...] 
- for row in data: - if len(row) != 5: - continue - table = row[0] - if table not in tables: - tables[table] = [] - tables[table].append( - { - "column": row[1], - "data_type": row[2], - "is_nullable": row[3] == "YES", - "default": row[4], - "indexes": index_info.get(table, {}).get(row[1], []), - } - ) return tables - def get_database_table_indexes(self): - command = f""" - SELECT - TABLE_NAME AS `table`, - COLUMN_NAME AS `column`, - INDEX_NAME AS `index` - FROM - INFORMATION_SCHEMA.STATISTICS - WHERE - TABLE_SCHEMA='{self.database}' - """ - command = quote(command) - data = self.execute( - f"mysql -sN -h {self.host} -u{self.user} -p{self.password} -e {command} --batch" - ).get("output") - data = data.split("\n") - data = [line.split("\t") for line in data] - tables = {} # : { : [, , ...] } - for row in data: - if len(row) != 3: - continue - table = row[0] - if table not in tables: - tables[table] = {} - if row[1] not in tables[table]: - tables[table][row[1]] = [] - tables[table][row[1]].append(row[2]) - return tables + @step("Fetch Database Table Schema") + def _fetch_database_table_schema(self, database: Database, include_index_info: bool = True): + return database.fetch_database_table_schema(include_index_info=include_index_info) + + @step("Fetch Database Table Sizes") + def _fetch_database_table_sizes(self, database: Database): + return database.fetch_database_table_sizes() def run_sql_query(self, query: str, commit: bool = False, as_dict: bool = False): db = self.db_instance() @@ -930,6 +887,38 @@ def run_sql_query(self, query: str, commit: bool = False, as_dict: bool = False) response["failed_query"] = db.last_executed_query return response + def analyze_slow_queries(self, queries: list[dict], database_root_password: str) -> list[dict]: + """ + Args: + queries (list[dict]): List of queries to analyze + { + "example": "", + "normalized": "", + } + """ + example_queries = [query["example"] for query in queries] + optimizer = OptimizeDatabaseQueries(self, example_queries, database_root_password) + analysis = optimizer.analyze() + analysis_summary = {} # map[query -> list[index_info_dict] + for query, indexes in analysis.items(): + analysis_summary[query] = [index.to_dict() for index in indexes] + + result = [] # list[{example, normalized, suggested_indexes}] + for query in queries: + query["suggested_indexes"] = analysis_summary.get(query["example"], []) + result.append(query) + return result + + def fetch_summarized_database_performance_report(self, mariadb_root_password: str): + database = self.db_instance(username="root", password=mariadb_root_password) + return database.fetch_summarized_performance_report() + + def fetch_database_process_list(self, mariadb_root_password: str): + return self.db_instance(username="root", password=mariadb_root_password).fetch_process_list() + + def kill_database_process(self, pid: str, mariadb_root_password: str): + return self.db_instance(username="root", password=mariadb_root_password).kill_process(pid) + def db_instance(self, username: str | None = None, password: str | None = None) -> Database: if not username: username = self.user diff --git a/agent/tests/test_database_optimizer.py b/agent/tests/test_database_optimizer.py new file mode 100644 index 00000000..a05ae92c --- /dev/null +++ b/agent/tests/test_database_optimizer.py @@ -0,0 +1,659 @@ +from __future__ import annotations + +import json +import unittest + +from agent.database_optimizer import DBExplain, DBOptimizer, DBTable + + +class TestDatabaseOptimizer(unittest.TestCase): + def 
test_basic_index_existence_analysis(self): + def possible_indexes(q): + user = DBTable.from_frappe_output(USER_TABLE) + has_role = DBTable.from_frappe_output(HAS_ROLE_TABLE) + return [ + i.column + for i in DBOptimizer( + query=q, + tables={"tabUser": user, "tabHas Role": has_role}, + ).potential_indexes() + ] + + self.assertEqual( + ["creation"], + possible_indexes("select `name` from `tabUser` order by `creation` desc limit 1"), + ) + + self.assertEqual( + ["full_name"], + possible_indexes("select `name` from `tabUser` where full_name = 'xyz'"), + ) + + self.assertIn( + "user", + possible_indexes("select `name` from `tabUser` u join `tabHas Role` h on h.user = u.name"), + ) + + def test_suggestion_using_table_stats(self): + user = DBTable.from_frappe_output(USER_TABLE) + has_role = DBTable.from_frappe_output(HAS_ROLE_TABLE) + + tables = {"tabUser": user, "tabHas Role": has_role} + self.assertEqual(user.total_rows, 92) + + # This should suggest adding api_key as it definitely has highest cardinality. + optimizer = DBOptimizer( + query="select name from tabUser where enabled = 1 and api_key = 'xyz'", tables=tables + ) + self.assertIn("api_key", [i.column for i in optimizer.potential_indexes()]) + + index = optimizer.suggest_index() + self.assertEqual(index.column, "api_key") + + # This should suggest nothing as modified is already indexed + optimizer = DBOptimizer( + query="select name from tabUser order by modified asc", + tables=tables, + ) + self.assertIsNone(optimizer.suggest_index()) + + # This should suggest nothing as modified is already indexed + optimizer = DBOptimizer( + query="select name from tabUser u join `tabHas Role` r on r.parent = u.name where r.role='System Manager'", # noqa: E501 + tables=tables, + ) + index = optimizer.suggest_index() + self.assertEqual(index.column, "role") + self.assertEqual(index.table, "tabHas Role") + + def test_complex_sub_query_aliases(self): + """Check if table identification is correct for subqueries.""" + + q = """SELECT *, + (SELECT COUNT(*) FROM `tabHD Ticket Comment` WHERE `tabHD Ticket Comment`.`reference_ticket`=`tabHD Ticket`.`name`) `count_comment`, + (SELECT COUNT(*) FROM `tabCommunication` WHERE `tabCommunication`.`reference_doctype`='HD Ticket' AND `tabCommunication`.`reference_name`=`tabHD Ticket`.`name`) `count_msg`, + FROM `tabHD Ticket` + WHERE `agent_group`='L2' + ORDER BY `modified` DESC + LIMIT 20 + """ # noqa: E501 + explain = [DBExplain.from_frappe_output(e) for e in json.loads(EXPLAIN_OUTPUT)] + optimizer = DBOptimizer(query=q, explain_plan=explain) + optimizer.update_table_data(DBTable.from_frappe_output(HD_TICKET_TABLE)) + optimizer.update_table_data(DBTable.from_frappe_output(HD_TICKET_COMMENT_TABLE)) + optimizer.update_table_data(DBTable.from_frappe_output(COMMUNICATION_TABLE)) + + self.assertTrue(optimizer.can_be_optimized()) + index = optimizer.suggest_index() + self.assertEqual(index.table, "tabHD Ticket Comment") + self.assertEqual(index.column, "reference_ticket") + + +# Table stats extracted using describe-database-table for testing. 
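# A worked example of the scoring heuristic (DBOptimizer.index_score) for illustration,
# using the USER_TABLE fixture below (total_rows = 92):
#   - `user_type` has cardinality 2, so an index on it reads ~92 / 2 = 46 rows per lookup;
#     its score is 46 / 92 = 0.5, above INDEX_SCORE_THRESHOLD (0.3), so it is never suggested.
#   - `api_key` has cardinality 70, so an index on it reads ~92 / 70 ≈ 1.3 rows per lookup;
#     its score is ≈ 0.014, which is why the tests above expect `api_key` to be suggested.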
+ +USER_TABLE = { + "table_name": "tabUser", + "total_rows": 92, + "schema": [ + { + "column": "name", + "type": "varchar(140)", + "is_nullable": False, + "default": None, + "cardinality": 91, + }, + {"column": "creation", "type": "datetime(6)", "is_nullable": True, "default": None}, + { + "column": "modified", + "type": "datetime(6)", + "is_nullable": True, + "default": None, + "cardinality": 91, + }, + { + "column": "modified_by", + "type": "varchar(140)", + "is_nullable": True, + "default": None, + }, + {"column": "owner", "type": "varchar(140)", "is_nullable": True, "default": None}, + {"column": "docstatus", "type": "int(1)", "is_nullable": False, "default": "0"}, + {"column": "idx", "type": "int(8)", "is_nullable": False, "default": "0"}, + {"column": "enabled", "type": "int(1)", "is_nullable": False, "default": "1"}, + {"column": "email", "type": "varchar(140)", "is_nullable": False, "default": ""}, + { + "column": "first_name", + "type": "varchar(140)", + "is_nullable": True, + "default": None, + "cardinality": 88, + }, + { + "column": "reset_password_key", + "type": "varchar(140)", + "is_nullable": True, + "default": None, + "cardinality": 84, + }, + { + "column": "user_type", + "type": "varchar(140)", + "is_nullable": True, + "default": "System User", + "cardinality": 2, + }, + { + "column": "api_key", + "type": "varchar(140)", + "is_nullable": True, + "default": None, + "cardinality": 70, + }, + {"column": "api_secret", "type": "text", "is_nullable": True, "default": None}, + {"column": "_user_tags", "type": "text", "is_nullable": True, "default": None}, + {"column": "_comments", "type": "text", "is_nullable": True, "default": None}, + {"column": "_assign", "type": "text", "is_nullable": True, "default": None}, + {"column": "_liked_by", "type": "text", "is_nullable": True, "default": None}, + ], + "indexes": [ + { + "unique": True, + "cardinality": 91, + "name": "PRIMARY", + "sequence": 1, + "nullable": False, + "column": "name", + "type": "BTREE", + }, + { + "unique": True, + "cardinality": 91, + "name": "username", + "sequence": 1, + "nullable": True, + "column": "username", + "type": "BTREE", + }, + { + "unique": False, + "cardinality": 91, + "name": "modified", + "sequence": 1, + "nullable": True, + "column": "modified", + "type": "BTREE", + }, + { + "unique": False, + "cardinality": 91, + "name": "reset_password_key_index", + "sequence": 1, + "nullable": True, + "column": "reset_password_key", + "type": "BTREE", + }, + ], +} + + +HAS_ROLE_TABLE = { + "table_name": "tabHas Role", + "total_rows": 96, + "schema": [ + { + "column": "name", + "type": "varchar(140)", + "is_nullable": "NO", + "default": None, + "cardinality": 92, + }, + {"column": "creation", "type": "datetime(6)", "is_nullable": "YES", "default": None}, + {"column": "modified", "type": "datetime(6)", "is_nullable": "YES", "default": None}, + { + "column": "modified_by", + "type": "varchar(140)", + "is_nullable": "YES", + "default": None, + }, + {"column": "owner", "type": "varchar(140)", "is_nullable": "YES", "default": None}, + {"column": "docstatus", "type": "int(1)", "is_nullable": "NO", "default": "0"}, + {"column": "idx", "type": "int(8)", "is_nullable": "NO", "default": "0"}, + { + "column": "role", + "type": "varchar(140)", + "is_nullable": "YES", + "default": None, + "cardinality": 78, + }, + { + "column": "parent", + "type": "varchar(140)", + "is_nullable": "YES", + "default": None, + "cardinality": 92, + }, + { + "column": "parentfield", + "type": "varchar(140)", + "is_nullable": "YES", + "default": 
None, + }, + { + "column": "parenttype", + "type": "varchar(140)", + "is_nullable": "YES", + "default": None, + }, + ], + "indexes": [ + { + "unique": True, + "cardinality": 92, + "name": "PRIMARY", + "sequence": 1, + "nullable": "", + "column": "name", + "type": "BTREE", + }, + { + "unique": False, + "cardinality": 92, + "name": "parent", + "sequence": 1, + "nullable": "YES", + "column": "parent", + "type": "BTREE", + }, + ], +} + + +HD_TICKET_TABLE = { + "table_name": "tabHD Ticket", + "total_rows": 3820, + "schema": [ + { + "column": "name", + "type": "bigint(20)", + "is_nullable": False, + "default": None, + "cardinality": 3529, + }, + {"column": "creation", "type": "datetime(6)", "is_nullable": True, "default": None}, + { + "column": "modified", + "type": "datetime(6)", + "is_nullable": True, + "default": None, + "cardinality": 3529, + }, + { + "column": "modified_by", + "type": "varchar(140)", + "is_nullable": True, + "default": None, + }, + {"column": "owner", "type": "varchar(140)", "is_nullable": True, "default": None}, + {"column": "docstatus", "type": "int(1)", "is_nullable": False, "default": "0"}, + {"column": "idx", "type": "int(8)", "is_nullable": False, "default": "0"}, + {"column": "subject", "type": "varchar(140)", "is_nullable": True, "default": None}, + {"column": "raised_by", "type": "varchar(140)", "is_nullable": True, "default": None}, + { + "column": "status", + "type": "varchar(140)", + "is_nullable": True, + "default": "Open", + "cardinality": 8, + }, + {"column": "priority", "type": "varchar(140)", "is_nullable": True, "default": None}, + { + "column": "ticket_type", + "type": "varchar(140)", + "is_nullable": True, + "default": None, + }, + { + "column": "agent_group", + "type": "varchar(140)", + "is_nullable": True, + "default": "L1", + "cardinality": 9, + }, + { + "column": "ticket_split_from", + "type": "varchar(140)", + "is_nullable": True, + "default": None, + }, + {"column": "description", "type": "longtext", "is_nullable": True, "default": None}, + {"column": "template", "type": "varchar(140)", "is_nullable": True, "default": None}, + {"column": "sla", "type": "varchar(140)", "is_nullable": True, "default": None}, + { + "column": "response_by", + "type": "datetime(6)", + "is_nullable": True, + "default": None, + }, + { + "column": "response_by_variance", + "type": "decimal(21,9)", + "is_nullable": True, + "default": None, + }, + { + "column": "agreement_status", + "type": "varchar(140)", + "is_nullable": True, + "default": None, + }, + { + "column": "resolution_by", + "type": "datetime(6)", + "is_nullable": True, + "default": None, + }, + { + "column": "resolution_by_variance", + "type": "decimal(21,9)", + "is_nullable": True, + "default": None, + }, + { + "column": "service_level_agreement_creation", + "type": "datetime(6)", + "is_nullable": True, + "default": None, + }, + { + "column": "on_hold_since", + "type": "datetime(6)", + "is_nullable": True, + "default": None, + }, + { + "column": "total_hold_time", + "type": "decimal(21,9)", + "is_nullable": True, + "default": None, + }, + { + "column": "first_response_time", + "type": "decimal(21,9)", + "is_nullable": True, + "default": None, + }, + { + "column": "first_responded_on", + "type": "datetime(6)", + "is_nullable": True, + "default": None, + }, + { + "column": "avg_response_time", + "type": "decimal(21,9)", + "is_nullable": True, + "default": None, + }, + { + "column": "resolution_details", + "type": "longtext", + "is_nullable": True, + "default": None, + }, + {"column": "opening_date", "type": 
"date", "is_nullable": True, "default": None}, + {"column": "opening_time", "type": "time(6)", "is_nullable": True, "default": None}, + { + "column": "resolution_date", + "type": "datetime(6)", + "is_nullable": True, + "default": None, + }, + { + "column": "resolution_time", + "type": "decimal(21,9)", + "is_nullable": True, + "default": None, + }, + { + "column": "user_resolution_time", + "type": "decimal(21,9)", + "is_nullable": True, + "default": None, + }, + {"column": "contact", "type": "varchar(140)", "is_nullable": True, "default": None}, + {"column": "customer", "type": "varchar(140)", "is_nullable": True, "default": None}, + { + "column": "email_account", + "type": "varchar(140)", + "is_nullable": True, + "default": None, + }, + {"column": "attachment", "type": "text", "is_nullable": True, "default": None}, + {"column": "_user_tags", "type": "text", "is_nullable": True, "default": None}, + {"column": "_comments", "type": "text", "is_nullable": True, "default": None}, + {"column": "_assign", "type": "text", "is_nullable": True, "default": None}, + {"column": "_liked_by", "type": "text", "is_nullable": True, "default": None}, + {"column": "_seen", "type": "text", "is_nullable": True, "default": None}, + ], + "indexes": [ + { + "unique": True, + "cardinality": 3529, + "name": "PRIMARY", + "sequence": 1, + "nullable": False, + "column": "name", + "type": "BTREE", + }, + { + "unique": False, + "cardinality": 8, + "name": "status", + "sequence": 1, + "nullable": True, + "column": "status", + "type": "BTREE", + }, + { + "unique": False, + "cardinality": 3529, + "name": "modified", + "sequence": 1, + "nullable": True, + "column": "modified", + "type": "BTREE", + }, + ], +} + + +HD_TICKET_COMMENT_TABLE = { + "table_name": "tabHD Ticket Comment", + "total_rows": 2683, + "schema": [ + { + "column": "name", + "type": "varchar(140)", + "is_nullable": False, + "default": None, + "cardinality": 2683, + }, + {"column": "creation", "type": "datetime(6)", "is_nullable": True, "default": None}, + { + "column": "modified", + "type": "datetime(6)", + "is_nullable": True, + "default": None, + "cardinality": 2345, + }, + { + "column": "reference_ticket", + "type": "varchar(140)", + "is_nullable": True, + "default": None, + "cardinality": 1379, + }, + { + "column": "commented_by", + "type": "varchar(140)", + "is_nullable": True, + "default": None, + }, + {"column": "content", "type": "longtext", "is_nullable": True, "default": None}, + {"column": "is_pinned", "type": "int(1)", "is_nullable": False, "default": "0"}, + ], + "indexes": [ + { + "unique": True, + "cardinality": 2345, + "name": "PRIMARY", + "sequence": 1, + "nullable": False, + "column": "name", + "type": "BTREE", + }, + { + "unique": False, + "cardinality": 2345, + "name": "modified", + "sequence": 1, + "nullable": True, + "column": "modified", + "type": "BTREE", + }, + ], +} + + +COMMUNICATION_TABLE = { + "table_name": "tabCommunication", + "total_rows": 20727, + "schema": [ + { + "column": "name", + "type": "varchar(140)", + "is_nullable": False, + "default": None, + "cardinality": 19713, + }, + {"column": "creation", "type": "datetime(6)", "is_nullable": True, "default": None}, + { + "column": "modified", + "type": "datetime(6)", + "is_nullable": True, + "default": None, + "cardinality": 19713, + }, + { + "column": "reference_doctype", + "type": "varchar(140)", + "is_nullable": True, + "default": None, + "cardinality": 1, + }, + { + "column": "reference_name", + "type": "varchar(140)", + "is_nullable": True, + "default": None, + 
"cardinality": 3798, + }, + { + "column": "reference_owner", + "type": "varchar(140)", + "is_nullable": True, + "default": None, + "cardinality": 1314, + }, + ], + "indexes": [ + { + "unique": True, + "cardinality": 19713, + "name": "PRIMARY", + "sequence": 1, + "nullable": False, + "column": "name", + "type": "BTREE", + }, + { + "unique": False, + "cardinality": 19713, + "name": "modified", + "sequence": 1, + "nullable": True, + "column": "modified", + "type": "BTREE", + }, + { + "unique": False, + "cardinality": 2, + "name": "reference_doctype_reference_name_index", + "sequence": 1, + "nullable": True, + "column": "reference_doctype", + "type": "BTREE", + }, + { + "unique": False, + "cardinality": 9856, + "name": "reference_doctype_reference_name_index", + "sequence": 2, + "nullable": True, + "column": "reference_name", + "type": "BTREE", + }, + ], +} + + +EXPLAIN_OUTPUT = """[ + { + "Extra": "", + "id": 1, + "key": "modified", + "key_len": "9", + "possible_keys": null, + "ref": null, + "rows": "20", + "select_type": "PRIMARY", + "table": "tabHD Ticket", + "type": "index" + }, + { + "Extra": "Using index condition; Using where", + "id": 4, + "key": "reference_doctype_reference_name_index", + "key_len": "563", + "possible_keys": "reference_doctype_reference_name_index", + "ref": "const", + "rows": "10236", + "select_type": "DEPENDENT SUBQUERY", + "table": "tabCommunication", + "type": "ref" + }, + { + "Extra": "Using index condition; Using where", + "id": 3, + "key": "reference_doctype_reference_name_index", + "key_len": "563", + "possible_keys": "reference_doctype_reference_name_index", + "ref": "const", + "rows": "10236", + "select_type": "DEPENDENT SUBQUERY", + "table": "tabCommunication", + "type": "ref" + }, + { + "Extra": "Using where; Using index", + "id": 2, + "key": "reference_ticket_index", + "key_len": "563", + "possible_keys": "reference_ticket_index", + "ref": null, + "rows": "2823", + "select_type": "DEPENDENT SUBQUERY", + "table": "tabHD Ticket Comment", + "type": "index" + } +]""" diff --git a/agent/web.py b/agent/web.py index 9e50c924..c1c428a0 100644 --- a/agent/web.py +++ b/agent/web.py @@ -557,7 +557,18 @@ def backup_site(bench, site): ) @validate_bench_and_site def fetch_database_table_schema(bench, site): - job = Server().benches[bench].sites[site].fetch_database_table_schema() + data = request.json or {} + include_table_size = data.get("include_table_size", False) + include_index_info = data.get("include_index_info", False) + job = ( + Server() + .benches[bench] + .sites[site] + .fetch_database_table_schema( + include_table_size=include_table_size, + include_index_info=include_index_info, + ) + ) return {"job": job} @@ -580,10 +591,55 @@ def run_sql(bench, site): @application.route( - "/benches//sites//database/users", - methods=["POST"], + "/benches//sites//database/analyze-slow-queries", methods=["POST"] ) @validate_bench_and_site +def analyze_slow_queries(bench: str, site: str): + queries = request.json["queries"] + mariadb_root_password = request.json["mariadb_root_password"] + + return Response( + json.dumps( + Server().benches[bench].sites[site].analyze_slow_queries(queries, mariadb_root_password), + cls=JSONEncoderForSQLQueryResult, + ), + mimetype="application/json", + ) + + +@application.route( + "/benches//sites//database/performance-report", methods=["POST"] +) +def database_performance_report(bench, site): + data = request.json + result = ( + Server() + .benches[bench] + .sites[site] + 
.fetch_summarized_database_performance_report(data["mariadb_root_password"]) + ) + return jsonify(json.loads(json.dumps(result, cls=JSONEncoderForSQLQueryResult))) + + +@application.route("/benches/<string:bench>/sites/<string:site>/database/processes", methods=["GET", "POST"]) +def database_process_list(bench, site): + data = request.json + return jsonify( + Server().benches[bench].sites[site].fetch_database_process_list(data["mariadb_root_password"]) + ) + + +@application.route( + "/benches/<string:bench>/sites/<string:site>/database/kill-process/<pid>", methods=["GET", "POST"] +) +def database_kill_process(bench, site, pid): + data = request.json + Server().benches[bench].sites[site].kill_database_process(pid, data["mariadb_root_password"]) + return "killed" + + +@application.route("/benches/<string:bench>/sites/<string:site>/database/users", methods=["POST"]) +@validate_bench_and_site +def create_database_user(bench, site): data = request.json job = ( @@ -1010,13 +1066,15 @@ def get_database_deadlocks(): return jsonify(DatabaseServer().get_deadlocks(**data)) +# TODO can be removed @application.route("/database/column-stats", methods=["POST"]) def fetch_column_statistics(): data = request.json - job = DatabaseServer().fetch_column_stats(**data) + job = DatabaseServer().fetch_column_stats_job(**data) return {"job": job} +# TODO can be removed @application.route("/database/explain", methods=["POST"]) def explain(): data = request.json diff --git a/requirements.txt b/requirements.txt index e13b0e4b..68367b47 100644 --- a/requirements.txt +++ b/requirements.txt @@ -23,4 +23,5 @@ Werkzeug==0.16.0 wrapt==1.16.0 docker==6.1.2 filelock==3.13.1 -sentry-sdk[flask, rq]==2.1.1 \ No newline at end of file +sentry-sdk[flask, rq]==2.1.1 +sql_metadata==2.15.0 \ No newline at end of file
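
For reference, a minimal sketch of how the new classes in agent/database_optimizer.py compose, mirroring the flow exercised in agent/tests/test_database_optimizer.py. The TICKET_TABLE fixture, the EXPLAIN row, and the query below are hypothetical stand-ins for real describe-database-table and EXPLAIN output, not data from this change:

from agent.database_optimizer import DBExplain, DBOptimizer, DBTable

# Hypothetical describe-database-table output for a 10,000 row table.
TICKET_TABLE = {
    "table_name": "tabTicket",
    "total_rows": 10000,
    "schema": [
        {"column": "name", "type": "varchar(140)", "is_nullable": False, "default": None, "cardinality": 10000},
        {"column": "status", "type": "varchar(140)", "is_nullable": True, "default": "Open", "cardinality": 4},
        {"column": "customer", "type": "varchar(140)", "is_nullable": True, "default": None, "cardinality": 3000},
    ],
    "indexes": [
        {"unique": True, "cardinality": 10000, "name": "PRIMARY", "sequence": 1, "nullable": False, "column": "name", "type": "BTREE"},
    ],
}

# Hypothetical EXPLAIN output showing a full table scan for the query below.
EXPLAIN = [
    {"select_type": "SIMPLE", "table": "tabTicket", "type": "ALL", "possible_keys": None,
     "key": None, "key_len": None, "ref": None, "rows": "10000", "Extra": "Using where"},
]

query = "select `name` from `tabTicket` where customer = 'Acme' and status = 'Open'"

optimizer = DBOptimizer(query=query, explain_plan=[DBExplain.from_frappe_output(e) for e in EXPLAIN])
optimizer.update_table_data(DBTable.from_frappe_output(TICKET_TABLE))

if optimizer.can_be_optimized():  # the plan reads 100% of the table, well above OPTIMIZATION_THRESHOLD
    index = optimizer.suggest_index()
    print(index.table, index.column)  # tabTicket customer -- higher cardinality than `status`

OptimizeDatabaseQueries wraps the same steps per site: it runs explain_queries() through the site's Database instance, loads each table via describe-database-table, enriches column cardinality from mysql.column_stats, and returns the suggested indexes per query, which Site.analyze_slow_queries then serializes with DBIndex.to_dict().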