perf(redis): defer dbmon install until after cluster deployment, to avoid alerts during deploy #3650
OMG-By committed Mar 25, 2024
1 parent 0e6b44e commit ab94c6a
Showing 6 changed files with 147 additions and 142 deletions.
@@ -39,7 +39,9 @@
logger = logging.getLogger("flow")


def ProxyBatchInstallAtomJob(root_id, ticket_data, act_kwargs: ActKwargs, param: Dict) -> SubBuilder:
def ProxyBatchInstallAtomJob(
root_id, ticket_data, act_kwargs: ActKwargs, param: Dict, dbmon_install: bool = True
) -> SubBuilder:
"""
### SubBuilder: Proxy installation atomic job
act_kwargs.cluster = {
@@ -86,20 +88,21 @@ def ProxyBatchInstallAtomJob(root_id, ticket_data, act_kwargs: ActKwargs, param:
kwargs=asdict(act_kwargs),
)

sub_pipeline.add_act(
act_name=_("Proxy-002-{}-安装backup-client工具").format(exec_ip),
act_component_code=DownloadBackupClientComponent.code,
kwargs=asdict(
DownloadBackupClientKwargs(
bk_cloud_id=act_kwargs.cluster["bk_cloud_id"],
bk_biz_id=int(act_kwargs.cluster["bk_biz_id"]),
download_host_list=[exec_ip],
),
),
)

# install plugins
acts_list = []
acts_list.append(
{
"act_name": _("Proxy-002-{}-安装backup-client工具").format(exec_ip),
"act_component_code": DownloadBackupClientComponent.code,
"kwargs": asdict(
DownloadBackupClientKwargs(
bk_cloud_id=act_kwargs.cluster["bk_cloud_id"],
bk_biz_id=int(act_kwargs.cluster["bk_biz_id"]),
download_host_list=[exec_ip],
),
),
}
)
for plugin_name in DEPENDENCIES_PLUGINS:
acts_list.append(
{
@@ -160,25 +163,26 @@ def ProxyBatchInstallAtomJob(root_id, ticket_data, act_kwargs: ActKwargs, param:
)

# deploy bkdbmon
act_kwargs.cluster["servers"] = [
{
"app": app,
"app_name": app_name,
"bk_biz_id": str(act_kwargs.cluster["bk_biz_id"]),
"bk_cloud_id": int(act_kwargs.cluster["bk_cloud_id"]),
"server_ip": exec_ip,
"server_ports": [param["proxy_port"]],
"meta_role": act_kwargs.cluster["machine_type"],
"cluster_domain": act_kwargs.cluster["immute_domain"],
"cluster_name": act_kwargs.cluster["cluster_name"],
"cluster_type": act_kwargs.cluster["cluster_type"],
}
]
act_kwargs.get_redis_payload_func = RedisActPayload.bkdbmon_install.__name__
sub_pipeline.add_act(
act_name=_("Proxy-005-{}-安装监控").format(exec_ip),
act_component_code=ExecuteDBActuatorScriptComponent.code,
kwargs=asdict(act_kwargs),
)
if dbmon_install:
act_kwargs.cluster["servers"] = [
{
"app": app,
"app_name": app_name,
"bk_biz_id": str(act_kwargs.cluster["bk_biz_id"]),
"bk_cloud_id": int(act_kwargs.cluster["bk_cloud_id"]),
"server_ip": exec_ip,
"server_ports": [param["proxy_port"]],
"meta_role": act_kwargs.cluster["machine_type"],
"cluster_domain": act_kwargs.cluster["immute_domain"],
"cluster_name": act_kwargs.cluster["cluster_name"],
"cluster_type": act_kwargs.cluster["cluster_type"],
}
]
act_kwargs.get_redis_payload_func = RedisActPayload.bkdbmon_install.__name__
sub_pipeline.add_act(
act_name=_("Proxy-005-{}-安装监控").format(exec_ip),
act_component_code=ExecuteDBActuatorScriptComponent.code,
kwargs=asdict(act_kwargs),
)

return sub_pipeline.build_sub_process(sub_name=_("Proxy-{}-安装原子任务").format(exec_ip))
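The new `dbmon_install` keyword is what lets whole-cluster flows pull the monitor install out of each host's atom job. A minimal calling sketch, assuming the surrounding flow has prepared `act_kwargs` and `params` the way the flows further down do; this is not a drop-in excerpt from the repo:

```python
# Sketch only: defer dbmon while the cluster is still half-built.
sub_builder = ProxyBatchInstallAtomJob(
    self.root_id, self.data, act_kwargs, params,
    dbmon_install=False,  # skip the in-job monitor install
)
sub_pipelines.append(sub_builder)
# ... cluster metadata writes, DNS registration ...
# dbmon is then rolled out in one batch at the end (see the predixy flow
# below), so it never watches an unregistered instance and never alerts.
```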
@@ -44,7 +44,9 @@
logger = logging.getLogger("flow")


def RedisBatchInstallAtomJob(root_id, ticket_data, sub_kwargs: ActKwargs, param: Dict) -> SubBuilder:
def RedisBatchInstallAtomJob(
root_id, ticket_data, sub_kwargs: ActKwargs, param: Dict, dbmon_install: bool = True
) -> SubBuilder:
"""
### SubBuilder: Redis installation atomic job
#### Note: when a master/slave pair is created, the master-slave relationship is not created here (neither the metadata nor the actual replication state)
@@ -90,20 +92,21 @@ def RedisBatchInstallAtomJob(root_id, ticket_data, sub_kwargs: ActKwargs, param:
kwargs=asdict(act_kwargs),
)

sub_pipeline.add_act(
act_name=_("Redis-{}-安装backup-client工具").format(exec_ip),
act_component_code=DownloadBackupClientComponent.code,
kwargs=asdict(
DownloadBackupClientKwargs(
bk_cloud_id=act_kwargs.cluster["bk_cloud_id"],
bk_biz_id=int(act_kwargs.cluster["bk_biz_id"]),
download_host_list=[exec_ip],
),
),
)

# install plugins
acts_list = []
acts_list.append(
{
"act_name": _("Redis-{}-安装backup-client工具").format(exec_ip),
"act_component_code": DownloadBackupClientComponent.code,
"kwargs": asdict(
DownloadBackupClientKwargs(
bk_cloud_id=act_kwargs.cluster["bk_cloud_id"],
bk_biz_id=int(act_kwargs.cluster["bk_biz_id"]),
download_host_list=[exec_ip],
),
),
}
)
for plugin_name in DEPENDENCIES_PLUGINS:
acts_list.append(
{
@@ -156,27 +159,28 @@ def RedisBatchInstallAtomJob(root_id, ticket_data, sub_kwargs: ActKwargs, param:
)

# deploy bkdbmon
act_kwargs.cluster["servers"] = [
{
"app": app,
"app_name": app_name,
"bk_biz_id": str(act_kwargs.cluster["bk_biz_id"]),
"bk_cloud_id": int(act_kwargs.cluster["bk_cloud_id"]),
"server_ip": exec_ip,
"server_ports": param["ports"],
"meta_role": param["meta_role"],
"cluster_name": act_kwargs.cluster["cluster_name"],
"cluster_type": act_kwargs.cluster["cluster_type"],
"cluster_domain": act_kwargs.cluster["immute_domain"],
"server_shards": param.get("server_shards", {}),
"cache_backup_mode": param.get("cache_backup_mode", ""),
}
]
act_kwargs.get_redis_payload_func = RedisActPayload.bkdbmon_install.__name__
sub_pipeline.add_act(
act_name=_("Redis-{}-安装监控").format(exec_ip),
act_component_code=ExecuteDBActuatorScriptComponent.code,
kwargs=asdict(act_kwargs),
)
if dbmon_install:
act_kwargs.cluster["servers"] = [
{
"app": app,
"app_name": app_name,
"bk_biz_id": str(act_kwargs.cluster["bk_biz_id"]),
"bk_cloud_id": int(act_kwargs.cluster["bk_cloud_id"]),
"server_ip": exec_ip,
"server_ports": param["ports"],
"meta_role": param["meta_role"],
"cluster_name": act_kwargs.cluster["cluster_name"],
"cluster_type": act_kwargs.cluster["cluster_type"],
"cluster_domain": act_kwargs.cluster["immute_domain"],
"server_shards": param.get("server_shards", {}),
"cache_backup_mode": param.get("cache_backup_mode", ""),
}
]
act_kwargs.get_redis_payload_func = RedisActPayload.bkdbmon_install.__name__
sub_pipeline.add_act(
act_name=_("Redis-{}-安装监控").format(exec_ip),
act_component_code=ExecuteDBActuatorScriptComponent.code,
kwargs=asdict(act_kwargs),
)

return sub_pipeline.build_sub_process(sub_name=_("Redis-{}-安装原子任务").format(exec_ip))
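For reference, each entry in the `servers` list handed to `RedisActPayload.bkdbmon_install` describes everything dbmon needs to watch one host. A self-contained sketch of that payload shape; `build_dbmon_servers` is a hypothetical helper, not a function in this repo:

```python
from typing import Dict, List


def build_dbmon_servers(cluster: Dict, exec_ip: str, param: Dict, app: str, app_name: str) -> List[Dict]:
    """Hypothetical helper mirroring the payload built inline above."""
    return [
        {
            "app": app,
            "app_name": app_name,
            "bk_biz_id": str(cluster["bk_biz_id"]),      # stringified, as in the diff
            "bk_cloud_id": int(cluster["bk_cloud_id"]),  # integer, as in the diff
            "server_ip": exec_ip,
            "server_ports": param["ports"],
            "meta_role": param["meta_role"],
            "cluster_name": cluster["cluster_name"],
            "cluster_type": cluster["cluster_type"],
            "cluster_domain": cluster["immute_domain"],
            "server_shards": param.get("server_shards", {}),
            "cache_backup_mode": param.get("cache_backup_mode", ""),
        }
    ]
```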
@@ -157,29 +157,15 @@ def redis_cluster_shutdown_flow(self):
access_sub_builder = AccessManagerAtomJob(self.root_id, self.data, act_kwargs, params)
redis_pipeline.add_sub_pipeline(sub_flow=access_sub_builder)

# uninstall dbmon first, before shutting instances down
acts_list = []
for ip in proxy_ips:
# shut down the proxy
act_kwargs.exec_ip = ip
act_kwargs.cluster = {
"ip": ip,
"port": cluster_info["proxy_map"][ip],
"operate": DBActuatorTypeEnum.Proxy.value + "_" + RedisActuatorActionEnum.Shutdown.value,
}
act_kwargs.get_redis_payload_func = RedisActPayload.proxy_operate_payload.__name__
acts_list.append(
{
"act_name": _("{}下架proxy实例").format(ip),
"act_component_code": ExecuteDBActuatorScriptComponent.code,
"kwargs": asdict(act_kwargs),
}
)

for ip in proxy_ips + redis_ips:
act_kwargs.cluster = {
"servers": [
{
"bk_biz_id": str(self.data["bk_biz_id"]),
"bk_cloud_id": act_kwargs.bk_cloud_id,
# TODO: machine reuse is not handled yet, so pass an empty list for now. If hosts are reused later, the remaining port list must be queried here
"server_ports": [],
"meta_role": "",
"cluster_domain": cluster_info["domain_name"],
@@ -198,39 +184,33 @@ def redis_cluster_shutdown_flow(self):
"kwargs": asdict(act_kwargs),
}
)
redis_pipeline.add_parallel_acts(acts_list=acts_list)

for ip in redis_ips:
act_kwargs.cluster = {}
acts_list = []
for ip in proxy_ips:
# shut down the proxy
act_kwargs.exec_ip = ip
act_kwargs.get_redis_payload_func = RedisActPayload.redis_shutdown_payload.__name__
act_kwargs.cluster = {
"ip": ip,
"port": cluster_info["proxy_map"][ip],
"operate": DBActuatorTypeEnum.Proxy.value + "_" + RedisActuatorActionEnum.Shutdown.value,
}
act_kwargs.get_redis_payload_func = RedisActPayload.proxy_operate_payload.__name__
acts_list.append(
{
"act_name": _("{}下架redis实例").format(ip),
"act_name": _("{}下架proxy实例").format(ip),
"act_component_code": ExecuteDBActuatorScriptComponent.code,
"kwargs": asdict(act_kwargs),
}
)

act_kwargs.cluster = {
"servers": [
{
"bk_biz_id": str(self.data["bk_biz_id"]),
"bk_cloud_id": act_kwargs.bk_cloud_id,
# TODO: machine reuse is not handled yet, so pass an empty list for now. If hosts are reused later, the remaining port list must be queried here
"server_ports": [],
"meta_role": "",
"cluster_domain": cluster_info["domain_name"],
"app": app,
"app_name": app_name,
"cluster_name": cluster_info["cluster_name"],
"cluster_type": cluster_info["cluster_type"],
}
]
}
act_kwargs.get_redis_payload_func = RedisActPayload.bkdbmon_install.__name__
for ip in redis_ips:
act_kwargs.cluster = {}
act_kwargs.exec_ip = ip
act_kwargs.get_redis_payload_func = RedisActPayload.redis_shutdown_payload.__name__
acts_list.append(
{
"act_name": _("{}卸载bkdbmon").format(ip),
"act_name": _("{}下架redis实例").format(ip),
"act_component_code": ExecuteDBActuatorScriptComponent.code,
"kwargs": asdict(act_kwargs),
}
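The shutdown flow is reordered for the same reason in reverse: dbmon is removed from every host before any instance goes down, so the monitor never observes a dying instance. A sketch of that wipe step, assuming the flow variables above (`act_kwargs`, `proxy_ips`, `redis_ips`, `cluster_info`, `app`, `app_name`, `redis_pipeline`); as the removed block above shows, an empty `server_ports` list handed to `bkdbmon_install` stops dbmon from watching the host:

```python
# Sketch only; assumes this runs inside redis_cluster_shutdown_flow.
wipe_acts = []
for ip in proxy_ips + redis_ips:
    act_kwargs.exec_ip = ip
    act_kwargs.cluster = {
        "servers": [
            {
                "bk_biz_id": str(self.data["bk_biz_id"]),
                "bk_cloud_id": act_kwargs.bk_cloud_id,
                "server_ip": ip,
                "server_ports": [],  # empty list: dbmon stops watching this host
                "meta_role": "",
                "cluster_domain": cluster_info["domain_name"],
                "app": app,
                "app_name": app_name,
                "cluster_name": cluster_info["cluster_name"],
                "cluster_type": cluster_info["cluster_type"],
            }
        ]
    }
    act_kwargs.get_redis_payload_func = RedisActPayload.bkdbmon_install.__name__
    wipe_acts.append(
        {
            "act_name": _("{}卸载bkdbmon").format(ip),
            "act_component_code": ExecuteDBActuatorScriptComponent.code,
            "kwargs": asdict(act_kwargs),
        }
    )
redis_pipeline.add_parallel_acts(acts_list=wipe_acts)
# Only after this batch completes do the proxy and redis shutdown acts run.
```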
@@ -405,7 +405,7 @@ def deploy_redis_instance_flow(self):
"created_by": self.data["created_by"],
"region": rule.get("city_code", ""),
"meta_func_name": RedisDBMeta.redis_instance.__name__,
"disaster_tolerance_level": self.data.get("disaster_tolerance_level", AffinityEnum.CROS_SUBZONE),
"disaster_tolerance_level": rule.get("disaster_tolerance_level", AffinityEnum.CROS_SUBZONE),
}
acts_list.append(
{
@@ -446,34 +446,6 @@
)
sub_pipeline.add_parallel_acts(acts_list=acts_list)

# deploy bkdbmon
acts_list = []
act_kwargs.exec_ip = master_ip
act_kwargs.cluster = {
"ip": master_ip,
}
act_kwargs.get_redis_payload_func = RedisActPayload.bkdbmon_install_list_new.__name__
acts_list.append(
{
"act_name": _("{}-安装bkdbmon").format(master_ip),
"act_component_code": ExecuteDBActuatorScriptComponent.code,
"kwargs": asdict(act_kwargs),
}
)
act_kwargs.exec_ip = slave_ip
act_kwargs.cluster = {
"ip": slave_ip,
}
act_kwargs.get_redis_payload_func = RedisActPayload.bkdbmon_install_list_new.__name__
acts_list.append(
{
"act_name": _("{}-安装bkdbmon").format(slave_ip),
"act_component_code": ExecuteDBActuatorScriptComponent.code,
"kwargs": asdict(act_kwargs),
}
)
sub_pipeline.add_parallel_acts(acts_list=acts_list)

# add domain names
acts_list = []
for rule in info["ip_install_dict"][master_ip]:
@@ -507,8 +479,23 @@
"kwargs": {**asdict(act_kwargs), **asdict(dns_kwargs)},
}
)
sub_pipeline.add_parallel_acts(acts_list=acts_list)

# deploy bkdbmon
acts_list = []
for ip in [master_ip, slave_ip]:
act_kwargs.exec_ip = ip
act_kwargs.cluster = {"ip": ip}
act_kwargs.get_redis_payload_func = RedisActPayload.bkdbmon_install_list_new.__name__
acts_list.append(
{
"act_name": _("{}-安装bkdbmon").format(ip),
"act_component_code": ExecuteDBActuatorScriptComponent.code,
"kwargs": asdict(act_kwargs),
}
)
sub_pipeline.add_parallel_acts(acts_list=acts_list)

sub_pipelines.append(sub_pipeline.build_sub_process(sub_name=_("Redis主从安装-{}").format(master_ip)))
redis_pipeline.add_parallel_sub_pipeline(sub_flow_list=sub_pipelines)
redis_pipeline.run_pipeline()
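This file also picks up a related fix: `disaster_tolerance_level` is now read from each deployment rule instead of once from the ticket-level `self.data`. A self-contained illustration of the per-rule fallback; this `AffinityEnum` is a stand-in for the repo's enum, and the rule values are invented:

```python
from enum import Enum


class AffinityEnum(str, Enum):
    # Stand-in for the repo's AffinityEnum; only the member used above.
    CROS_SUBZONE = "CROS_SUBZONE"


rules = [
    {"city_code": "sh", "disaster_tolerance_level": AffinityEnum.CROS_SUBZONE},
    {"city_code": "bj"},  # no explicit level on this rule
]
for rule in rules:
    # Each rule now carries its own affinity; a missing key falls back to
    # CROS_SUBZONE instead of inheriting one ticket-wide value.
    level = rule.get("disaster_tolerance_level", AffinityEnum.CROS_SUBZONE)
    print(rule.get("city_code"), level.value)
```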
@@ -144,7 +144,7 @@ def deploy_predixy_cluster_flow(self):
params["spec_id"] = int(self.data["resource_spec"]["master"]["id"])
params["spec_config"] = self.data["resource_spec"]["master"]
params["meta_role"] = InstanceRole.REDIS_MASTER.value
sub_builder = RedisBatchInstallAtomJob(self.root_id, self.data, act_kwargs, params)
sub_builder = RedisBatchInstallAtomJob(self.root_id, self.data, act_kwargs, params, dbmon_install=False)
sub_pipelines.append(sub_builder)
for ip in slave_ips:
# reassign cluster to avoid values carried over from the previous iteration
@@ -154,7 +154,7 @@ def deploy_predixy_cluster_flow(self):
params["spec_id"] = int(self.data["resource_spec"]["slave"]["id"])
params["spec_config"] = self.data["resource_spec"]["slave"]
params["meta_role"] = InstanceRole.REDIS_SLAVE.value
sub_builder = RedisBatchInstallAtomJob(self.root_id, self.data, act_kwargs, params)
sub_builder = RedisBatchInstallAtomJob(self.root_id, self.data, act_kwargs, params, dbmon_install=False)
sub_pipelines.append(sub_builder)
redis_pipeline.add_parallel_sub_pipeline(sub_flow_list=sub_pipelines)

@@ -215,7 +215,7 @@ def deploy_predixy_cluster_flow(self):
act_kwargs.cluster = copy.deepcopy(cluster_tpl)

params["ip"] = ip
sub_builder = ProxyBatchInstallAtomJob(self.root_id, self.data, act_kwargs, params)
sub_builder = ProxyBatchInstallAtomJob(self.root_id, self.data, act_kwargs, params, dbmon_install=False)
sub_pipelines.append(sub_builder)
redis_pipeline.add_parallel_sub_pipeline(sub_flow_list=sub_pipelines)

@@ -309,4 +309,19 @@
)
redis_pipeline.add_parallel_acts(acts_list=acts_list)

# install dbmon as a post-step
acts_list = []
for ip in master_ips + slave_ips + proxy_ips:
act_kwargs.exec_ip = ip
act_kwargs.cluster = {"ip": ip}
act_kwargs.get_redis_payload_func = RedisActPayload.bkdbmon_install_list_new.__name__
acts_list.append(
{
"act_name": _("{}-安装bkdbmon").format(ip),
"act_component_code": ExecuteDBActuatorScriptComponent.code,
"kwargs": asdict(act_kwargs),
}
)
redis_pipeline.add_parallel_acts(acts_list=acts_list)

redis_pipeline.run_pipeline()
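Taken together, the commit moves dbmon to the tail of the deployment pipeline: binaries first, then metadata and DNS, then monitoring. A self-contained toy model of the resulting ordering; every name in it is illustrative, not the repo's API:

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class Pipeline:
    # Toy stand-in for the flow pipeline: just records step names in order.
    steps: List[str] = field(default_factory=list)

    def add(self, name: str) -> None:
        self.steps.append(name)


def deploy_cluster(pipeline: Pipeline, hosts: List[str]) -> None:
    for h in hosts:
        pipeline.add(f"install-instances@{h}")  # atom jobs, dbmon_install=False
    pipeline.add("write-cluster-metadata")
    pipeline.add("register-dns")
    for h in hosts:
        pipeline.add(f"install-dbmon@{h}")  # post-step: hosts fully registered


p = Pipeline()
deploy_cluster(p, ["1.1.1.1", "2.2.2.2"])
# dbmon lands strictly after DNS registration, so it cannot alert mid-deploy.
assert p.steps.index("register-dns") < p.steps.index("install-dbmon@1.1.1.1")
```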