diff --git a/dbm-ui/backend/db_services/taskflow/handlers.py b/dbm-ui/backend/db_services/taskflow/handlers.py index d20973f368..6c9f297914 100644 --- a/dbm-ui/backend/db_services/taskflow/handlers.py +++ b/dbm-ui/backend/db_services/taskflow/handlers.py @@ -160,7 +160,22 @@ def get_node_histories(self, node_id: str) -> List[Dict[str, Any]]: ) return sorted(histories, key=itemgetter("started_time"), reverse=True) - def get_version_logs(self, node_id: str, version_id: str) -> List[Dict[str, str]]: + @staticmethod + def bklog_esquery_search(indices, query_string, start_time, end_time): + """esquery搜索""" + resp = BKLogApi.esquery_search( + { + "indices": indices, + "start_time": start_time, + "end_time": end_time, + "query_string": query_string, + "start": 0, + "size": 1000, + } + ) + return resp["hits"]["hits"] + + def get_version_logs(self, node_id: str, version_id: str) -> List[Dict[str, Dict[str, str]]]: """获取节点的日志信息""" try: flow_node = FlowNode.objects.get(root_id=self.root_id, node_id=node_id) @@ -169,21 +184,28 @@ def get_version_logs(self, node_id: str, version_id: str) -> List[Dict[str, str] if flow_node.updated_at < timezone.now() - timedelta(days=7): return [self.generate_log_record(message=_("节点日志仅保留7天"))] - resp = BKLogApi.esquery_search( - { - "indices": f"{env.DBA_APP_BK_BIZ_ID}_bklog.dbm_log,{env.DBA_APP_BK_BIZ_ID}_bklog.dbm_dbactuator", - "start_time": datetime2str(flow_node.started_at), - # 检索节点开始后一天内的日志,一般情况下,节点执行时间不会超过一天 - "end_time": datetime2str(flow_node.updated_at + timedelta(days=1)), - # TODO 可优化检索,需清洗后根据 root_id、node_id、version_id 进行检索 - "query_string": f"{self.root_id} AND {node_id} AND {version_id}", - "start": 0, - "size": 1000, - "sort_list": [["dtEventTimeStamp", "asc"], ["gseIndex", "asc"], ["iterationIndex", "asc"]], - } + start_time = datetime2str(flow_node.started_at) + end_time = datetime2str(flow_node.updated_at + timedelta(days=7)) + dbm_logs = self.bklog_esquery_search( + indices=f"{env.DBA_APP_BK_BIZ_ID}_bklog.dbm_log", + query_string=f"({self.root_id} AND {node_id} AND {version_id})" + f" AND (__ext.io_kubernetes_pod:*worker* OR __ext.io_kubernetes_pod:*dbsimulation*)", + start_time=start_time, + end_time=end_time, + ) + dbm_dbactuator_logs = self.bklog_esquery_search( + indices=f"{env.DBA_APP_BK_BIZ_ID}_bklog.dbm_dbactuator", + query_string=f"{self.root_id} AND {node_id} AND {version_id}", + start_time=start_time, + end_time=end_time, ) logs = [] - for hit in resp["hits"]["hits"]: + sorted_hits = sorted( + dbm_logs + dbm_dbactuator_logs, + key=lambda x: (x["_source"]["dtEventTimeStamp"], x["_source"]["gseIndex"], x["_source"]["iterationIndex"]), + ) + + for hit in sorted_hits: log = self._format_log(hit["_source"]["log"], hit["_source"]["serverIp"], hit["_index"]) if log: logs.append(