Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(backend): 清理空闲集群下主机的dbm_meta信息,提高监控数据准确性 #1275

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,66 @@

from celery.schedules import crontab

from backend import env
from backend.components import CCApi
from backend.db_meta.models import Cluster, Machine
from backend.db_meta.models.cluster_monitor import SyncFailedMachine
from backend.db_periodic_task.local_tasks.register import register_periodic_task
from backend.db_services.ipchooser.query.resource import ResourceQueryHelper
from backend.dbm_init.constants import CC_HOST_DBM_ATTR

logger = logging.getLogger("celery")


@register_periodic_task(run_every=crontab(minute="*/5"))
def reset_host_dbmeta(bk_biz_id=env.DBA_APP_BK_BIZ_ID, bk_host_ids=None):
"""
重置业务下空闲集群主机的dbm_meta属性
"""
now = datetime.datetime.now()
logger.info("[reset_host_dbmeta] start reset begin: %s", now)

if not bk_host_ids:
machines = ResourceQueryHelper.query_cc_hosts(
{
"bk_biz_id": bk_biz_id,
"bk_inst_id": CCApi.get_biz_internal_module({"bk_biz_id": bk_biz_id}, use_admin=True)["bk_set_id"],
"bk_obj_id": "set",
},
fields=["bk_host_id"],
)["info"]
bk_host_ids = list(map(lambda x: x["bk_host_id"], machines))

# 批量更新接口限制最多500条
step_size = 496
reset_hosts, failed_hosts = [], []
for step in range(len(bk_host_ids) // step_size + 1):
updates = []
for bk_host_id in bk_host_ids[step * step_size : (step + 1) * step_size]:
updates.append({"properties": {CC_HOST_DBM_ATTR: json.dumps([])}, "bk_host_id": bk_host_id})
reset_hosts.extend(updates)

try:
logger.info("[reset_host_dbmeta] batch_update_host: %s", updates)
CCApi.batch_update_host({"update": updates}, use_admin=True)
except Exception as e: # pylint: disable=wildcard-import
failed_hosts.extend(updates)
logger.error("[reset_host_dbmeta] batch reset exception: %s (%s)", updates, e)

# 容错处理:逐个更新,避免批量更新误伤有效ip
for fail_host in failed_hosts:
try:
CCApi.update_host({"bk_host_id": fail_host["bk_host_id"], "data": fail_host["properties"]}, use_admin=True)
except Exception as e: # pylint: disable=wildcard-import
logger.error("[reset_host_dbmeta] single reset error: %s (%s)", fail_host, e)

logger.info(
"[reset_host_dbmeta] finish reset end: %s, update_cnt: %s",
datetime.datetime.now() - now,
len(reset_hosts),
)


@register_periodic_task(run_every=crontab(minute="*/2"))
def update_host_dbmeta(bk_biz_id=None, cluster_id=None, cluster_ips=None, dbm_meta=None):
"""
Expand Down Expand Up @@ -57,12 +108,12 @@ def update_host_dbmeta(bk_biz_id=None, cluster_id=None, cluster_ips=None, dbm_me
machines = machines.filter(ip__in=cluster_ips)

# 批量更新接口限制最多500条,这里取456条
STEP = 456
step_size = 456
updated_hosts, failed_updates = [], []
machine_count = machines.count()
for step in range(machine_count // STEP + 1):
for step in range(machine_count // step_size + 1):
updates = []
for machine in machines[step * STEP : (step + 1) * STEP]:
for machine in machines[step * step_size : (step + 1) * step_size]:
cc_dbm_meta = machine.dbm_meta if dbm_meta is None else dbm_meta
updates.append(
{"properties": {CC_HOST_DBM_ATTR: json.dumps(cc_dbm_meta)}, "bk_host_id": machine.bk_host_id}
Expand Down
Loading