perf(redis): defer dbmon install until after cluster deployment to avoid alerts during deploy #3650 #3671

Merged Mar 26, 2024 (1 commit)
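(Summary reconstructed from the diff below.) The Proxy and Redis install atom jobs gain a dbmon_install flag that defaults to True, and their backup-client step is folded into the parallel plugin-install batch. deploy_predixy_cluster_flow creates those sub-flows with dbmon_install=False and installs bkdbmon on all master, slave, and proxy IPs in a single parallel batch at the end, so monitoring only starts once the cluster is fully up and no deploy-time alerts fire. In the same spirit, redis_cluster_shutdown_flow now removes bkdbmon before shutting down proxy and redis instances, and deploy_redis_instance_flow moves its bkdbmon step after domain registration and reads disaster_tolerance_level from each rule rather than from the ticket-wide data.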
--- File 1/5: ProxyBatchInstallAtomJob ---
@@ -39,7 +39,9 @@
logger = logging.getLogger("flow")


def ProxyBatchInstallAtomJob(root_id, ticket_data, act_kwargs: ActKwargs, param: Dict) -> SubBuilder:
def ProxyBatchInstallAtomJob(
root_id, ticket_data, act_kwargs: ActKwargs, param: Dict, dbmon_install: bool = True
) -> SubBuilder:
"""
### SubBuilder: Proxy install atom job
act_kwargs.cluster = {
@@ -86,20 +88,21 @@ def ProxyBatchInstallAtomJob(root_id, ticket_data, act_kwargs: ActKwargs, param:
kwargs=asdict(act_kwargs),
)

sub_pipeline.add_act(
act_name=_("Proxy-002-{}-安装backup-client工具").format(exec_ip),
act_component_code=DownloadBackupClientComponent.code,
kwargs=asdict(
DownloadBackupClientKwargs(
bk_cloud_id=act_kwargs.cluster["bk_cloud_id"],
bk_biz_id=int(act_kwargs.cluster["bk_biz_id"]),
download_host_list=[exec_ip],
),
),
)

# install plugins
acts_list = []
acts_list.append(
{
"act_name": _("Proxy-002-{}-安装backup-client工具").format(exec_ip),
"act_component_code": DownloadBackupClientComponent.code,
"kwargs": asdict(
DownloadBackupClientKwargs(
bk_cloud_id=act_kwargs.cluster["bk_cloud_id"],
bk_biz_id=int(act_kwargs.cluster["bk_biz_id"]),
download_host_list=[exec_ip],
),
),
}
)
for plugin_name in DEPENDENCIES_PLUGINS:
acts_list.append(
{
@@ -160,25 +163,26 @@ def ProxyBatchInstallAtomJob(root_id, ticket_data, act_kwargs: ActKwargs, param:
)

# deploy bkdbmon
act_kwargs.cluster["servers"] = [
{
"app": app,
"app_name": app_name,
"bk_biz_id": str(act_kwargs.cluster["bk_biz_id"]),
"bk_cloud_id": int(act_kwargs.cluster["bk_cloud_id"]),
"server_ip": exec_ip,
"server_ports": [param["proxy_port"]],
"meta_role": act_kwargs.cluster["machine_type"],
"cluster_domain": act_kwargs.cluster["immute_domain"],
"cluster_name": act_kwargs.cluster["cluster_name"],
"cluster_type": act_kwargs.cluster["cluster_type"],
}
]
act_kwargs.get_redis_payload_func = RedisActPayload.bkdbmon_install.__name__
sub_pipeline.add_act(
act_name=_("Proxy-005-{}-安装监控").format(exec_ip),
act_component_code=ExecuteDBActuatorScriptComponent.code,
kwargs=asdict(act_kwargs),
)
if dbmon_install:
act_kwargs.cluster["servers"] = [
{
"app": app,
"app_name": app_name,
"bk_biz_id": str(act_kwargs.cluster["bk_biz_id"]),
"bk_cloud_id": int(act_kwargs.cluster["bk_cloud_id"]),
"server_ip": exec_ip,
"server_ports": [param["proxy_port"]],
"meta_role": act_kwargs.cluster["machine_type"],
"cluster_domain": act_kwargs.cluster["immute_domain"],
"cluster_name": act_kwargs.cluster["cluster_name"],
"cluster_type": act_kwargs.cluster["cluster_type"],
}
]
act_kwargs.get_redis_payload_func = RedisActPayload.bkdbmon_install.__name__
sub_pipeline.add_act(
act_name=_("Proxy-005-{}-安装监控").format(exec_ip),
act_component_code=ExecuteDBActuatorScriptComponent.code,
kwargs=asdict(act_kwargs),
)

return sub_pipeline.build_sub_process(sub_name=_("Proxy-{}-安装原子任务").format(exec_ip))
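Both atom jobs (this Proxy one and the Redis one in the next file) follow the same shape, so a single hedged sketch covers them. PipelineSketch and install_atom_job below are illustrative stand-ins, not the repo's API: dbmon_install defaults to True so existing call sites that omit the argument keep the old per-node behavior, while cluster deploy flows pass dbmon_install=False and install bkdbmon in one parallel batch after the cluster is up.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class PipelineSketch:
    """Hypothetical stand-in for the repo's SubBuilder; it only collects acts."""

    acts: List[Dict] = field(default_factory=list)

    def add_act(self, **act: str) -> None:
        self.acts.append(act)


def install_atom_job(exec_ip: str, dbmon_install: bool = True) -> PipelineSketch:
    pipe = PipelineSketch()
    pipe.add_act(act_name=f"{exec_ip}-install-instance")
    pipe.add_act(act_name=f"{exec_ip}-install-backup-client")
    if dbmon_install:  # cluster deploy flows pass False and batch this step later
        pipe.add_act(act_name=f"{exec_ip}-install-bkdbmon")
    return pipe


# Callers that pass nothing are unchanged; the Predixy deploy flow opts out
# and installs bkdbmon once, after the whole cluster is assembled.
assert len(install_atom_job("1.1.1.1").acts) == 3
assert len(install_atom_job("1.1.1.1", dbmon_install=False).acts) == 2
```

Defaulting the flag to True is what keeps the change backward compatible: without a default, every existing call site that omits the argument would raise a TypeError.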
--- File 2/5: RedisBatchInstallAtomJob ---
@@ -44,7 +44,9 @@
logger = logging.getLogger("flow")


def RedisBatchInstallAtomJob(root_id, ticket_data, sub_kwargs: ActKwargs, param: Dict) -> SubBuilder:
def RedisBatchInstallAtomJob(
root_id, ticket_data, sub_kwargs: ActKwargs, param: Dict, dbmon_install: bool = True
) -> SubBuilder:
"""
### SubBuilder: Redis install atom job
#### Note: when a master/slave pair is created, the master-slave relationship is not created here (neither the metadata nor the actual replication state)
@@ -90,20 +92,21 @@ def RedisBatchInstallAtomJob(root_id, ticket_data, sub_kwargs: ActKwargs, param:
kwargs=asdict(act_kwargs),
)

sub_pipeline.add_act(
act_name=_("Redis-{}-安装backup-client工具").format(exec_ip),
act_component_code=DownloadBackupClientComponent.code,
kwargs=asdict(
DownloadBackupClientKwargs(
bk_cloud_id=act_kwargs.cluster["bk_cloud_id"],
bk_biz_id=int(act_kwargs.cluster["bk_biz_id"]),
download_host_list=[exec_ip],
),
),
)

# install plugins
acts_list = []
acts_list.append(
{
"act_name": _("Redis-{}-安装backup-client工具").format(exec_ip),
"act_component_code": DownloadBackupClientComponent.code,
"kwargs": asdict(
DownloadBackupClientKwargs(
bk_cloud_id=act_kwargs.cluster["bk_cloud_id"],
bk_biz_id=int(act_kwargs.cluster["bk_biz_id"]),
download_host_list=[exec_ip],
),
),
}
)
for plugin_name in DEPENDENCIES_PLUGINS:
acts_list.append(
{
@@ -156,27 +159,28 @@ def RedisBatchInstallAtomJob(root_id, ticket_data, sub_kwargs: ActKwargs, param:
)

# deploy bkdbmon
act_kwargs.cluster["servers"] = [
{
"app": app,
"app_name": app_name,
"bk_biz_id": str(act_kwargs.cluster["bk_biz_id"]),
"bk_cloud_id": int(act_kwargs.cluster["bk_cloud_id"]),
"server_ip": exec_ip,
"server_ports": param["ports"],
"meta_role": param["meta_role"],
"cluster_name": act_kwargs.cluster["cluster_name"],
"cluster_type": act_kwargs.cluster["cluster_type"],
"cluster_domain": act_kwargs.cluster["immute_domain"],
"server_shards": param.get("server_shards", {}),
"cache_backup_mode": param.get("cache_backup_mode", ""),
}
]
act_kwargs.get_redis_payload_func = RedisActPayload.bkdbmon_install.__name__
sub_pipeline.add_act(
act_name=_("Redis-{}-安装监控").format(exec_ip),
act_component_code=ExecuteDBActuatorScriptComponent.code,
kwargs=asdict(act_kwargs),
)
if dbmon_install:
act_kwargs.cluster["servers"] = [
{
"app": app,
"app_name": app_name,
"bk_biz_id": str(act_kwargs.cluster["bk_biz_id"]),
"bk_cloud_id": int(act_kwargs.cluster["bk_cloud_id"]),
"server_ip": exec_ip,
"server_ports": param["ports"],
"meta_role": param["meta_role"],
"cluster_name": act_kwargs.cluster["cluster_name"],
"cluster_type": act_kwargs.cluster["cluster_type"],
"cluster_domain": act_kwargs.cluster["immute_domain"],
"server_shards": param.get("server_shards", {}),
"cache_backup_mode": param.get("cache_backup_mode", ""),
}
]
act_kwargs.get_redis_payload_func = RedisActPayload.bkdbmon_install.__name__
sub_pipeline.add_act(
act_name=_("Redis-{}-安装监控").format(exec_ip),
act_component_code=ExecuteDBActuatorScriptComponent.code,
kwargs=asdict(act_kwargs),
)

return sub_pipeline.build_sub_process(sub_name=_("Redis-{}-安装原子任务").format(exec_ip))
--- File 3/5: redis_cluster_shutdown_flow ---
@@ -157,29 +157,15 @@ def redis_cluster_shutdown_flow(self):
access_sub_builder = AccessManagerAtomJob(self.root_id, self.data, act_kwargs, params)
redis_pipeline.add_sub_pipeline(sub_flow=access_sub_builder)

# uninstall dbmon first
acts_list = []
for ip in proxy_ips:
# shut down proxy
act_kwargs.exec_ip = ip
act_kwargs.cluster = {
"ip": ip,
"port": cluster_info["proxy_map"][ip],
"operate": DBActuatorTypeEnum.Proxy.value + "_" + RedisActuatorActionEnum.Shutdown.value,
}
act_kwargs.get_redis_payload_func = RedisActPayload.proxy_operate_payload.__name__
acts_list.append(
{
"act_name": _("{}下架proxy实例").format(ip),
"act_component_code": ExecuteDBActuatorScriptComponent.code,
"kwargs": asdict(act_kwargs),
}
)

for ip in proxy_ips + redis_ips:
act_kwargs.cluster = {
"servers": [
{
"bk_biz_id": str(self.data["bk_biz_id"]),
"bk_cloud_id": act_kwargs.bk_cloud_id,
# TODO: machine reuse is not handled yet, so an empty list is passed here; if machines are reused later, the remaining ports must be queried
"server_ports": [],
"meta_role": "",
"cluster_domain": cluster_info["domain_name"],
@@ -198,39 +184,33 @@ def redis_cluster_shutdown_flow(self):
"kwargs": asdict(act_kwargs),
}
)
redis_pipeline.add_parallel_acts(acts_list=acts_list)

for ip in redis_ips:
act_kwargs.cluster = {}
acts_list = []
for ip in proxy_ips:
# shut down proxy
act_kwargs.exec_ip = ip
act_kwargs.get_redis_payload_func = RedisActPayload.redis_shutdown_payload.__name__
act_kwargs.cluster = {
"ip": ip,
"port": cluster_info["proxy_map"][ip],
"operate": DBActuatorTypeEnum.Proxy.value + "_" + RedisActuatorActionEnum.Shutdown.value,
}
act_kwargs.get_redis_payload_func = RedisActPayload.proxy_operate_payload.__name__
acts_list.append(
{
"act_name": _("{}下架redis实例").format(ip),
"act_name": _("{}下架proxy实例").format(ip),
"act_component_code": ExecuteDBActuatorScriptComponent.code,
"kwargs": asdict(act_kwargs),
}
)

act_kwargs.cluster = {
"servers": [
{
"bk_biz_id": str(self.data["bk_biz_id"]),
"bk_cloud_id": act_kwargs.bk_cloud_id,
# TODO: machine reuse is not handled yet, so an empty list is passed here; if machines are reused later, the remaining ports must be queried
"server_ports": [],
"meta_role": "",
"cluster_domain": cluster_info["domain_name"],
"app": app,
"app_name": app_name,
"cluster_name": cluster_info["cluster_name"],
"cluster_type": cluster_info["cluster_type"],
}
]
}
act_kwargs.get_redis_payload_func = RedisActPayload.bkdbmon_install.__name__
for ip in redis_ips:
act_kwargs.cluster = {}
act_kwargs.exec_ip = ip
act_kwargs.get_redis_payload_func = RedisActPayload.redis_shutdown_payload.__name__
acts_list.append(
{
"act_name": _("{}卸载bkdbmon").format(ip),
"act_name": _("{}下架redis实例").format(ip),
"act_component_code": ExecuteDBActuatorScriptComponent.code,
"kwargs": asdict(act_kwargs),
}
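The reordering in this file applies the same idea to teardown: the bkdbmon acts (which push an empty server_ports list through bkdbmon_install, and which the act names label 卸载bkdbmon, i.e. uninstall) now run before the proxy and redis shutdown acts, so monitoring stops firing while instances go down. A minimal sketch of the resulting phase ordering, assuming, as the surrounding flow code suggests, that consecutive add_parallel_acts batches execute sequentially (FlowSketch is a stand-in, not the repo's pipeline class):

```python
from typing import List


class FlowSketch:
    """Illustrative pipeline: each add_parallel_acts call becomes one phase."""

    def __init__(self) -> None:
        self.phases: List[List[str]] = []

    def add_parallel_acts(self, acts_list: List[str]) -> None:
        # acts within one batch run concurrently; batches run in insertion order
        self.phases.append(acts_list)


flow = FlowSketch()
proxy_ips, redis_ips = ["10.0.0.1"], ["10.0.0.2", "10.0.0.3"]

# Phase 1: strip bkdbmon from every host so nothing is watching the teardown.
flow.add_parallel_acts([f"{ip}-uninstall-bkdbmon" for ip in proxy_ips + redis_ips])

# Phase 2: shut down proxy and redis instances, now alert-free.
flow.add_parallel_acts(
    [f"{ip}-shutdown-proxy" for ip in proxy_ips]
    + [f"{ip}-shutdown-redis" for ip in redis_ips]
)

assert len(flow.phases) == 2 and flow.phases[0][0] == "10.0.0.1-uninstall-bkdbmon"
```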
--- File 4/5: deploy_redis_instance_flow ---
@@ -405,7 +405,7 @@ def deploy_redis_instance_flow(self):
"created_by": self.data["created_by"],
"region": rule.get("city_code", ""),
"meta_func_name": RedisDBMeta.redis_instance.__name__,
"disaster_tolerance_level": self.data.get("disaster_tolerance_level", AffinityEnum.CROS_SUBZONE),
"disaster_tolerance_level": rule.get("disaster_tolerance_level", AffinityEnum.CROS_SUBZONE),
}
acts_list.append(
{
@@ -446,34 +446,6 @@ def deploy_redis_instance_flow(self):
)
sub_pipeline.add_parallel_acts(acts_list=acts_list)

# deploy bkdbmon
acts_list = []
act_kwargs.exec_ip = master_ip
act_kwargs.cluster = {
"ip": master_ip,
}
act_kwargs.get_redis_payload_func = RedisActPayload.bkdbmon_install_list_new.__name__
acts_list.append(
{
"act_name": _("{}-安装bkdbmon").format(master_ip),
"act_component_code": ExecuteDBActuatorScriptComponent.code,
"kwargs": asdict(act_kwargs),
}
)
act_kwargs.exec_ip = slave_ip
act_kwargs.cluster = {
"ip": slave_ip,
}
act_kwargs.get_redis_payload_func = RedisActPayload.bkdbmon_install_list_new.__name__
acts_list.append(
{
"act_name": _("{}-安装bkdbmon").format(slave_ip),
"act_component_code": ExecuteDBActuatorScriptComponent.code,
"kwargs": asdict(act_kwargs),
}
)
sub_pipeline.add_parallel_acts(acts_list=acts_list)

# add domain names
acts_list = []
for rule in info["ip_install_dict"][master_ip]:
@@ -507,8 +479,23 @@ def deploy_redis_instance_flow(self):
"kwargs": {**asdict(act_kwargs), **asdict(dns_kwargs)},
}
)
sub_pipeline.add_parallel_acts(acts_list=acts_list)

# deploy bkdbmon
acts_list = []
for ip in [master_ip, slave_ip]:
act_kwargs.exec_ip = ip
act_kwargs.cluster = {"ip": ip}
act_kwargs.get_redis_payload_func = RedisActPayload.bkdbmon_install_list_new.__name__
acts_list.append(
{
"act_name": _("{}-安装bkdbmon").format(ip),
"act_component_code": ExecuteDBActuatorScriptComponent.code,
"kwargs": asdict(act_kwargs),
}
)
sub_pipeline.add_parallel_acts(acts_list=acts_list)

sub_pipelines.append(sub_pipeline.build_sub_process(sub_name=_("Redis主从安装-{}").format(master_ip)))
redis_pipeline.add_parallel_sub_pipeline(sub_flow_list=sub_pipelines)
redis_pipeline.run_pipeline()
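Two independent fixes land in this file: bkdbmon installation now runs once per host after domain registration (one loop over [master_ip, slave_ip] instead of two copy-pasted blocks), and disaster_tolerance_level is read from each rule rather than from the ticket-level self.data, so every master/slave pair can carry its own affinity. A hedged illustration of the lookup change follows; the field names match the diff, but AffinityEnum here is a minimal stand-in for the repo's enum and the SAME_SUBZONE member plus the data shapes are assumed for the example:

```python
from enum import Enum


class AffinityEnum(str, Enum):  # stand-in; the repo defines its own enum
    CROS_SUBZONE = "CROS_SUBZONE"
    SAME_SUBZONE = "SAME_SUBZONE"  # assumed member, for illustration only


data = {"disaster_tolerance_level": AffinityEnum.SAME_SUBZONE}  # ticket level
rule = {"city_code": "sz"}  # this rule sets no affinity of its own

# Before: the ticket-wide value silently overrode per-rule affinity.
old = data.get("disaster_tolerance_level", AffinityEnum.CROS_SUBZONE)
# After: each rule resolves its own value, falling back to the default.
new = rule.get("disaster_tolerance_level", AffinityEnum.CROS_SUBZONE)

assert old is AffinityEnum.SAME_SUBZONE and new is AffinityEnum.CROS_SUBZONE
```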
--- File 5/5: deploy_predixy_cluster_flow ---
@@ -144,7 +144,7 @@ def deploy_predixy_cluster_flow(self):
params["spec_id"] = int(self.data["resource_spec"]["master"]["id"])
params["spec_config"] = self.data["resource_spec"]["master"]
params["meta_role"] = InstanceRole.REDIS_MASTER.value
sub_builder = RedisBatchInstallAtomJob(self.root_id, self.data, act_kwargs, params)
sub_builder = RedisBatchInstallAtomJob(self.root_id, self.data, act_kwargs, params, dbmon_install=False)
sub_pipelines.append(sub_builder)
for ip in slave_ips:
# reassign cluster to avoid duplication issues
@@ -154,7 +154,7 @@ def deploy_predixy_cluster_flow(self):
params["spec_id"] = int(self.data["resource_spec"]["slave"]["id"])
params["spec_config"] = self.data["resource_spec"]["slave"]
params["meta_role"] = InstanceRole.REDIS_SLAVE.value
sub_builder = RedisBatchInstallAtomJob(self.root_id, self.data, act_kwargs, params)
sub_builder = RedisBatchInstallAtomJob(self.root_id, self.data, act_kwargs, params, dbmon_install=False)
sub_pipelines.append(sub_builder)
redis_pipeline.add_parallel_sub_pipeline(sub_flow_list=sub_pipelines)

@@ -215,7 +215,7 @@ def deploy_predixy_cluster_flow(self):
act_kwargs.cluster = copy.deepcopy(cluster_tpl)

params["ip"] = ip
sub_builder = ProxyBatchInstallAtomJob(self.root_id, self.data, act_kwargs, params)
sub_builder = ProxyBatchInstallAtomJob(self.root_id, self.data, act_kwargs, params, dbmon_install=False)
sub_pipelines.append(sub_builder)
redis_pipeline.add_parallel_sub_pipeline(sub_flow_list=sub_pipelines)

@@ -309,4 +309,19 @@ def deploy_predixy_cluster_flow(self):
)
redis_pipeline.add_parallel_acts(acts_list=acts_list)

# install dbmon as a post-deploy step
acts_list = []
for ip in master_ips + slave_ips + proxy_ips:
act_kwargs.exec_ip = ip
act_kwargs.cluster = {"ip": ip}
act_kwargs.get_redis_payload_func = RedisActPayload.bkdbmon_install_list_new.__name__
acts_list.append(
{
"act_name": _("{}-安装bkdbmon").format(ip),
"act_component_code": ExecuteDBActuatorScriptComponent.code,
"kwargs": asdict(act_kwargs),
}
)
redis_pipeline.add_parallel_acts(acts_list=acts_list)

redis_pipeline.run_pipeline()
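Since the Redis and Proxy install sub-flows in this file are built with dbmon_install=False, this closing batch is the only point where bkdbmon reaches the new hosts, after metadata, the access layer, and domains are in place. Note that act_kwargs.cluster carries only the ip here, which suggests bkdbmon_install_list_new resolves the host's full instance list from cluster metadata; that reading is an inference from the call shape, not something this diff confirms.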