
Commit

feat: check if raw source urls return ok
vncsna committed Feb 14, 2024
1 parent 94d5034 commit a4dfdb0
Showing 6 changed files with 91 additions and 43 deletions.
70 changes: 34 additions & 36 deletions bd_api/apps/api/v1/tasks.py
@@ -8,9 +8,10 @@
 from huey.contrib.djhuey import periodic_task
 from loguru import logger
 from pandas import read_gbq
+from requests import get

-from bd_api.apps.api.v1.models import Dataset, Table
-from bd_api.custom.client import get_gbq_client, get_gcs_client, send_discord_message
+from bd_api.apps.api.v1.models import Dataset, RawDataSource, Table
+from bd_api.custom.client import Messenger, get_gbq_client, get_gcs_client
 from bd_api.utils import production_task

 logger = logger.bind(module="api.v1")
@@ -33,9 +34,6 @@ def rebuild_search_index_task():
 def update_table_metadata_task(table_pks: list[str] = None):
     """Update the metadata of selected tables in the database"""

-    msg = "Verifique os metadados dos conjuntos: "
-    link = lambda pk: f"https://api.basedosdados.org/admin/v1/table/{pk}/change/" # noqa
-
     def get_number_of_rows(table: Table, bq_table: GBQTable) -> int | None:
         """Get number of rows from big query"""

@@ -51,8 +49,8 @@ def get_number_of_rows(table: Table, bq_table: GBQTable) -> int | None:
             number_rows = read_gbq(query)
             number_rows = number_rows.loc[0, "n_rows"]
             return number_rows or None
-        except Exception as e:
-            logger.warning(e)
+        except Exception as exc:
+            logger.warning(exc)

     def get_number_of_columns(table: Table, bq_table: GBQTable):
         """Get number of columns from big query"""
@@ -71,58 +69,43 @@ def get_uncompressed_file_size(table: Table, bq_table: GBQTable) -> int | None:
             for blob in cs_bucket.list_blobs(prefix=table.gcs_slug):
                 file_size += blob.size
             return file_size
-        except Exception as e:
-            logger.warning(e)
-
-    def fmt_msg(table: Table = None, error: Exception = None) -> str:
-        """
-        - Add line if error exists
-        - Return none if no error exists
-        """
-        nonlocal msg
-        if table and error:
-            line = f"\n- [{table}]({link(table.pk)}): `{error}` "
-            if len(msg) + len(line) <= 2000:
-                msg += line
-            return msg
-        if msg.count("\n"):
-            return msg
+        except Exception as exc:
+            logger.warning(exc)

     bq_client = get_gbq_client()
     cs_client = get_gcs_client()
     cs_bucket = cs_client.get_bucket("basedosdados")

+    messenger = Messenger("Verifique os metadados dos conjuntos:")
+
     if not table_pks:
         tables = Table.objects.order_by("updated_at").all()
     else:
         tables = Table.objects.filter(pk__in=table_pks).order_by("updated_at").all()

     for table in tables:
-        if len(msg) > 1600:
-            break
         if not table.gbq_slug:
             continue
+        if messenger.is_soft_full:
+            break
         try:
             bq_table = bq_client.get_table(table.gbq_slug)
             table.number_rows = get_number_of_rows(table, bq_table)
             table.number_columns = get_number_of_columns(table, bq_table)
             table.uncompressed_file_size = get_uncompressed_file_size(table, bq_table)
             table.save()
             logger.info(f"{table}")
-        except GoogleAPICallError as e:
-            e = e.response.json()["error"]
-            e = e["errors"][0]["message"]
-            msg = fmt_msg(table, e)
-            logger.warning(f"{table}: {e}")
-        except Exception as e:
-            msg = fmt_msg(table, e)
-            logger.warning(f"{table}: {e}")
+        except Exception as exc:
+            if isinstance(exc, GoogleAPICallError):
+                exc = exc.response.json()["error"]
+                exc = exc["errors"][0]["message"]
+            messenger.append(table, exc)
+            logger.warning(f"{table}: {exc}")

-    if msg := fmt_msg():
-        send_discord_message(msg)
+    messenger.send()


-@periodic_task(crontab(hour="8", minute="0"))
+@periodic_task(crontab(day_of_week="1-5", hour="7", minute="0"))
 @production_task
 def update_page_views_task(backfill: bool = False):
     if backfill:
@@ -171,3 +154,18 @@ def update_page_views_task(backfill: bool = False):
         if dataset := Dataset.objects.filter(id=dataset_id).first():
             dataset.page_views += page_views
             dataset.save()
+
+
+@periodic_task(crontab(day_of_week="1-5", hour="8", minute="0"))
+@production_task
+def check_links_task():
+    messenger = Messenger("Revise os seguintes links:")
+    for entity in RawDataSource.objects.filter(url__isnull=False).all():
+        if messenger.is_soft_full:
+            break
+        try:
+            response = get(entity.url)
+            response.raise_for_status()
+        except Exception as exc:
+            messenger.append(entity, exc)
+    messenger.send()
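
For context, the new check_links_task boils down to a plain requests round-trip per source URL. Below is a minimal standalone sketch of that check, not code from this commit: the helper name, the example URL, and the explicit timeout are made up for illustration (the committed task calls get(entity.url) with requests' defaults and reports failures through Messenger).

```python
# Illustrative sketch only, not code from this commit.
from requests import RequestException, get


def url_is_ok(url: str, timeout: float = 10.0) -> bool:
    """Return True if the URL answers with a non-error HTTP status."""
    try:
        response = get(url, timeout=timeout)  # follows redirects by default
        response.raise_for_status()           # raises HTTPError on 4xx/5xx
        return True
    except RequestException:
        return False


print(url_is_ok("https://example.org"))  # hypothetical URL
```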
38 changes: 38 additions & 0 deletions bd_api/custom/client.py
@@ -1,6 +1,7 @@
 # -*- coding: utf-8 -*-
 from json import loads
 from pathlib import Path
+from typing import Any

 from django.conf import settings
 from google.cloud.bigquery import Client as GBQClient
@@ -9,6 +10,8 @@
 from loguru import logger
 from requests import post

+from bd_api.custom.model import BaseModel
+

 def get_gcloud_credentials():
     """Get google cloud credentials"""
@@ -38,3 +41,38 @@ def send_discord_message(message: str):
     message = message[:2000]
     response = post(url, data={"content": message})
     logger.debug(f"{response.status_code}: {response.text}")
+
+
+class Messenger:
+    _length: int
+    _message: str
+    _soft_limit: int = 1600
+    _hard_limit: int = 2000
+
+    @property
+    def length(self) -> int:
+        return len(self._message)
+
+    @property
+    def is_soft_full(self) -> bool:
+        return len(self._message) >= self._soft_limit
+
+    @property
+    def is_hard_full(self) -> bool:
+        return len(self._message) >= self._hard_limit
+
+    def __init__(self, header: str):
+        self._message = f"{header.strip()} "
+
+    def append(self, entity: BaseModel = None, text: Any = None):
+        """Append line if message length is less than limit"""
+        if entity and text:
+            line = f"\n- [{entity}]({entity.admin_url}): `{text}`"
+            if len(self._message) + len(line) <= self._length:
+                self._message += line
+            return self._message
+
+    def send(self):
+        """Send message if it has body text"""
+        if self._message.count("\n"):
+            send_discord_message(self._message)
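
For reference, a self-contained sketch of the reporting pattern Messenger implements: collect one markdown line per failing entity, stop appending once the soft limit is reached, and send only when at least one line was added. This is an illustration, not the committed class; the entity, the failure values, and the assumption that the append guard bounds the message at the 2000-character hard limit are invented here.

```python
# Behavioral sketch of the Messenger flow (illustration only, not the committed class).
from dataclasses import dataclass

SOFT_LIMIT = 1600  # stop collecting new lines once the message reaches this length
HARD_LIMIT = 2000  # Discord caps messages at 2000 chars; send_discord_message truncates to this


@dataclass
class Entity:
    name: str
    admin_url: str

    def __str__(self) -> str:
        return self.name


def build_report(header: str, failures: list[tuple[Entity, Exception]]) -> str | None:
    """Assemble the Discord message; return None when nothing was appended."""
    message = f"{header.strip()} "
    for entity, error in failures:
        if len(message) >= SOFT_LIMIT:  # mirrors messenger.is_soft_full
            break
        line = f"\n- [{entity}]({entity.admin_url}): `{error}`"
        if len(message) + len(line) <= HARD_LIMIT:  # assumed intent of the append guard
            message += line
    return message if "\n" in message else None


failures = [
    (Entity("table_a", "https://localhost:8080/admin/v1/table/1/change/"), ValueError("404 Not Found")),
]
print(build_report("Revise os seguintes links:", failures))
```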
11 changes: 9 additions & 2 deletions bd_api/custom/model.py
@@ -1,7 +1,9 @@
 # -*- coding: utf-8 -*-
 from typing import Callable, List

+from django.conf import settings
 from django.db import models
+from django.urls import reverse
 from graphql_jwt.decorators import staff_member_required

 default_blacklist_fields = [
@@ -16,8 +18,7 @@


 class BaseModel(models.Model):
-    """
-    Abstract base class model that provides whitelist
+    """Abstract base class model that provides whitelist
     of fields to be used for filtering in the GraphQL API

     Attributes:
@@ -83,3 +84,9 @@ def get_graphql_nested_filter_fields_whitelist(cls) -> List[models.Field]:
                 if f.name not in cls.graphql_nested_filter_fields_blacklist
             ]
         return cls._meta.get_fields()
+
+    @property
+    def admin_url(self):
+        viewname = f"admin:{self._meta.app_label}_{self._meta.model_name}_change"
+        endpoint = reverse(viewname, kwargs={"object_id": self.pk})
+        return f"{settings.BACKEND_URL}{endpoint}"
4 changes: 4 additions & 0 deletions bd_api/settings/base.py
@@ -293,5 +293,9 @@
     },
 }

+# URLs
+BACKEND_URL = getenv("BASE_URL_API", "https://localhost:8080")
+FRONTEND_URL = getenv("BASE_URL_FRONTEND", "https://localhost:3080")
+
 # Discord
 DISCORD_BACKEND_WEBHOOK_URL = getenv("DISCORD_BACKEND_WEBHOOK_URL")
10 changes: 5 additions & 5 deletions poetry.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions pyproject.toml
@@ -38,6 +38,7 @@ google-cloud-storage = "^2.11.0"
 stripe = "^4.2.0"
 dj-stripe = "^2.8.3"
 pydantic = "^2.5.3"
+requests = "^2.31.0"

 [tool.poetry.group.dev.dependencies]
 pre-commit = "^3.3.3"
