Skip to content

Commit

Permalink
feat: add LocalDataSourceIdentityInfoInitializer
Browse files Browse the repository at this point in the history
  • Loading branch information
narasux committed Sep 19, 2023
1 parent 9b0c5d7 commit 6ffc0b3
Show file tree
Hide file tree
Showing 10 changed files with 291 additions and 146 deletions.
96 changes: 96 additions & 0 deletions src/bk-user/bkuser/apps/data_source/initializers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
# -*- coding: utf-8 -*-
"""
TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-用户管理(Bk-User) available.
Copyright (C) 2017-2021 THL A29 Limited, a Tencent company. All rights reserved.
Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License.
You may obtain a copy of the License at http://opensource.org/licenses/MIT
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""
import datetime

from django.utils import timezone

from bkuser.apps.data_source.models import DataSource, DataSourceUser, LocalDataSourceIdentityInfo
from bkuser.common.passwd import PasswordGenerator
from bkuser.plugins.local.constants import PasswordGenerateMethod
from bkuser.plugins.local.models import LocalDataSourcePluginConfig


class PasswordProvider:
"""本地数据源用户密码"""

def __init__(self, plugin_cfg: LocalDataSourcePluginConfig):
# assert for mypy type linter
assert plugin_cfg.password_rule is not None
assert plugin_cfg.password_initial is not None

self.generate_method = plugin_cfg.password_initial.generate_method
self.fixed_password = plugin_cfg.password_initial.fixed_password
self.password_generator = PasswordGenerator(plugin_cfg.password_rule.to_rule())

def generate(self) -> str:
if self.generate_method == PasswordGenerateMethod.FIXED and self.fixed_password:
return self.fixed_password

return self.password_generator.generate()


class LocalDataSourceIdentityInfoInitializer:
"""本地数据源用户身份数据初始化"""

BATCH_SIZE = 250

def __init__(self, data_source: DataSource):
self.data_source = data_source
self.plugin_cfg = LocalDataSourcePluginConfig(**data_source.plugin_config)
self.password_provider = PasswordProvider(self.plugin_cfg)

def initialize(self) -> None:
if self._can_skip_initialize():
return

self._init_users_identity_info()

def _can_skip_initialize(self):
"""预先判断能否直接跳过"""

# 不是本地数据源的,不需要初始化
if not self.data_source.is_local:
return True

# 是本地数据源,但是没开启账密登录的,不需要初始化
if not self.plugin_cfg.enable_account_password_login:
return True

return False

def _init_users_identity_info(self):
exists_infos = LocalDataSourceIdentityInfo.objects.filter(data_source=self.data_source)
exists_info_user_ids = exists_infos.objects.values_list("user_id", flat=True)
# NOTE:已经存在的账密信息,不会按照最新规则重新生成!
waiting_init_users = DataSourceUser.objects.filter(
data_source=self.data_source,
).exclude(id__in=exists_info_user_ids)

time_now = timezone.now()
expired_at = self._get_password_expired_at(time_now)

waiting_create_infos = [
LocalDataSourceIdentityInfo(
user=user,
password=self.password_provider.generate(),
password_updated_at=time_now,
password_expired_at=expired_at,
data_source=self.data_source,
username=user.username,
created_at=time_now,
updated_at=time_now,
)
for user in waiting_init_users
]
LocalDataSourceIdentityInfo.objects.bulk_create(waiting_create_infos, batch_size=self.BATCH_SIZE)

def _get_password_expired_at(self, now: datetime.datetime) -> datetime.datetime:
return now + datetime.timedelta(days=self.plugin_cfg.password_rule.valid_time) # type: ignore
7 changes: 7 additions & 0 deletions src/bk-user/bkuser/apps/sync/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,19 @@
"""
from django.dispatch import receiver

from bkuser.apps.data_source.initializers import LocalDataSourceIdentityInfoInitializer
from bkuser.apps.data_source.models import DataSource
from bkuser.apps.sync.data_models import TenantSyncOptions
from bkuser.apps.sync.managers import TenantSyncManager
from bkuser.apps.sync.signals import post_sync_data_source


@receiver(post_sync_data_source)
def initialize_local_data_source_identity_info(sender, data_source: DataSource, **kwargs):
"""在完成数据源同步后,需要对本地数据源的用户账密信息做初始化"""
LocalDataSourceIdentityInfoInitializer(data_source).initialize()


@receiver(post_sync_data_source)
def sync_tenant_departments_users(sender, data_source: DataSource, **kwargs):
"""同步租户数据(部门 & 用户)"""
Expand Down
4 changes: 2 additions & 2 deletions src/bk-user/bkuser/apps/sync/managers.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def execute(self, context: Optional[Dict[str, Any]] = None) -> DataSourceSyncTas
status=SyncTaskStatus.PENDING,
trigger=self.sync_options.trigger,
operator=self.sync_options.operator,
start_time=timezone.now(),
start_at=timezone.now(),
extra={
"overwrite": self.sync_options.overwrite,
"async_run": self.sync_options.async_run,
Expand Down Expand Up @@ -78,7 +78,7 @@ def execute(self) -> TenantSyncTask:
status=SyncTaskStatus.PENDING,
trigger=self.sync_options.trigger,
operator=self.sync_options.operator,
start_time=timezone.now(),
start_at=timezone.now(),
extra={"async_run": self.sync_options.async_run},
)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
# Generated by Django 3.2.20 on 2023-09-17 12:26
# Generated by Django 3.2.20 on 2023-09-19 01:59

import datetime
from django.db import migrations, models
import django.utils.timezone


class Migration(migrations.Migration):
Expand All @@ -11,24 +12,34 @@ class Migration(migrations.Migration):
]

operations = [
migrations.AlterField(
migrations.RemoveField(
model_name='datasourcesynctask',
name='duration',
field=models.DurationField(default=datetime.timedelta(0), verbose_name='任务持续时间'),
name='start_time',
),
migrations.AlterField(
model_name='datasourcesynctask',
migrations.RemoveField(
model_name='tenantsynctask',
name='start_time',
field=models.DateTimeField(auto_now_add=True, verbose_name='任务开始时间'),
),
migrations.AlterField(
migrations.AddField(
model_name='datasourcesynctask',
name='start_at',
field=models.DateTimeField(auto_now_add=True, default=django.utils.timezone.now, verbose_name='任务开始时间'),
preserve_default=False,
),
migrations.AddField(
model_name='tenantsynctask',
name='start_at',
field=models.DateTimeField(auto_now_add=True, default=django.utils.timezone.now, verbose_name='任务开始时间'),
preserve_default=False,
),
migrations.AlterField(
model_name='datasourcesynctask',
name='duration',
field=models.DurationField(default=datetime.timedelta(0), verbose_name='任务持续时间'),
),
migrations.AlterField(
model_name='tenantsynctask',
name='start_time',
field=models.DateTimeField(auto_now_add=True, verbose_name='任务开始时间'),
name='duration',
field=models.DurationField(default=datetime.timedelta(0), verbose_name='任务持续时间'),
),
]
12 changes: 6 additions & 6 deletions src/bk-user/bkuser/apps/sync/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,14 @@ class DataSourceSyncTask(TimestampedModel):
status = models.CharField("任务总状态", choices=SyncTaskStatus.get_choices(), max_length=32)
trigger = models.CharField("触发方式", choices=SyncTaskTrigger.get_choices(), max_length=32)
operator = models.CharField("操作人", null=True, blank=True, default="", max_length=128)
start_time = models.DateTimeField("任务开始时间", auto_now_add=True)
start_at = models.DateTimeField("任务开始时间", auto_now_add=True)
duration = models.DurationField("任务持续时间", default=timedelta(seconds=0))
extra = models.JSONField("扩展信息", default=dict)

@property
def summary(self):
# TODO 支持获取任务总结
return "TODO"
# TODO (su) 支持获取任务总结
return "数据同步成功" if self.status == SyncTaskStatus.SUCCESS else "数据同步失败"


class DataSourceSyncStep(TimestampedModel):
Expand Down Expand Up @@ -92,14 +92,14 @@ class TenantSyncTask(TimestampedModel):
status = models.CharField("任务总状态", choices=SyncTaskStatus.get_choices(), max_length=32)
trigger = models.CharField("触发方式", choices=SyncTaskTrigger.get_choices(), max_length=32)
operator = models.CharField("操作人", null=True, blank=True, default="", max_length=128)
start_time = models.DateTimeField("任务开始时间", auto_now_add=True)
start_at = models.DateTimeField("任务开始时间", auto_now_add=True)
duration = models.DurationField("任务持续时间", default=timedelta(seconds=0))
extra = models.JSONField("扩展信息", default=dict)

@property
def summary(self):
# TODO 支持获取任务总结
return "TODO"
# TODO (su) 支持获取任务总结
return "数据同步成功" if self.status == SyncTaskStatus.SUCCESS else "数据同步失败"


class TenantSyncStep(TimestampedModel):
Expand Down
40 changes: 33 additions & 7 deletions src/bk-user/bkuser/apps/sync/runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@
from typing import Any, Dict

from django.db import transaction
from django.utils import timezone

from bkuser.apps.data_source.models import DataSource
from bkuser.apps.sync.constants import SyncTaskStatus
from bkuser.apps.sync.models import DataSourceSyncTask, TenantSyncTask
from bkuser.apps.sync.signals import post_sync_data_source
from bkuser.apps.sync.syncers import (
Expand All @@ -32,7 +34,7 @@ class DataSourceSyncTaskRunner:
"""
数据源同步任务执行器
FIXME (su) 1. 同步异常处理,2. Task 状态更新,3. 后续支持软删除后,需要重构同步逻辑
FIXME (su) 1. 细化同步异常处理,2. 后续支持软删除后,需要重构同步逻辑
"""

def __init__(self, task: DataSourceSyncTask, context: Dict[str, Any]):
Expand All @@ -44,9 +46,15 @@ def __init__(self, task: DataSourceSyncTask, context: Dict[str, Any]):

def run(self):
with transaction.atomic():
self._sync_departments()
self._sync_users()
self._send_signal()
try:
self._sync_departments()
self._sync_users()
self._send_signal()
except Exception:
self._update_task_status(SyncTaskStatus.FAILED)
raise

self._update_task_status(SyncTaskStatus.SUCCESS)

def _initial_plugin(self):
"""初始化数据源插件"""
Expand All @@ -72,12 +80,18 @@ def _send_signal(self):
"""发送数据源同步完成信号,触发后续流程"""
post_sync_data_source.send(sender=self.__class__, data_source=self.data_source)

def _update_task_status(self, status: SyncTaskStatus):
"""任务正常完成后更新 task 状态"""
self.task.status = status
self.task.duration = timezone.now() - self.task.start_at
self.task.save(update_fields=["status", "duration", "updated_at"])


class TenantSyncTaskRunner:
"""
租户数据同步任务执行器
FIXME (su) 1. 同步异常处理,2. Task 状态更新,3. 后续支持软删除后,需要重构同步逻辑
FIXME (su) 1. 细化同步异常处理,2. 后续支持软删除后,需要重构同步逻辑
"""

def __init__(self, task: TenantSyncTask):
Expand All @@ -87,8 +101,14 @@ def __init__(self, task: TenantSyncTask):

def run(self):
with transaction.atomic():
self._sync_departments()
self._sync_users()
try:
self._sync_departments()
self._sync_users()
except Exception:
self._update_task_status(SyncTaskStatus.FAILED)
raise

self._update_task_status(SyncTaskStatus.SUCCESS)

def _sync_departments(self):
"""同步部门信息"""
Expand All @@ -97,3 +117,9 @@ def _sync_departments(self):
def _sync_users(self):
"""同步用户信息"""
TenantUserSyncer(self.task, self.data_source, self.tenant).sync()

def _update_task_status(self, status: SyncTaskStatus):
"""任务正常完成后更新 task 状态"""
self.task.status = status
self.task.duration = timezone.now() - self.task.start_at
self.task.save(update_fields=["status", "duration", "updated_at"])
Loading

0 comments on commit 6ffc0b3

Please sign in to comment.