diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/mysql/common/master_and_slave_switch.py b/dbm-ui/backend/flow/engine/bamboo/scene/mysql/common/master_and_slave_switch.py index 4d5386d548..0a849e5499 100644 --- a/dbm-ui/backend/flow/engine/bamboo/scene/mysql/common/master_and_slave_switch.py +++ b/dbm-ui/backend/flow/engine/bamboo/scene/mysql/common/master_and_slave_switch.py @@ -14,10 +14,12 @@ from django.utils.crypto import get_random_string from django.utils.translation import ugettext as _ +from backend.constants import IP_PORT_DIVIDER from backend.db_meta.enums import ClusterEntryType, InstanceInnerRole from backend.db_meta.models import Cluster, ClusterEntry from backend.flow.consts import ACCOUNT_PREFIX, AUTH_ADDRESS_DIVIDER, InstanceStatus from backend.flow.engine.bamboo.scene.common.builder import SubBuilder +from backend.flow.engine.bamboo.scene.mysql.common.common_sub_flow import check_sub_flow from backend.flow.plugins.components.collections.mysql.add_user_for_cluster_switch import AddSwitchUserComponent from backend.flow.plugins.components.collections.mysql.clone_user import CloneUserComponent from backend.flow.plugins.components.collections.mysql.dns_manage import MySQLDnsManageComponent @@ -57,7 +59,6 @@ def master_and_slave_switch(root_id: str, ticket_data: dict, cluster: Cluster, c # 拼接子流程需要全局参数 switch_sub_flow_context = copy.deepcopy(ticket_data) # 把公共参数拼接到子流程的全局只读上下文 - switch_sub_flow_context["is_safe"] = True switch_sub_flow_context["is_dead_master"] = False switch_sub_flow_context["grant_repl"] = True switch_sub_flow_context["locked_switch"] = True @@ -68,6 +69,27 @@ def master_and_slave_switch(root_id: str, ticket_data: dict, cluster: Cluster, c # 针对集群维度声明子流程 cluster_switch_sub_pipeline = SubBuilder(root_id=root_id, data=copy.deepcopy(switch_sub_flow_context)) + # 切换前做预检测, 克隆主从时客户端连接检测和checksum检验默认检测 + sub_flow = check_sub_flow( + uid=ticket_data["uid"], + root_id=root_id, + cluster=cluster, + is_check_client_conn=True, + is_verify_checksum=True, + check_client_conn_inst=[ + f"{cluster_info['old_master_ip']}{IP_PORT_DIVIDER}{cluster_info['mysql_port']}", + f"{cluster_info['old_slave_ip']}{IP_PORT_DIVIDER}{cluster_info['mysql_port']}", + ], + verify_checksum_tuples=[ + { + "master": f"{cluster_info['old_master_ip']}{IP_PORT_DIVIDER}{cluster_info['mysql_port']}", + "slave": f"{cluster_info['new_master_ip']}{IP_PORT_DIVIDER}{cluster_info['mysql_port']}", + } + ], + ) + if sub_flow: + cluster_switch_sub_pipeline.add_sub_pipeline(sub_flow=sub_flow) + # todo ?授权切换账号 add_sw_user_kwargs = AddSwitchUserKwargs( bk_cloud_id=cluster.bk_cloud_id, diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/mysql/mysql_master_fail_over.py b/dbm-ui/backend/flow/engine/bamboo/scene/mysql/mysql_master_fail_over.py index fde16fdfd2..3aab2bcae1 100644 --- a/dbm-ui/backend/flow/engine/bamboo/scene/mysql/mysql_master_fail_over.py +++ b/dbm-ui/backend/flow/engine/bamboo/scene/mysql/mysql_master_fail_over.py @@ -16,10 +16,15 @@ from django.utils.translation import ugettext as _ from backend.configuration.constants import DBType +from backend.constants import IP_PORT_DIVIDER from backend.db_meta.enums import ClusterType +from backend.db_meta.models import Cluster from backend.flow.engine.bamboo.scene.common.builder import Builder, SubBuilder from backend.flow.engine.bamboo.scene.common.get_file_list import GetFileList -from backend.flow.engine.bamboo.scene.mysql.common.common_sub_flow import build_surrounding_apps_sub_flow +from backend.flow.engine.bamboo.scene.mysql.common.common_sub_flow import ( + build_surrounding_apps_sub_flow, + check_sub_flow, +) from backend.flow.engine.bamboo.scene.mysql.mysql_master_slave_switch import MySQLMasterSlaveSwitchFlow from backend.flow.plugins.components.collections.mysql.exec_actuator_script import ExecuteDBActuatorScriptComponent from backend.flow.plugins.components.collections.mysql.mysql_db_meta import MySQLDBMetaComponent @@ -91,7 +96,6 @@ def master_fail_over_flow(self): sub_sub_flow_context.pop("infos") # 把公共参数拼接到子流程的全局只读上下文 - sub_sub_flow_context["is_safe"] = info["is_safe"] sub_sub_flow_context["is_dead_master"] = True sub_sub_flow_context["grant_repl"] = True sub_sub_flow_context["locked_switch"] = False @@ -99,7 +103,10 @@ def master_fail_over_flow(self): # 获取对应的集群信息 cluster = MySQLMasterSlaveSwitchFlow.get_cluster_info( - cluster_id=cluster_id, new_master_ip=info["slave_ip"]["ip"], old_master_ip=info["master_ip"]["ip"] + cluster_id=cluster_id, + bk_biz_id=sub_sub_flow_context["bk_biz_id"], + new_master_ip=info["slave_ip"]["ip"], + old_master_ip=info["master_ip"]["ip"], ) # 拼接执行原子任务的活动节点需要的通用的私有参数 @@ -110,6 +117,24 @@ def master_fail_over_flow(self): root_id=self.root_id, data=copy.deepcopy(sub_sub_flow_context) ) + # 切换前做预检测, 强切场景理论上不需要对原来master做连接检测 + sub_flow = check_sub_flow( + uid=self.data["uid"], + root_id=self.root_id, + cluster=Cluster.objects.get(id=cluster_id, bk_biz_id=sub_sub_flow_context["bk_biz_id"]), + is_check_client_conn=sub_sub_flow_context["is_check_process"], + is_verify_checksum=sub_sub_flow_context["is_verify_checksum"], + check_client_conn_inst=[f"{cluster['new_master_ip']}{IP_PORT_DIVIDER}{cluster['mysql_port']}"], + verify_checksum_tuples=[ + { + "master": f"{cluster['old_master_ip']}{IP_PORT_DIVIDER}{cluster['mysql_port']}", + "slave": f"{cluster['new_master_ip']}{IP_PORT_DIVIDER}{cluster['mysql_port']}", + } + ], + ) + if sub_flow: + cluster_switch_sub_pipeline.add_sub_pipeline(sub_flow=sub_flow) + # 阶段2 执行故障切换的原子任务 cluster_sw_kwargs.exec_ip = info["slave_ip"]["ip"] cluster_sw_kwargs.get_mysql_payload_func = ( diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/mysql/mysql_master_slave_switch.py b/dbm-ui/backend/flow/engine/bamboo/scene/mysql/mysql_master_slave_switch.py index 1b247690f3..9aaa583a9a 100644 --- a/dbm-ui/backend/flow/engine/bamboo/scene/mysql/mysql_master_slave_switch.py +++ b/dbm-ui/backend/flow/engine/bamboo/scene/mysql/mysql_master_slave_switch.py @@ -17,12 +17,17 @@ from django.utils.translation import ugettext as _ from backend.configuration.constants import DBType +from backend.constants import IP_PORT_DIVIDER from backend.db_meta.enums import ClusterEntryType, ClusterType, InstanceInnerRole +from backend.db_meta.exceptions import ClusterNotExistException from backend.db_meta.models import Cluster, ProxyInstance, StorageInstance from backend.flow.consts import ACCOUNT_PREFIX, AUTH_ADDRESS_DIVIDER from backend.flow.engine.bamboo.scene.common.builder import Builder, SubBuilder from backend.flow.engine.bamboo.scene.common.get_file_list import GetFileList -from backend.flow.engine.bamboo.scene.mysql.common.common_sub_flow import build_surrounding_apps_sub_flow +from backend.flow.engine.bamboo.scene.mysql.common.common_sub_flow import ( + build_surrounding_apps_sub_flow, + check_sub_flow, +) from backend.flow.engine.bamboo.scene.mysql.common.exceptions import NormalTenDBFlowException from backend.flow.plugins.components.collections.mysql.add_user_for_cluster_switch import AddSwitchUserComponent from backend.flow.plugins.components.collections.mysql.clone_user import CloneUserComponent @@ -70,14 +75,19 @@ def __init__(self, root_id: str, data: Optional[Dict]): self.data = data @staticmethod - def get_cluster_info(cluster_id: int, new_master_ip: str, old_master_ip: str) -> dict: + def get_cluster_info(cluster_id: int, bk_biz_id: int, new_master_ip: str, old_master_ip: str) -> dict: """ 定义获取切换集群的基本信息的方法 @param cluster_id :集群id + @param bk_biz_id: 业务id @param new_master_ip: 待升主的slave ip @param old_master_ip: 目前的集群的master ip """ - cluster = Cluster.objects.get(id=cluster_id) + try: + cluster = Cluster.objects.get(id=cluster_id, bk_biz_id=bk_biz_id) + except Cluster.DoesNotExist: + raise ClusterNotExistException(cluster_id=cluster_id, bk_biz_id=bk_biz_id, message=_("集群不存在")) + proxy_info = ProxyInstance.objects.filter(cluster=cluster).all() new_master = StorageInstance.objects.get(machine__ip=new_master_ip, cluster=cluster) @@ -198,7 +208,6 @@ def master_slave_switch_flow(self): sub_sub_flow_context.pop("infos") # 把公共参数拼接到子流程的全局只读上下文 - sub_sub_flow_context["is_safe"] = info["is_safe"] sub_sub_flow_context["is_dead_master"] = False sub_sub_flow_context["grant_repl"] = True sub_sub_flow_context["locked_switch"] = True @@ -208,7 +217,10 @@ def master_slave_switch_flow(self): # 获取对应的集群信息 cluster = self.get_cluster_info( - cluster_id=cluster_id, new_master_ip=info["slave_ip"]["ip"], old_master_ip=info["master_ip"]["ip"] + cluster_id=cluster_id, + bk_biz_id=sub_sub_flow_context["bk_biz_id"], + new_master_ip=info["slave_ip"]["ip"], + old_master_ip=info["master_ip"]["ip"], ) # 拼接切换执行活动节点需要的通用的私有参数 @@ -219,6 +231,27 @@ def master_slave_switch_flow(self): root_id=self.root_id, data=copy.deepcopy(sub_sub_flow_context) ) + # 切换前做预检测 + sub_flow = check_sub_flow( + uid=self.data["uid"], + root_id=self.root_id, + cluster=Cluster.objects.get(id=cluster_id, bk_biz_id=sub_sub_flow_context["bk_biz_id"]), + is_check_client_conn=sub_sub_flow_context["is_check_process"], + is_verify_checksum=sub_sub_flow_context["is_verify_checksum"], + check_client_conn_inst=[ + f"{cluster['old_master_ip']}{IP_PORT_DIVIDER}{cluster['mysql_port']}", + f"{cluster['new_master_ip']}{IP_PORT_DIVIDER}{cluster['mysql_port']}", + ], + verify_checksum_tuples=[ + { + "master": f"{cluster['old_master_ip']}{IP_PORT_DIVIDER}{cluster['mysql_port']}", + "slave": f"{cluster['new_master_ip']}{IP_PORT_DIVIDER}{cluster['mysql_port']}", + } + ], + ) + if sub_flow: + cluster_switch_sub_pipeline.add_sub_pipeline(sub_flow=sub_flow) + # 阶段1 添加切换的临时账号 cluster_switch_sub_pipeline.add_act( act_name=_("旧master添加切换临时账号"), diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/mysql/mysql_migrate_cluster_flow.py b/dbm-ui/backend/flow/engine/bamboo/scene/mysql/mysql_migrate_cluster_flow.py index 3d95977f3b..9f7b77c7a1 100644 --- a/dbm-ui/backend/flow/engine/bamboo/scene/mysql/mysql_migrate_cluster_flow.py +++ b/dbm-ui/backend/flow/engine/bamboo/scene/mysql/mysql_migrate_cluster_flow.py @@ -16,10 +16,15 @@ from django.utils.translation import ugettext as _ from backend.configuration.constants import DBType +from backend.constants import IP_PORT_DIVIDER +from backend.db_meta.models import Cluster from backend.flow.consts import ACCOUNT_PREFIX, AUTH_ADDRESS_DIVIDER from backend.flow.engine.bamboo.scene.common.builder import Builder, SubBuilder from backend.flow.engine.bamboo.scene.common.get_file_list import GetFileList -from backend.flow.engine.bamboo.scene.mysql.common.common_sub_flow import build_surrounding_apps_sub_flow +from backend.flow.engine.bamboo.scene.mysql.common.common_sub_flow import ( + build_surrounding_apps_sub_flow, + check_sub_flow, +) from backend.flow.plugins.components.collections.common.pause import PauseComponent from backend.flow.plugins.components.collections.mysql.add_user_for_cluster_switch import AddSwitchUserComponent from backend.flow.plugins.components.collections.mysql.clear_machine import MySQLClearMachineComponent @@ -410,7 +415,6 @@ def build_cluster_switch_sub_flow(self, cluster: dict): switch_sub_flow_context.pop("infos") # 把公共参数拼接到子流程的全局只读上下文 - switch_sub_flow_context["is_safe"] = True switch_sub_flow_context["is_dead_master"] = False switch_sub_flow_context["grant_repl"] = True switch_sub_flow_context["locked_switch"] = True @@ -424,6 +428,27 @@ def build_cluster_switch_sub_flow(self, cluster: dict): # 针对集群维度声明子流程 cluster_switch_sub_pipeline = SubBuilder(root_id=self.root_id, data=copy.deepcopy(switch_sub_flow_context)) + # 切换前做预检测, 克隆主从时客户端连接检测和checksum检验默认检测 + sub_flow = check_sub_flow( + uid=self.data["uid"], + root_id=self.root_id, + cluster=Cluster.objects.get(id=cluster["cluster_id"], bk_biz_id=cluster["bk_biz_id"]), + is_check_client_conn=True, + is_verify_checksum=True, + check_client_conn_inst=[ + f"{cluster['old_master_ip']}{IP_PORT_DIVIDER}{cluster['mysql_port']}", + f"{cluster['old_slave_ip']}{IP_PORT_DIVIDER}{cluster['mysql_port']}", + ], + verify_checksum_tuples=[ + { + "master": f"{cluster['old_master_ip']}{IP_PORT_DIVIDER}{cluster['mysql_port']}", + "slave": f"{cluster['new_master_ip']}{IP_PORT_DIVIDER}{cluster['mysql_port']}", + } + ], + ) + if sub_flow: + cluster_switch_sub_pipeline.add_sub_pipeline(sub_flow=sub_flow) + add_sw_user_kwargs = AddSwitchUserKwargs( bk_cloud_id=cluster["bk_cloud_id"], user=switch_account, diff --git a/dbm-ui/backend/flow/plugins/components/collections/mysql/verify_checksum.py b/dbm-ui/backend/flow/plugins/components/collections/mysql/verify_checksum.py index 71e150654f..62c4959db9 100644 --- a/dbm-ui/backend/flow/plugins/components/collections/mysql/verify_checksum.py +++ b/dbm-ui/backend/flow/plugins/components/collections/mysql/verify_checksum.py @@ -19,14 +19,19 @@ class VerifyChecksumService(BaseService): """ 定义检测实例checksum结果的活动节点 + 检测checksum结果的规则: + 1: 这里的规则只检验例行checksum结果,对手动checksum结果不做校验 + 2: 如果最近14天内在checksum/checksum_history表没有记录,则认为这对主从长时间没有checksum,报出异常 + 3: 优先检测checksum结果,checksum结果目前是保留一个轮次的检测结果,下一个轮次会清空 + 4: checksum表有可能为空的情况,如果为则再判断checksum_history表的不一致的情况 + 5: checksum_history表一开始是不存在的,故先判断实例是否存在,以免影响结果输出 """ def _execute(self, data, parent_data): - # 每次检测checksum结果目前是保留一个轮次的检测结果,故优先检测checksum表 - # 如果checksum记录没有判断,且checksum没有test记录,说明一轮数据刚刚检验完且清空checksum表, - # 则减少checksum_history最近14天的不一致记录 allow_diff_cnt = 0 + is_exist_checksum_history = True + kwargs = data.get_one_of_inputs("kwargs") checksum_instance_tuples = kwargs["checksum_instance_tuples"] @@ -36,28 +41,63 @@ def _execute(self, data, parent_data): master_ip = t["master"].split(":")[0] master_port = t["master"].split(":")[1] slave_address = t["slave"] + checksum_history_cnt = checksum_history_diff_cnt = 0 + + # 判断checksum_history表是否存在 + is_table_cmd = ( + f"select count(0) as t from information_schema.tables " + f"where TABLE_SCHEMA = '{INFODBA_SCHEMA}' " + f"and TABLE_NAME = '{MYSQL_CHECKSUM_TABLE}' ;" + ) + + res = DRSApi.rpc( + { + "addresses": [slave_address], + "cmds": [is_table_cmd], + "force": False, + "bk_cloud_id": kwargs["bk_cloud_id"], + } + ) + if res[0]["error_msg"]: + error_message_list.append(f"This node [{slave_address}] verify checksum failed: {res[0]['error_msg']}") + continue + + if int(res[0]["cmd_results"][0]["table_data"][0]["t"]) == 0: + # 标记 checksum_history 不存在 + is_exist_checksum_history = False + # 拼接查询校验结果命令集,并分析结果 check_cmds = [ ( f"select count(0) as cnt from {INFODBA_SCHEMA}.checksum" - " where ts > date_sub(now(), interval 14 day)" - ), - ( - f"select count(0) as cnt from {INFODBA_SCHEMA}.{MYSQL_CHECKSUM_TABLE}" - " where ts > date_sub(now(), interval 14 day)" + f" where ts > date_sub(now(), interval 14 day)" + f" and ((master_ip = '{master_ip}' and master_port = {master_port}) or" + f" master_ip = '0.0.0.0')" ), ( f"select count(distinct db, tbl,chunk) as cnt from {INFODBA_SCHEMA}.checksum" " where (this_crc <> master_crc or this_cnt <> master_cnt)" - f" and master_ip = '{master_ip}' and master_port = {master_port}" - ), - ( - f"select count(distinct db, tbl,chunk) as cnt from {INFODBA_SCHEMA}.{MYSQL_CHECKSUM_TABLE}" - " where (this_crc <> master_crc or this_cnt <> master_cnt)" - " and ts > date_sub(now(), interval 14 day)" + "and ts > date_sub(now(), interval 14 day)" f" and master_ip = '{master_ip}' and master_port = {master_port}" ), ] + if is_exist_checksum_history: + check_cmds.append( + ( + f"select count(0) as cnt from {INFODBA_SCHEMA}.{MYSQL_CHECKSUM_TABLE}" + f" where ts > date_sub(now(), interval 14 day)" + f" and ((master_ip = '{master_ip}' and master_port = {master_port}) or" + f" master_ip = '0.0.0.0')" + ) + ) + check_cmds.append( + ( + f"select count(distinct db, tbl,chunk) as cnt from {INFODBA_SCHEMA}.{MYSQL_CHECKSUM_TABLE}" + " where (this_crc <> master_crc or this_cnt <> master_cnt)" + " and ts > date_sub(now(), interval 14 day)" + f" and master_ip = '{master_ip}' and master_port = {master_port}" + ) + ) res = DRSApi.rpc( { @@ -68,23 +108,30 @@ def _execute(self, data, parent_data): } ) + # 执行如果出现异常报错 if res[0]["error_msg"]: error_message_list.append(f"This node [{slave_address}] verify checksum failed: {res[0]['error_msg']}") + continue + # 捕捉返回结果 checksum_cnt = int(res[0]["cmd_results"][0]["table_data"][0]["cnt"]) - checksum_history_cnt = int(res[0]["cmd_results"][1]["table_data"][0]["cnt"]) - checksum_diff_cnt = int(res[0]["cmd_results"][2]["table_data"][0]["cnt"]) - checksum_history_diff_cnt = int(res[0]["cmd_results"][3]["table_data"][0]["cnt"]) - - if not checksum_cnt and not checksum_history_cnt: + checksum_diff_cnt = int(res[0]["cmd_results"][1]["table_data"][0]["cnt"]) + if is_exist_checksum_history: + checksum_history_cnt = int(res[0]["cmd_results"][2]["table_data"][0]["cnt"]) + checksum_history_diff_cnt = int(res[0]["cmd_results"][3]["table_data"][0]["cnt"]) + + # 分析结果 + # 场景1: 如果最近14天内在checksum/checksum_history表没有记录,异常 + if checksum_cnt == 0 and checksum_history_cnt == 0: error_message_list.append( f"This node [{slave_address}] has not queried the verification records of the last 14 days" ) - + # 场景2: checksum不一致结果大于允许范围; 或者checksum表为空且history表不一致结果大于允许范围,异常 elif checksum_diff_cnt > allow_diff_cnt or ( checksum_cnt == 0 and checksum_history_diff_cnt > allow_diff_cnt ): error_message_list.append(f"This node [{slave_address}] has diff chuck in the last 14 days") + # 其余场景:从结果看表示数据一致,正常 else: self.log_info(f"The node [{slave_address}] passed the checkpoint [verify-checksum] !") diff --git a/dbm-ui/backend/flow/utils/mysql/mysql_act_playload.py b/dbm-ui/backend/flow/utils/mysql/mysql_act_playload.py index 50a77492ac..a41784f8f6 100644 --- a/dbm-ui/backend/flow/utils/mysql/mysql_act_playload.py +++ b/dbm-ui/backend/flow/utils/mysql/mysql_act_playload.py @@ -901,7 +901,7 @@ def get_set_backend_toward_slave_payload(self, **kwargs): "general": {"runtime_account": {**mysql_count, **proxy_count}}, "extend": { "host": kwargs["ip"], - "is_safe": self.ticket_data["is_safe"], + "slave_delay_check": self.ticket_data["is_check_delay"], "is_dead_master": self.ticket_data["is_dead_master"], "grant_repl": self.ticket_data["grant_repl"], "locked_switch": self.ticket_data["locked_switch"],