Skip to content

Commit

Permalink
feat: add update table metadata job (#423)
Browse files Browse the repository at this point in the history
  • Loading branch information
vncsna authored Oct 4, 2023
1 parent 653aae6 commit 5e25d96
Show file tree
Hide file tree
Showing 8 changed files with 643 additions and 247 deletions.
47 changes: 36 additions & 11 deletions basedosdados_api/api/v1/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@
from django.shortcuts import get_object_or_404, render
from django.utils.html import format_html
from google.api_core.exceptions import BadRequest, NotFound
from google.cloud.bigquery import Client
from google.cloud.bigquery import Client as GBQClient
from google.cloud.storage import Client as GCSClient
from google.oauth2 import service_account
from loguru import logger
from modeltranslation.admin import TabbedTranslationAdmin, TranslationStackedInline
from ordered_model.admin import OrderedInlineModelAdminMixin, OrderedStackedInline
from pandas import read_gbq

from basedosdados_api.api.v1.filters import (
OrganizationImageFilter,
Expand Down Expand Up @@ -223,30 +225,53 @@ def get_credentials():
return credentials


def update_table_metadata(modeladmin, request, queryset: QuerySet):
def update_table_metadata(modeladmin=None, request=None, queryset: QuerySet = None):
"""Update the metadata of selected tables in the database"""

creds = get_credentials()
client = Client(credentials=creds)
bq_client = GBQClient(credentials=creds)
cs_client = GCSClient(credentials=creds)

bucket_name = "basedosdados"
bucket = cs_client.get_bucket(bucket_name)

tables: list[Table] = []
match str(modeladmin):
case "v1.TableAdmin":
tables = queryset
case "v1.DatasetAdmin":
for database in queryset:
for table in database.tables.all():
tables.append(table)
tables = Table.objects.filter(
dataset__in=queryset,
source_bucket_name__isnull=False,
)
case "v1.TableAdmin":
tables = queryset.filter(source_bucket_name__isnull=False)
case _:
tables = Table.objects.filter(source_bucket_name__isnull=False)

for table in tables:
try:
bq_table = client.get_table(table.db_slug)
bq_table = bq_client.get_table(table.db_slug)
table.number_rows = bq_table.num_rows or None
table.number_columns = len(bq_table.schema) or None
table.uncompressed_file_size = bq_table.num_bytes or None
if bq_table.table_type == "VIEW":
# Get number of rows from big query
table.number_rows = read_gbq(
"""
SELECT COUNT(1) AS n_rows
FROM `{table.db_slug}`
"""
).loc[0, "n_rows"]
# Get file size in bytes from storage
file_size = 0
folder_prefix = f"staging/{table.dataset.db_slug}/{table.db_slug}"
for blob in bucket.list_blobs(prefix=folder_prefix):
file_size += blob.size
table.uncompressed_file_size = file_size
table.save()
except (BadRequest, NotFound, ValueError) as e:
logger.debug(e)
logger.warning(e)
except Exception as e:
logger.error(e)


update_table_metadata.short_description = "Atualizar metadados das tabelas"
Expand Down Expand Up @@ -319,7 +344,7 @@ def reorder_columns(modeladmin, request, queryset):
"""
try:
creds = get_credentials()
client = Client(credentials=creds)
client = GBQClient(credentials=creds)
query_job = client.query(query, timeout=90)
except Exception as e:
messages.error(
Expand Down
8 changes: 8 additions & 0 deletions basedosdados_api/api/v1/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,10 @@ def db_slug(self):
"""Get the database slug"""
return self.full_slug.replace("sa_br", "br")

@property
def get_graphql_db_slug(self):
    """Alias of ``db_slug`` exposed for the GraphQL API layer."""
    return self.db_slug

@property
def full_slug(self):
"""Get the full slug or Dataset"""
Expand Down Expand Up @@ -970,6 +974,10 @@ def db_slug(self):
bucket_slug = self.source_bucket_name
return f"{bucket_slug}.{dataset_slug}.{table_slug}"

@property
def get_graphql_db_slug(self):
    """Alias of ``db_slug`` exposed for the GraphQL API layer."""
    return self.db_slug

@property
def partitions(self):
"""Returns a list of columns used to partition the table"""
Expand Down
2 changes: 1 addition & 1 deletion basedosdados_api/api/v1/signals.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def handle_save(self, sender, instance, raw, **kwargs):
for ds in datasets or []:
index.update_object(ds, using=using)
except NotHandled as error:
logger.debug(error)
logger.warning(error)
except Exception as error:
logger.error(error)

Expand Down
10 changes: 9 additions & 1 deletion basedosdados_api/api/v1/tasks.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,16 @@
# -*- coding: utf-8 -*-
from huey import crontab
from huey.contrib.djhuey import periodic_task
from loguru import logger

from basedosdados_api.api.v1.admin import _update_table_metadata


@periodic_task(crontab(minute="*/10"))
def every_ten_mins():
    """Heartbeat task: log every ten minutes to confirm the Huey worker is alive."""
    # BUG FIX: the scraped diff kept both the removed ``print`` line and
    # the added ``logger.info`` line; only the logger call belongs in
    # the post-commit version (the commit adds 9 lines, deletes 1).
    logger.info("Am I alive between these periods?")


@periodic_task(crontab(day_of_week="0", hour="0", minute="0"))
def update_table_metadata():
    """Weekly task (Sundays at midnight) that refreshes table metadata.

    Delegates to the admin module's updater so the admin action and the
    scheduled job share one implementation.
    """
    # NOTE(review): this module imports ``_update_table_metadata`` from
    # admin.py, but the admin hunk in this commit defines the function
    # as ``update_table_metadata`` (no underscore) — confirm the name
    # matches, otherwise this task raises ImportError at load time.
    _update_table_metadata()
4 changes: 2 additions & 2 deletions basedosdados_api/settings/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,8 @@
"results": True,
"immediate": False,
"connection": {
"host": getenv("REDIS_URL", "localhost"),
# TODO: Delete this hotfix
"host": getenv("REDIS_URL", "localhost").replace("http://", ""),
"port": getenv("REDIS_PORT", 6379),
"db": getenv("REDIS_DB", 1),
"read_timeout": 1,
Expand All @@ -426,4 +427,3 @@
"periodic": True,
},
}
HUEY = None
21 changes: 16 additions & 5 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
version: "3"

services:
elasticsearch:
index:
image: docker.elastic.co/elasticsearch/elasticsearch:7.17.3
container_name: elasticsearch
container_name: index
environment:
- cluster.name=docker-cluster
- bootstrap.memory_lock=true
Expand All @@ -26,6 +26,17 @@ services:
retries: 5
start_period: 1m

queue:
image: redis:6.0
container_name: queue
ports:
- 6379:6379
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 10s
timeout: 5s
retries: 3

database:
image: postgres:14
container_name: database
Expand All @@ -47,15 +58,15 @@ services:
build:
context: .
dockerfile: Dockerfile
container_name: basedosdados_api
container_name: api
env_file:
- .env
ports:
- 8080:80
depends_on:
database:
index:
condition: service_healthy
elasticsearch:
database:
condition: service_healthy
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost/healthcheck/"]
Expand Down
Loading

0 comments on commit 5e25d96

Please sign in to comment.