Skip to content

Commit

Permalink
fix(backend): 日志平台查询超时的问题 #3718
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangzhw8 committed Mar 26, 2024
1 parent 44127a7 commit f212829
Showing 1 changed file with 36 additions and 14 deletions.
50 changes: 36 additions & 14 deletions dbm-ui/backend/db_services/taskflow/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,22 @@ def get_node_histories(self, node_id: str) -> List[Dict[str, Any]]:
)
return sorted(histories, key=itemgetter("started_time"), reverse=True)

def get_version_logs(self, node_id: str, version_id: str) -> List[Dict[str, str]]:
@staticmethod
def bklog_esquery_search(indices, query_string, start_time, end_time):
"""esquery搜索"""
resp = BKLogApi.esquery_search(
{
"indices": indices,
"start_time": start_time,
"end_time": end_time,
"query_string": query_string,
"start": 0,
"size": 1000,
}
)
return resp["hits"]["hits"]

def get_version_logs(self, node_id: str, version_id: str) -> List[Dict[str, Dict[str, str]]]:
"""获取节点的日志信息"""
try:
flow_node = FlowNode.objects.get(root_id=self.root_id, node_id=node_id)
Expand All @@ -169,21 +184,28 @@ def get_version_logs(self, node_id: str, version_id: str) -> List[Dict[str, str]
if flow_node.updated_at < timezone.now() - timedelta(days=7):
return [self.generate_log_record(message=_("节点日志仅保留7天"))]

resp = BKLogApi.esquery_search(
{
"indices": f"{env.DBA_APP_BK_BIZ_ID}_bklog.dbm_log,{env.DBA_APP_BK_BIZ_ID}_bklog.dbm_dbactuator",
"start_time": datetime2str(flow_node.started_at),
# 检索节点开始后一天内的日志,一般情况下,节点执行时间不会超过一天
"end_time": datetime2str(flow_node.updated_at + timedelta(days=1)),
# TODO 可优化检索,需清洗后根据 root_id、node_id、version_id 进行检索
"query_string": f"{self.root_id} AND {node_id} AND {version_id}",
"start": 0,
"size": 1000,
"sort_list": [["dtEventTimeStamp", "asc"], ["gseIndex", "asc"], ["iterationIndex", "asc"]],
}
start_time = datetime2str(flow_node.started_at)
end_time = datetime2str(flow_node.updated_at + timedelta(days=7))
dbm_logs = self.bklog_esquery_search(
indices=f"{env.DBA_APP_BK_BIZ_ID}_bklog.dbm_log",
query_string=f"({self.root_id} AND {node_id} AND {version_id})"
f" AND (__ext.io_kubernetes_pod:*worker* OR __ext.io_kubernetes_pod:*dbsimulation*)",
start_time=start_time,
end_time=end_time,
)
dbm_dbactuator_logs = self.bklog_esquery_search(
indices=f"{env.DBA_APP_BK_BIZ_ID}_bklog.dbm_dbactuator",
query_string=f"{self.root_id} AND {node_id} AND {version_id}",
start_time=start_time,
end_time=end_time,
)
logs = []
for hit in resp["hits"]["hits"]:
sorted_hits = sorted(
dbm_logs + dbm_dbactuator_logs,
key=lambda x: (x["_source"]["dtEventTimeStamp"], x["_source"]["gseIndex"], x["_source"]["iterationIndex"]),
)

for hit in sorted_hits:
log = self._format_log(hit["_source"]["log"], hit["_source"]["serverIp"], hit["_index"])
if log:
logs.append(
Expand Down

0 comments on commit f212829

Please sign in to comment.