Skip to content

Commit

Permalink
fix(redis): 自愈修复 #2674
Browse files Browse the repository at this point in the history
  • Loading branch information
xiepaup committed Dec 19, 2023
1 parent fa3a339 commit 5568f96
Show file tree
Hide file tree
Showing 7 changed files with 44 additions and 61 deletions.
2 changes: 1 addition & 1 deletion dbm-ui/backend/db_meta/api/cluster/apis.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ def query_cluster_by_hosts(hosts: List):
"machine_type": storage.machine.machine_type,
"cs_ports": [
cstorage.port
for cstorage in cluster.storageinstance_set.filter(machine__ip=storage.machine.ip)
for cstorage in cluster.proxyinstance_set.filter(machine__ip=storage.machine.ip)
],
"cluster": cluster.immute_domain,
"cluster_name": cluster.name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
)
from backend.utils.time import datetime2str

logger = logging.getLogger("celery")
logger = logging.getLogger("root")


@register_periodic_task(run_every=crontab(minute="*/1"))
Expand Down
58 changes: 9 additions & 49 deletions dbm-ui/backend/db_services/redis/autofix/bill.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@

from backend.configuration.constants import DBType
from backend.configuration.models.dba import DBAdministrator
from backend.db_meta.enums import InstanceInnerRole, MachineType
from backend.db_meta.models import Cluster
from backend.db_meta.enums import MachineType
from backend.db_meta.models import Machine
from backend.db_services.dbbase.constants import IpSource
from backend.ticket.builders import BuilderFactory
from backend.ticket.constants import TicketStatus, TicketType
Expand All @@ -35,23 +35,20 @@

def generate_autofix_ticket(fault_clusters: QuerySet):
    """Create an autofix ticket for every faulted cluster.

    For each cluster, partition its recorded fault machines into proxy and
    storage (slave) groups, enrich each ip with its spec_id and bk_sub_zone
    looked up from the Machine table, then hand both lists to create_ticket().

    :param fault_clusters: queryset of RedisAutofixCore rows awaiting autofix.
    """
    for cluster in fault_clusters:
        # fault_machines is stored as JSON; each entry looks like
        # {"instance_type": switched_host.instance_type, "ip": switched_host.ip}
        fault_machines = json.loads(cluster.fault_machines)
        redis_proxies, redis_slaves = [], []
        for fault_machine in fault_machines:
            fault_ip = fault_machine["ip"]
            # Recover spec and sub-zone from the machine record; .get() raises
            # if the ip is missing or ambiguous within the business.
            fault_obj = Machine.objects.filter(ip=fault_ip, bk_biz_id=cluster.bk_biz_id).get()
            fault_info = {"ip": fault_ip, "spec_id": fault_obj.spec_id, "bk_sub_zone": fault_obj.bk_sub_zone}
            if fault_machine["instance_type"] in [MachineType.TWEMPROXY.value, MachineType.PREDIXY.value]:
                redis_proxies.append(fault_info)
            else:
                redis_slaves.append(fault_info)

        logger.info(
            "cluster_summary_fault {}; proxies:{}, storages:{}".format(
                cluster.immute_domain, redis_proxies, redis_slaves
            )
        )
        create_ticket(cluster, redis_proxies, redis_slaves)
Expand Down Expand Up @@ -95,40 +92,3 @@ def create_ticket(cluster: RedisAutofixCore, redis_proxies: list, redis_slaves:
cluster.save(update_fields=["ticket_id", "status_version", "deal_status", "update_at"])

TicketFlowManager(ticket=ticket).run_next_flow()


def load_cluster_arch_zone(cluster: RedisAutofixCore):
    """Collect the sub-zone (campus) layout of a cluster's proxies and storage slaves.

    :param cluster: RedisAutofixCore row pointing at the cluster to inspect.
    :return: dict with keys "storage_zones" (per-slave-ip zone info),
        "proxy_zones" (per-proxy-ip zone info), "region" (cluster region) and
        "proxy_distrubt" (proxy count per bk_sub_zone_id).
    """
    cluster_obj = Cluster.objects.get(bk_biz_id=cluster.bk_biz_id, id=cluster.cluster_id)

    # Map each proxy ip to its zone/spec details and tally proxies per sub-zone id.
    proxy_zones, proxy_distrubt = {}, {}
    for proxy_inst in cluster_obj.proxyinstance_set.all():
        proxy_machine = proxy_inst.machine
        proxy_zones[proxy_machine.ip] = {
            "spec_id": proxy_machine.spec_id,
            "proxy_ip": proxy_machine.ip,
            "bk_sub_zone": proxy_machine.bk_sub_zone,
            "bk_sub_zone_id": proxy_machine.bk_sub_zone_id,
        }
        zone_id = proxy_machine.bk_sub_zone_id
        proxy_distrubt[zone_id] = proxy_distrubt.get(zone_id, 0) + 1

    # For every master instance, record the placement of its replica slave,
    # keyed by the slave's ip.
    storage_zones = {}
    masters = cluster_obj.storageinstance_set.filter(instance_inner_role=InstanceInnerRole.MASTER.value)
    for master_inst in masters:
        # as_ejector.get() assumes exactly one replication link per master —
        # TODO confirm against the replication model.
        slave_machine = master_inst.as_ejector.get().receiver.machine
        master_machine = master_inst.machine
        storage_zones[slave_machine.ip] = {
            "slave_ip": slave_machine.ip,
            "bk_sub_zone": slave_machine.bk_sub_zone,
            "bk_sub_zone_id": slave_machine.bk_sub_zone_id,
            "spec_id": slave_machine.spec_id,
            "master_ip": master_machine.ip,
            "master_zone": master_machine.bk_sub_zone,
            "master_zone_id": master_machine.bk_sub_zone_id,
        }

    return {
        "storage_zones": storage_zones,
        "proxy_zones": proxy_zones,
        "region": cluster_obj.region,
        "proxy_distrubt": proxy_distrubt,
    }
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Generated by Django 3.2.19 on 2023-12-19 07:32

from django.db import migrations


class Migration(migrations.Migration):
    """Add a composite index on (bk_biz_id, ip) to the redisignoreautofix table."""

    # Must apply on top of the previous migration of the autofix app.
    dependencies = [
        ("autofix", "0002_auto_20231120_1955"),
    ]

    operations = [
        # NOTE(review): presumably added to speed up lookups of ignored hosts
        # by business id + ip (matching the model's new Meta.index_together) —
        # confirm against the queries in the autofix watcher.
        migrations.AlterIndexTogether(
            name="redisignoreautofix",
            index_together={("bk_biz_id", "ip")},
        ),
    ]
1 change: 1 addition & 0 deletions dbm-ui/backend/db_services/redis/autofix/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ class RedisIgnoreAutofix(AuditedModel):

class Meta:
db_table = "tb_tendis_autofix_ignore"
index_together = [("bk_biz_id", "ip")]


class NodeUpdateTaskStatus(str, StructuredEnum):
Expand Down
23 changes: 14 additions & 9 deletions dbm-ui/backend/db_services/redis/autofix/watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from .enums import AutofixItem, AutofixStatus, DBHASwitchResult
from .models import RedisAutofixCore, RedisAutofixCtl, RedisIgnoreAutofix

logger = logging.getLogger("celery")
logger = logging.getLogger("root")


# 从切换队列拿到切换实例列表, 然后聚合成故障机器维度
Expand All @@ -50,14 +50,17 @@ def watcher_get_by_hosts() -> (int, dict):

# 遍历切换队列,聚合故障机
switch_hosts, batch_small_id = {}, SWITCH_SMALL
if len(switch_queues) == 0:
return switch_id, switch_hosts

for switch_inst in switch_queues:
switch_ip, switch_id = switch_inst["ip"], int(switch_inst["uid"]) # uid / sw_id
logger.info(
"get new switched_fault_instance {}:{}, uid {}, db_type: {}:{}".format(
switch_ip, switch_inst["port"], switch_id, switch_inst["db_type"], switch_inst["db_role"]
)
)
if not switch_hosts.get(switch_ip):
logger.info(
"get new switched_fault_ip {}:{}, uid {}, db_type: {}:{}".format(
switch_ip, switch_inst["port"], switch_id, switch_inst["db_type"], switch_inst["db_role"]
)
)
# 忽略没有集群信息、或者多集群共用的情况
cluster = query_cluster_by_hosts([switch_ip]) # return: [{},{}]
if not cluster:
Expand All @@ -84,7 +87,9 @@ def watcher_get_by_hosts() -> (int, dict):
)
current_host = switch_hosts[switch_ip]
current_host.switch_ports.append(switch_inst["port"])
current_host.sw_result[switch_inst["status"]] = switch_inst["port"]
if not current_host.sw_result.get(switch_inst["status"]):
current_host.sw_result[switch_inst["status"]] = []
current_host.sw_result[switch_inst["status"]].append(switch_inst["port"])

# 这台机器的Max值
if switch_id > current_host.sw_max_id:
Expand Down Expand Up @@ -230,7 +235,7 @@ def save_swithed_host_by_cluster(batch_small: int, switch_hosts: Dict):

# 把需要忽略自愈的保存起来
def save_ignore_host(switched_host: RedisSwitchHost, msg):
RedisIgnoreAutofix.objects.create(
RedisIgnoreAutofix.objects.update_or_create(
bk_cloud_id=DEFAULT_BK_CLOUD_ID,
bk_biz_id=switched_host.bk_biz_id,
cluster_id=switched_host.cluster_id,
Expand All @@ -245,4 +250,4 @@ def save_ignore_host(switched_host: RedisSwitchHost, msg):
sw_max_id=switched_host.sw_max_id,
sw_result=json.dumps(switched_host.sw_result),
ignore_msg=msg,
).save()
)
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ def RedisClusterSlaveReplaceJob(root_id, ticket_data, sub_kwargs: ActKwargs, sla
# "Old": {"ip": "2.2.a.4", "bk_cloud_id": 0, "bk_host_id": 123},
old_slave = replace_link["ip"]
params = {
"ignore_ips": act_kwargs.cluster["slave_master_map"][old_slave],
"ignore_ips": [act_kwargs.cluster["slave_master_map"][old_slave]],
"ip": old_slave,
"ports": act_kwargs.cluster["slave_ports"][old_slave],
}
Expand Down

0 comments on commit 5568f96

Please sign in to comment.