diff --git a/dbm-ui/backend/ticket/tasks/ticket_tasks.py b/dbm-ui/backend/ticket/tasks/ticket_tasks.py index 9025ffaf21..2ec807e51a 100644 --- a/dbm-ui/backend/ticket/tasks/ticket_tasks.py +++ b/dbm-ui/backend/ticket/tasks/ticket_tasks.py @@ -8,6 +8,7 @@ an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """ +import asyncio import json import logging import operator @@ -30,7 +31,6 @@ from backend.constants import DEFAULT_SYSTEM_USER from backend.db_meta.enums import ClusterType, InstanceInnerRole from backend.db_meta.models import AppCache, Cluster, StorageInstance -from backend.ticket.builders.common.constants import MYSQL_CHECKSUM_TABLE, MySQLDataRepairTriggerMode from backend.ticket.constants import ( TICKET_EXPIRE_DEFAULT_CONFIG, TODO_RUNNING_STATUS, @@ -41,13 +41,13 @@ TicketExpireType, TicketFlowStatus, TicketStatus, - TicketType, TodoType, ) from backend.ticket.exceptions import TicketTaskTriggerException from backend.ticket.flow_manager.inner import InnerFlow from backend.ticket.models.ticket import Flow, Ticket, TicketFlowsConfig -from backend.utils.time import date2str, datetime2str +from backend.utils.batch_request import process_repair_infos +from backend.utils.time import datetime2str logger = logging.getLogger("root") @@ -209,34 +209,7 @@ def auto_create_data_repair_ticket(cls): db_type = ClusterType.cluster_type_to_db_type(cluster.cluster_type) biz__db_type__repair_infos[cluster.bk_biz_id][db_type].extend(ticket_infos) - # 构造修复单据 - for biz, db_type__repair_infos in biz__db_type__repair_infos.items(): - for db_type, repair_infos in db_type__repair_infos.items(): - ticket_details = { - # "非innodb表是否修复"这个参数与校验保持一致,默认为false - "is_sync_non_innodb": False, - "is_ticket_consistent": False, - "checksum_table": MYSQL_CHECKSUM_TABLE, - "trigger_type": MySQLDataRepairTriggerMode.ROUTINE.value, - "start_time": date2str(start_time), - "end_time": date2str(end_time), - "infos": [ - { - "cluster_id": data_info["cluster_id"], - "master": data_info["master"], - "slaves": data_info["slaves"], - } - for data_info in repair_infos - ], - } - ticket_type = getattr(TicketType, f"{db_type.upper()}_DATA_REPAIR") - cls._create_ticket( - ticket_type=ticket_type, - creator=DEFAULT_SYSTEM_USER, - bk_biz_id=biz, - remark=_("集群存在数据不一致,自动创建的数据修复单据"), - details=ticket_details, - ) + asyncio.run(process_repair_infos(biz__db_type__repair_infos, start_time, end_time)) @classmethod def auto_clear_expire_flow(cls): diff --git a/dbm-ui/backend/utils/batch_request.py b/dbm-ui/backend/utils/batch_request.py index fba5e955c4..82c023291e 100644 --- a/dbm-ui/backend/utils/batch_request.py +++ b/dbm-ui/backend/utils/batch_request.py @@ -8,6 +8,8 @@ an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """ +import asyncio +import concurrent.futures from concurrent.futures import ThreadPoolExecutor, as_completed from copy import deepcopy from multiprocessing.pool import ThreadPool @@ -16,9 +18,15 @@ import wrapt from django.conf import settings from django.utils.translation import get_language +from django.utils.translation import gettext as _ +from backend.constants import DEFAULT_SYSTEM_USER from backend.core.translation.context import RespectsLanguage +from backend.ticket.builders.common.constants import MYSQL_CHECKSUM_TABLE, MySQLDataRepairTriggerMode +from backend.ticket.constants import TicketType +from backend.ticket.models import Ticket from backend.utils.local import local +from backend.utils.time import date2str QUERY_CMDB_LIMIT = 500 WRITE_CMDB_LIMIT = 500 @@ -71,7 +79,7 @@ def batch_request( limit=QUERY_CMDB_LIMIT, sort=None, split_params=False, - **kwargs + **kwargs, ): """ 异步并发请求接口 @@ -226,3 +234,46 @@ def wrapper(wrapped, instance, args, kwargs): return {"data": data, "total": len(data)} return wrapper + + +def create_ticket_for_repair_info(biz, db_type, repair_infos, start_time, end_time): + ticket_details = { + "is_sync_non_innodb": False, + "is_ticket_consistent": False, + "checksum_table": MYSQL_CHECKSUM_TABLE, + "trigger_type": MySQLDataRepairTriggerMode.ROUTINE.value, + "start_time": date2str(start_time), + "end_time": date2str(end_time), + "infos": [ + { + "cluster_id": data_info["cluster_id"], + "master": data_info["master"], + "slaves": data_info["slaves"], + } + for data_info in repair_infos + ], + } + ticket_type = getattr(TicketType, f"{db_type.upper()}_DATA_REPAIR") + Ticket.create_ticket( + ticket_type=ticket_type, + creator=DEFAULT_SYSTEM_USER, + bk_biz_id=biz, + remark=_("集群存在数据不一致,自动创建的数据修复单据"), + details=ticket_details, + ) + + +async def process_repair_infos(biz__db_type__repair_infos, start_time, end_time): + loop = asyncio.get_running_loop() + + with concurrent.futures.ThreadPoolExecutor() as executor: + tasks = [ + loop.run_in_executor( + executor, create_ticket_for_repair_info, biz, db_type, repair_infos, start_time, end_time + ) + for biz, db_type__repair_infos in biz__db_type__repair_infos.items() + for db_type, repair_infos in db_type__repair_infos.items() + ] + + # 等待所有任务完成 + await asyncio.gather(*tasks)