feat(database): db performance insights (#146)
tanmoysrt authored Dec 26, 2024
1 parent 22f2c61 commit d0b4f8f
Showing 7 changed files with 1,476 additions and 112 deletions.
269 changes: 268 additions & 1 deletion agent/database.py
@@ -153,6 +153,273 @@ def modify_user_permissions(self, username: str, mode: str, permissions: dict |

self._run_sql(queries_str, commit=True, allow_all_stmt_types=True)

def fetch_database_table_sizes(self) -> dict:
data = self._run_sql(
    f"SELECT table_name, data_length, index_length FROM INFORMATION_SCHEMA.TABLES "
    f"WHERE TABLE_SCHEMA='{self.database_name}'",
    as_dict=True,
)
if len(data) == 0:
    return {}
data = data[0]["output"]
tables = {}
for d in data:
    tables[d["table_name"]] = {
        "data_length": int(d["data_length"]),
        "index_length": int(d["index_length"]),
        "total_size": int(d["data_length"]) + int(d["index_length"]),
    }
return tables
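As a usage illustration (an editor's sketch, not part of this commit): the returned mapping ranks naturally by `total_size`. Here `db` stands for a hypothetical instance of this class, and the helper name is made up.

def top_tables_by_size(db, n: int = 5) -> list:
    # Rank tables by total footprint; data_length and index_length are byte
    # counts taken from INFORMATION_SCHEMA, so convert to MiB for display.
    sizes = db.fetch_database_table_sizes()
    ranked = sorted(sizes.items(), key=lambda kv: kv[1]["total_size"], reverse=True)
    return [(name, round(info["total_size"] / (1024 * 1024), 2)) for name, info in ranked[:n]]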

def fetch_database_table_schema(self, include_index_info: bool = True) -> dict:
index_info = {}
index_usage_info = {}
data = self._run_sql(
f"""SELECT
TABLE_NAME AS `table`,
COLUMN_NAME AS `column`,
DATA_TYPE AS `data_type`,
IS_NULLABLE AS `is_nullable`,
COLUMN_DEFAULT AS `default`
FROM
INFORMATION_SCHEMA.COLUMNS
WHERE
TABLE_SCHEMA='{self.database_name}';
""",
as_dict=True,
)
if len(data) == 0:
return {}
data = data[0]["output"]
tables = {} # <table_name>: [<column_1_info>, <column_2_info>, ...]

if include_index_info:
index_info = self.fetch_database_table_indexes()
index_usage_info = self.fetch_database_table_index_usage()

for record in data:
if record["table"] not in tables:
tables[record["table"]] = []
indexes = index_info.get(record["table"], {}).get(record["column"], [])
column_index_usage = {}
for index in indexes:
column_index_usage[index] = index_usage_info.get(record["table"], {}).get(index, 0)

tables[record["table"]].append(
{
"column": record["column"],
"data_type": record["data_type"],
"is_nullable": record["is_nullable"] == "YES",
"default": record["default"],
"index_info": {
"is_indexed": len(indexes) > 0,
"indexes": indexes,
"index_usage": column_index_usage,
},
}
)
return tables
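A sketch of how the per-column index metadata can be consumed (editor's illustration; `db` and the helper name are assumptions, not part of the commit):

def unindexed_columns(db) -> dict:
    # Map each table to the columns that no index covers, using the
    # index_info block attached to every column entry above.
    schema = db.fetch_database_table_schema(include_index_info=True)
    return {
        table: [col["column"] for col in columns if not col["index_info"]["is_indexed"]]
        for table, columns in schema.items()
    }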

def fetch_database_table_indexes(self):
data = self._run_sql(
f"""
SELECT
TABLE_NAME AS `table`,
COLUMN_NAME AS `column`,
INDEX_NAME AS `index`
FROM
INFORMATION_SCHEMA.STATISTICS
WHERE
TABLE_SCHEMA='{self.database_name}'
""",
as_dict=True,
)
if len(data) == 0:
return {}
data = data[0]["output"]
tables = {} # <table_name>: { <column_name> : [<index1>, <index2>, ...] }
for record in data:
if record["table"] not in tables:
tables[record["table"]] = {}
if record["column"] not in tables[record["table"]]:
tables[record["table"]][record["column"]] = []
tables[record["table"]][record["column"]].append(record["index"])
return tables
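Editor's sketch (not part of this commit): columns covered by more than one index, worth cross-checking against the redundant-index report further below. `db` is an assumed instance of this class.

def multi_indexed_columns(db) -> list:
    indexes = db.fetch_database_table_indexes()
    # (table, column, [index names]) for every column that appears in 2+ indexes.
    return [
        (table, column, names)
        for table, columns in indexes.items()
        for column, names in columns.items()
        if len(names) > 1
    ]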

def fetch_database_table_index_usage(self):
data = self._run_sql(
f"""
SELECT
TABLE_NAME AS `table`,
INDEX_NAME AS `index`,
ROWS_READ AS `rows_read`
FROM
INFORMATION_SCHEMA.INDEX_STATISTICS
WHERE
TABLE_SCHEMA='{self.database_name}'
""",
as_dict=True,
)
if len(data) == 0:
return {}
data = data[0]["output"]
tables = {} # <table_name>: { <index_name> : <rows_read> }
for record in data:
if record["table"] not in tables:
tables[record["table"]] = {}
tables[record["table"]][record["index"]] = int(record["rows_read"])
return tables
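Worth noting: INFORMATION_SCHEMA.INDEX_STATISTICS belongs to MariaDB's user-statistics feature and, as far as I can tell, is only populated while `userstat` is enabled on the server, so this method can legitimately return an empty dict. A sketch combining it with the index listing above (editor's illustration; `db` and the helper name are assumptions):

def cold_indexes(db) -> list:
    indexes = db.fetch_database_table_indexes()    # table -> column -> [index, ...]
    usage = db.fetch_database_table_index_usage()  # table -> index -> rows_read
    cold = []
    for table, columns in indexes.items():
        for column, index_names in columns.items():
            for index in index_names:
                if usage.get(table, {}).get(index, 0) == 0:
                    cold.append((table, column, index))
    return cold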

def explain_query(self, query) -> list:
    result = self._run_sql(query=query, as_dict=True)
    if len(result) == 0:
        return []
    return result[0]["output"]

def explain_queries(self, queries: list) -> dict:
sql_query = ""
for query in queries:
sql_query += f"EXPLAIN {query};\n"
result = self._run_sql(query=sql_query, as_dict=True)
data = {}
for record in result:
data[record["query"]] = record["output"]
return data
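The batched variant maps each statement to its EXPLAIN rows, keyed by the query text. A usage sketch (editor's illustration; `db`, the helper name, and the sample query are made up):

def explain_candidates(db, queries: list) -> None:
    plans = db.explain_queries(queries)
    for query, plan in plans.items():
        print(query, plan)

# e.g. explain_candidates(db, ["SELECT * FROM tabNote WHERE title = 'x'"])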

def fetch_database_column_statistics(self, table):
"""Get various stats about columns in a table.
Refer:
- https://mariadb.com/kb/en/engine-independent-table-statistics/
- https://mariadb.com/kb/en/mysqlcolumn_stats-table/
"""
self._run_sql(
f"ANALYZE TABLE `{self.database_name}`.`{table}` PERSISTENT FOR ALL",
)

results = self._run_sql(
f"""
SELECT
column_name, nulls_ratio, avg_length, avg_frequency,
decode_histogram(hist_type,histogram) as histogram
FROM
mysql.column_stats
WHERE
db_name='{self.database_name}'
and table_name='{table}'
""",
as_dict=True,
)
if len(results) == 0:
raise Exception("Failed to fetch column stats")
result = results[0]["output"]

for row in result:
    for column in ["nulls_ratio", "avg_length", "avg_frequency"]:
        row[column] = float(row[column]) if row[column] else None

return result
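Since the ratio and length columns come back from mysql.column_stats as strings, the float conversion above matters for downstream consumers. A sketch (editor's illustration; `db` and the helper name are assumptions):

def mostly_null_columns(db, table: str, threshold: float = 0.9) -> list:
    # Columns that are almost entirely NULL are candidates for cleanup.
    stats = db.fetch_database_column_statistics(table)
    return [
        row["column_name"]
        for row in stats
        if row["nulls_ratio"] is not None and row["nulls_ratio"] >= threshold
    ]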

def fetch_summarized_performance_report(self):
queries = f"""
-- Top 10 time consuming queries;
SELECT
(SUM_TIMER_WAIT / SUM(SUM_TIMER_WAIT) OVER() * 100) AS percent,
round(SUM_TIMER_WAIT/1000000000, 1) AS total_time_ms,
COUNT_STAR AS calls,
round(AVG_TIMER_WAIT/1000000000, 1) AS avg_time_ms,
DIGEST_TEXT AS query
FROM performance_schema.events_statements_summary_by_digest
WHERE SCHEMA_NAME='{self.database_name}'
ORDER BY SUM_TIMER_WAIT DESC
LIMIT 10;
-- Top 10 queries with full table scans;
SELECT t1.exec_count AS calls,
t1.rows_examined AS rows_examined,
t1.rows_sent AS rows_sent,
t1.query AS query,
t2.DIGEST_TEXT AS example
FROM sys.statements_with_full_table_scans AS t1
LEFT JOIN (
SELECT DIGEST, FIRST_VALUE(DIGEST_TEXT) OVER (PARTITION BY DIGEST ORDER BY RAND()) AS DIGEST_TEXT
FROM performance_schema.events_statements_summary_by_digest
) AS t2 ON t1.digest = t2.DIGEST
ORDER BY rows_examined DESC
LIMIT 10;
-- Unused Indexes;
SELECT
index_name,
object_name AS table_name
FROM
sys.schema_unused_indexes
WHERE
object_schema='{self.database_name}';
-- Redundant Indexes;
SELECT
table_name,
redundant_index_name,
redundant_index_columns,
dominant_index_name,
dominant_index_columns
FROM
sys.schema_redundant_indexes
WHERE
table_schema='{self.database_name}';
"""

result = self._run_sql(queries, as_dict=True)
return {
"top_10_time_consuming_queries": result[0]["output"],
"top_10_queries_with_full_table_scan": result[1]["output"],
"unused_indexes": result[2]["output"],
"redundant_indexes": result[3]["output"],
}
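The divisions by 1000000000 in the first query are unit conversions: performance_schema timer columns are measured in picoseconds, and 10^9 picoseconds is one millisecond. A consumer sketch (editor's illustration; `db` and the helper name are assumptions):

def print_slow_queries(db) -> None:
    report = db.fetch_summarized_performance_report()
    for row in report["top_10_time_consuming_queries"]:
        print(f'{row["total_time_ms"]} ms over {row["calls"]} calls: {row["query"]}')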

def fetch_process_list(self):
result = self._run_sql("SHOW FULL PROCESSLIST", as_dict=True)
if len(result) == 0:
return []
return [
{
"id": str(record["Id"]),
"command": record["Command"],
"query": record["Info"],
"state": record["State"],
"time": record["Time"],
"db_user": record["User"],
"db_user_host": record["Host"].split(":")[0],
}
for record in result[0]["output"]
if record["db"] == self.database_name
]
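Editor's sketch (not part of this commit): filter the list for long-running statements. `db` and the helper name are assumptions.

def long_running(db, min_seconds: int = 60) -> list:
    # "time" comes straight from SHOW FULL PROCESSLIST: seconds spent in the current state.
    return [p for p in db.fetch_process_list() if int(p["time"] or 0) >= min_seconds]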

def kill_process(self, pid: str):
    with contextlib.suppress(Exception):
        processes = self.fetch_process_list()
        """
        It's important to validate that the pid belongs to the current database.
        Because we run the query as the `root` user, it would otherwise be
        possible to kill processes of other databases by forging the request.
        """
        is_found = any(process.get("id") == pid for process in processes)
        if not is_found:
            return
        """
        The KILL query can fail if the process has already been killed.
        We can safely ignore the exception; callers should reload the process
        list afterwards to verify that the process is gone.
        """
        self._run_sql(f"KILL {pid}")

# Private helper methods
def _run_sql( # noqa C901
self, query: str, commit: bool = False, as_dict: bool = False, allow_all_stmt_types: bool = False
@@ -164,7 +431,7 @@ def _run_sql( # noqa C901
Args:
query: SQL query string
commit: True if you want to commit the changes. If commit is false, it will rollback the changes and
also wouldnt allow to run ddl, dcl or tcl queries
also wouldn't allow to run ddl, dcl or tcl queries
as_dict: True if you want to return the result as a dictionary (like frappe.db.sql).
Otherwise it will return a dict of columns and data
allow_all_stmt_types: True if you want to allow all type of sql statements
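A minimal read-only call under the documented defaults (editor's sketch; the helper name and the result shape, one record carrying an "output" list of row dicts, are assumptions inferred from the callers above):

def count_rows(db, table: str) -> int:
    # With commit=False the helper rolls back and rejects DDL/DCL/TCL,
    # so a plain SELECT is safe to run speculatively.
    result = db._run_sql(f"SELECT COUNT(*) AS count FROM `{table}`", as_dict=True)
    return int(result[0]["output"][0]["count"])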
