diff --git a/backend/apps/core/admin.py b/backend/apps/core/admin.py index c78b6a53..fe65b8c1 100644 --- a/backend/apps/core/admin.py +++ b/backend/apps/core/admin.py @@ -1,4 +1,15 @@ # -*- coding: utf-8 -*- -from django.contrib import admin # noqa +from django.contrib import admin -# Register your models here. +from .models import TaskExecution + + +class TaskExecutionAdmin(admin.ModelAdmin): + list_display = ["task_name", "execution_time", "status", "duration"] + readonly_fields = ["task_name", "execution_time", "duration", "status", "result", "error"] + + def has_add_permission(self, request): + return False + + +admin.site.register(TaskExecution, TaskExecutionAdmin) diff --git a/backend/apps/core/management/commands/fetch_metabase.py b/backend/apps/core/management/commands/fetch_metabase.py new file mode 100644 index 00000000..b4447d5d --- /dev/null +++ b/backend/apps/core/management/commands/fetch_metabase.py @@ -0,0 +1,147 @@ +# -*- coding: utf-8 -*- +import json +import os +from os import getenv + +import requests +from django.core.management.base import BaseCommand +from tqdm import tqdm + +BASE_URL = "https://perguntas.basedosdados.org" +METABASE_USER = getenv("METABASE_USER") +METABASE_PASSWORD = getenv("METABASE_PASSWORD") + + +class Table: + def __init__(self, id, name, fields): + self.id = id + self.name = name + self.fields = fields + + def __str__(self): + return self.name + + +class Command(BaseCommand): + help = "Fetch data from Metabase" + + def authenticate(self) -> str: + headers = { + "Content-Type": "application/json", + } + + json_data = { + "username": METABASE_USER, + "password": METABASE_PASSWORD, + } + + response = requests.post(BASE_URL + "/api/session", headers=headers, json=json_data).json() + + if "id" not in response: + self.stderr.write("Falha na autenticação.") + return "" + + return response["id"] + + def get_headers(self, token: str): + return { + "Content-Type": "application/json", + "X-Metabase-Session": token, + } + + def 
get_databases(self, token: str): + headers = self.get_headers(token) + + response = requests.get(BASE_URL + "/api/database", headers=headers) + + return response.json()["data"] + + def get_tables(self, token: str, database_id: int): + headers = self.get_headers(token) + + response = requests.get(BASE_URL + f"/api/database/{database_id}/metadata", headers=headers) + + json_data = response.json() + tables = [] + + for table in json_data["tables"]: + if table["name"].startswith("account"): + continue + + fields = [field["name"] for field in table["fields"]] + tables.append(Table(table["id"], table["name"], fields)) + + return tables + + def get_table_data(self, token: str, database_id: int, table: Table): + headers = self.get_headers(token) + fields = [f'"{field}"' for field in table.fields] + formated_field = ", ".join(fields) + query = f'SELECT {formated_field} FROM "{table.name}"' + + payload = { + "database": database_id, + "native": { + "query": query, + }, + "type": "native", + } + + response = requests.post(BASE_URL + "/api/dataset", headers=headers, json=payload) + + if response.status_code != 202: + return + + response_json = response.json() + rows = [] + + for row in response_json["data"]["rows"]: + instance = {} + for i, field in enumerate(table.fields): + instance[field] = row[i] + + rows.append(instance) + + if len(rows) > 0: + self.save_data(table.name, json.dumps(rows, ensure_ascii=False, indent=4)) + else: + self.stdout.write(self.style.WARNING(f"No data found for {str(table)}")) + self.stdout.write(self.style.WARNING(query)) + + def clean_data(self): + directory = os.path.join(os.getcwd(), "metabase_data") + + if os.path.exists(directory) and os.path.isdir(directory): + for filename in os.listdir(directory): + file_path = os.path.join(directory, filename) + os.remove(file_path) + + def save_data(self, table_name, data): + directory = os.path.join(os.getcwd(), "metabase_data") + + if not os.path.exists(directory): + os.makedirs(directory) + + file_path 
= os.path.join(directory, f"{table_name}.json") + + with open(file_path, "w", encoding="utf-8") as file: + file.write(data) + + def handle(self, *args, **kwargs): + token = self.authenticate() + + databases = self.get_databases(token) + + database = next( + (db for db in databases if db["name"] == "Metadados"), + None, + ) + + self.clean_data() + tables = self.get_tables(token, database["id"]) + + for table in tqdm(tables, desc="Fetching tables"): + self.stdout.write(f"Fetching data from {str(table)}") + self.get_table_data(token, database["id"], table) + + self.stdout.write(self.style.SUCCESS("Data fetched successfully.")) diff --git a/backend/apps/core/management/commands/populate.py b/backend/apps/core/management/commands/populate.py new file mode 100644 index 00000000..3fd1ea46 --- /dev/null +++ b/backend/apps/core/management/commands/populate.py @@ -0,0 +1,303 @@ +# -*- coding: utf-8 -*- +import json +import os + +from django.apps import apps +from django.core.management.base import BaseCommand +from django.db import models, transaction +from tqdm import tqdm + + +class BulkUpdate: + """ + Bulk update model instances + """ + + instances_by_model = {} + + def add(self, instance, field_name): + model_name = instance.__class__.__name__ + namespace = f"{model_name}.{field_name}" + + if namespace not in self.instances_by_model: + self.instances_by_model[namespace] = [] + + self.instances_by_model[namespace].append(instance) + + def bulk_update(self): + for namespace, instances in self.instances_by_model.items(): + model = instances[0].__class__ + field_name = namespace.split(".")[1] + + # Bulk update in chunks of 2000 instances + for i in range(0, len(instances), 2000): + chunk = instances[i : i + 2000] + model.objects.bulk_update(chunk, [field_name]) + + +class References: + """ + Store references between legacy and new ids + """ + + tables = {} + + def add(self, table, legacy_id, new_id): + if table not in self.tables: + self.tables[table] = {} + + 
self.tables[table][legacy_id] = new_id + + def get(self, table, legacy_id): + if table not in self.tables: + return None + + return self.tables[table].get(legacy_id) + + +class Layer: + """ + Store models in a layer + """ + + models = [] + depth = 1 + + def print(self, context): + for model in self.models: + context.stdout.write(context.style.SUCCESS(f"{'-' * self.depth * 2} {model.__name__}")) + for field in model._meta.get_fields(): + if isinstance(field, models.ForeignKey): + name = f"{field.name} -> {field.related_model.__name__}" + + if field.null: + context.stdout.write(f"{' ' * self.depth * 2} {name} (nullable)") + else: + context.stdout.write( + context.style.WARNING(f"{' ' * self.depth * 2} {name}") + ) + + +class Command(BaseCommand): + help = "Populate database with initial data" + + def get_all_files(self): + directory = os.path.join(os.getcwd(), "metabase_data") + files = os.listdir(directory) + self.files = files + + def load_table_data(self, table_name): + directory = os.path.join(os.getcwd(), "metabase_data") + with open(f"{directory}/{table_name}.json") as f: + data = json.load(f) + + return data + + def model_has_data(self, model_name): + if f"{model_name}.json" in self.files: + return True + return False + + def get_models_without_foreign_keys(self, models_to_populate): + models_without_foreign_keys = [] + + for model in models_to_populate: + has_foreign_key = False + + for field in model._meta.get_fields(): + if isinstance(field, models.ForeignKey): + has_foreign_key = True + break + + if not has_foreign_key: + models_without_foreign_keys.append(model) + + return models_without_foreign_keys + + def get_models_that_depends_on(self, models_to_populate, layer_models): + leaf_dependent_models = [] + + for model in models_to_populate: + next_layer = False + + for field in model._meta.get_fields(): + if isinstance(field, models.ForeignKey): + if field.related_model not in layer_models: + next_layer = True + break + + if next_layer is False: + 
leaf_dependent_models.append(model) + + return leaf_dependent_models + + def sort_models_by_depedencies(self, models_to_populate, other_models): + sorted_models = [] + + while len(models_to_populate) > 0: + for model in models_to_populate: + has_all_dependencies = True + + for field in model._meta.get_fields(): + if isinstance(field, models.ForeignKey): + if ( + field.related_model not in other_models + and field.related_model not in sorted_models + and field.related_model != model + and field.null is False + ): + has_all_dependencies = False + break + + if has_all_dependencies: + sorted_models.append(model) + models_to_populate.remove(model) + + return sorted_models + + def clean_database(self, _models): + """ + Clean database + """ + for model in tqdm(_models, desc="Set foreign keys to null"): + foreign_keys = [ + field + for field in model._meta.get_fields() + if isinstance(field, models.ForeignKey) and field.null is True + ] + + if foreign_keys: + field_names = [field.name for field in foreign_keys] + model.objects.update(**{field_name: None for field_name in field_names}) + + for model in tqdm(_models, desc="Cleaning database"): + with transaction.atomic(): + model.objects.all().delete() + + def create_instance(self, model, item): + payload = {} + retry = None + table_name = model._meta.db_table + + for field in model._meta.local_fields: + if isinstance(field, models.ForeignKey): + field_name = f"{field.name}_id" + current_value = item[field_name] + + if current_value is None: + continue + + reference = self.references.get(field.related_model._meta.db_table, current_value) + if reference: + payload[field_name] = reference + else: + # If the field is required and the reference is missing, we need to skip + if field.null is False: + return + + retry = { + "item": item, + "table_name": field.related_model._meta.db_table, + "field_name": field_name, + } + + else: + payload[field.name] = item[field.name] + + instance = model(**payload) + instance.save() + + if 
retry: + retry["instance"] = instance + self.retry_instances.append(retry) + + self.references.add(table_name, item["id"], instance.id) + + def handle(self, *args, **kwargs): + app_name = "v1" + + app = apps.get_app_config(app_name) + self.get_all_files() + models_to_populate = [] + + for model in app.get_models(): + table_name = model._meta.db_table + + if self.model_has_data(table_name): + models_to_populate.append(model) + else: + self.stdout.write(self.style.WARNING(f"No data for {table_name}")) + + self.stdout.write(self.style.SUCCESS(f"Will populate {len(models_to_populate)} models")) + + leaf_layer = Layer() + leaf_layer.models = self.get_models_without_foreign_keys(models_to_populate) + + # Remove leaf layer models from models_to_populate + models_to_populate = list(set(models_to_populate) - set(leaf_layer.models)) + leaf_layer.print(self) + + # Create a layer with models that only depend on the leaf layer + leaf_dependent_layer = Layer() + leaf_dependent_layer.depth = 2 + leaf_dependent_layer.models = self.get_models_that_depends_on( + models_to_populate, leaf_layer.models + ) + + # Remove leaf dependent layer models from models_to_populate + models_to_populate = list(set(models_to_populate) - set(leaf_dependent_layer.models)) + leaf_dependent_layer.print(self) + + # Sort populated models by dependencies + sorted_layer = Layer() + sorted_layer.depth = 3 + sorted_layer.models = self.sort_models_by_depedencies( + models_to_populate, leaf_layer.models + leaf_dependent_layer.models + ) + + sorted_layer.print(self) + models_to_populate = list(set(models_to_populate) - set(sorted_layer.models)) + + # Populate models + all_models = leaf_layer.models + leaf_dependent_layer.models + sorted_layer.models + + # Clean database + # make a copy, dont modify the original array + reversed_models = all_models.copy()[::-1] + self.stdout.write(self.style.WARNING("Cleaning database")) + self.clean_database(reversed_models) + self.stdout.write(self.style.SUCCESS("Database 
cleaned")) + + self.references = References() + # After populating all models, we need to retry the instances that had a missing references + self.retry_instances = [] + self.stdout.write(self.style.SUCCESS("Populating models")) + + for model in all_models: + table_name = model._meta.db_table + data = self.load_table_data(table_name) + self.stdout.write(self.style.SUCCESS(f"Populating {table_name}")) + + for item in tqdm(data, desc=f"Populating {table_name}"): + self.create_instance(model, item) + + self.stdout.write(self.style.SUCCESS("Populating instances with missing references")) + + bulk = BulkUpdate() + + for retry in tqdm(self.retry_instances, desc="Retrying instances"): + item = retry["item"] + instance = retry["instance"] + field_name = retry["field_name"] + related_table_name = retry["table_name"] + current_value = item[field_name] + + reference = self.references.get(related_table_name, current_value) + + if reference: + setattr(instance, field_name, reference) + bulk.add(instance, field_name) + + bulk.bulk_update() + + self.stdout.write(self.style.SUCCESS("Data populated")) diff --git a/backend/apps/core/migrations/0002_taskexecution.py b/backend/apps/core/migrations/0002_taskexecution.py new file mode 100644 index 00000000..c5b4bf87 --- /dev/null +++ b/backend/apps/core/migrations/0002_taskexecution.py @@ -0,0 +1,35 @@ +# -*- coding: utf-8 -*- +# Generated by Django 4.2.14 on 2024-07-18 12:49 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + dependencies = [ + ("core", "0001_initial"), + ] + + operations = [ + migrations.CreateModel( + name="TaskExecution", + fields=[ + ( + "id", + models.BigAutoField( + auto_created=True, primary_key=True, serialize=False, verbose_name="ID" + ), + ), + ("task_name", models.CharField(max_length=255)), + ("execution_time", models.DateTimeField()), + ("duration", models.FloatField(default=0)), + ("status", models.CharField(max_length=255)), + ("result", models.TextField(blank=True, 
null=True)), + ("error", models.TextField(blank=True, null=True)), + ], + options={ + "verbose_name": "Tarefa executada", + "verbose_name_plural": "Tarefas executadas", + "ordering": ["-execution_time"], + }, + ), + ] diff --git a/backend/apps/core/models.py b/backend/apps/core/models.py index afa50fa7..c61a4961 100644 --- a/backend/apps/core/models.py +++ b/backend/apps/core/models.py @@ -13,3 +13,19 @@ class Metadata(BaseModel): id = models.UUIDField(primary_key=True, default=uuid4) key = models.JSONField(default=dict, blank=False, null=False) value = models.JSONField(default=dict, blank=False, null=False) + + +class TaskExecution(models.Model): + task_name = models.CharField(max_length=255) + execution_time = models.DateTimeField() + duration = models.FloatField(default=0) + status = models.CharField(max_length=255) + result = models.TextField(null=True, blank=True) + error = models.TextField(null=True, blank=True) + + class Meta: + """Meta definition for TaskExecution.""" + + verbose_name = "Tarefa executada" + verbose_name_plural = "Tarefas executadas" + ordering = ["-execution_time"] diff --git a/backend/apps/core/tasks.py b/backend/apps/core/tasks.py index eed69b11..b827db4f 100644 --- a/backend/apps/core/tasks.py +++ b/backend/apps/core/tasks.py @@ -1,12 +1,14 @@ # -*- coding: utf-8 -*- from datetime import datetime, timedelta +from django.core.management import call_command from huey import crontab from huey.contrib.djhuey import db_periodic_task from backend.apps.core.models import Metadata from backend.custom.client import BetterStackClient -from backend.custom.environment import production_task +from backend.custom.environment import not_production_task, production_task +from backend.custom.task_decorators import log_task_execution @db_periodic_task(crontab(day="1", hour="3", minute="0")) @@ -34,3 +36,13 @@ def get_period(): }, value=client.get_monitor_summary(monitor["id"], since, until), ) + + +@db_periodic_task(crontab(minute="0", hour="*/2")) 
+@not_production_task +@log_task_execution("sync_database_with_prod") +def sync_database_with_prod(): + """Sync database with production""" + call_command("fetch_metabase") + call_command("populate") + return "Sincronização concluída com sucesso" diff --git a/backend/custom/environment.py b/backend/custom/environment.py index ea2612ba..c0c14390 100644 --- a/backend/custom/environment.py +++ b/backend/custom/environment.py @@ -67,3 +67,16 @@ def wrapper(*args, **kwargs): return func(*args, **kwargs) return wrapper + + +def not_production_task(func): + """ + Decorator that avoids function call if it is production + """ + + @wraps(func) + def wrapper(*args, **kwargs): + if not is_prd(): + return func(*args, **kwargs) + + return wrapper diff --git a/backend/custom/logger.py b/backend/custom/logger.py index 5d3ebe1d..9dd306d6 100644 --- a/backend/custom/logger.py +++ b/backend/custom/logger.py @@ -8,7 +8,7 @@ LOGGER_LEVEL = getenv("LOGGER_LEVEL", "DEBUG") LOGGER_IGNORE = getenv("LOGGER_IGNORE", "").split(",") LOGGER_SERIALIZE = bool(getenv("LOGGER_SERIALIZE", False)) -LOGGER_FORMAT = "[{time:YYYY-MM-DD HH:mm:ss}] {message}" +LOGGER_FORMAT = "[{time:YYYY-MM-DD HH:mm:ss}] {extra[app_name]}: {message}" class InterceptHandler(Handler): @@ -28,7 +28,14 @@ def emit(self, record: LogRecord): frame = frame.f_back depth += 1 - logger.opt(depth=depth, exception=record.exc_info).log(level, record.getMessage()) + # Include the logger name (app name) in the log record + app_name = record.name + extra = record.__dict__.get("extra", {}) + extra["app_name"] = app_name + + logger.bind(**extra).opt(depth=depth, exception=record.exc_info).log( + level, record.getMessage() + ) def setup_logger(logging_settings=None): diff --git a/backend/custom/task_decorators.py b/backend/custom/task_decorators.py new file mode 100644 index 00000000..20c91acd --- /dev/null +++ b/backend/custom/task_decorators.py @@ -0,0 +1,38 @@ +# -*- coding: utf-8 -*- +from datetime import datetime +from functools 
import wraps + +from backend.apps.core.models import TaskExecution + + +def log_task_execution(task_name): + def decorator(func): + @wraps(func) + def wrapper(*args, **kwargs): + status = "success" + result = None + error = None + start_time = datetime.now() + + try: + result = func(*args, **kwargs) + except Exception as e: + status = "failed" + error = str(e) + finally: + end_time = datetime.now() + duration = (end_time - start_time).total_seconds() + + TaskExecution.objects.create( + task_name=task_name, + status=status, + result=result, + error=error, + execution_time=start_time, + duration=duration, + ) + return result + + return wrapper + + return decorator