Skip to content
This repository has been archived by the owner on Aug 4, 2023. It is now read-only.

Commit

Permalink
Add a DAG for backfilling license_url when meta_data is null (#1005)
Browse files Browse the repository at this point in the history
* Add a DAG for backfilling license_url

Signed-off-by: Olga Bulat <[email protected]>

* Add a DAG for backfilling license_url

Signed-off-by: Olga Bulat <[email protected]>

* Update docs

Signed-off-by: Olga Bulat <[email protected]>

* Apply suggestions from code review

Co-authored-by: Madison Swain-Bowden <[email protected]>

* Add changes from code review

Signed-off-by: Olga Bulat <[email protected]>

* Use constants for `task_id`s

Signed-off-by: Olga Bulat <[email protected]>

* Use constant for `default_args`s

Signed-off-by: Olga Bulat <[email protected]>

* Extract common functionality

Signed-off-by: Olga Bulat <[email protected]>

* Update dag docs

Signed-off-by: Olga Bulat <[email protected]>

* Update openverse_catalog/dags/maintenance/add_license_url.py

Co-authored-by: Madison Swain-Bowden <[email protected]>

* Improve counts logging

Signed-off-by: Olga Bulat <[email protected]>

* Add the timeouts and docs

Signed-off-by: Olga Bulat <[email protected]>

* Update docs

Signed-off-by: Olga Bulat <[email protected]>

---------

Signed-off-by: Olga Bulat <[email protected]>
Co-authored-by: Madison Swain-Bowden <[email protected]>
  • Loading branch information
obulat and AetherUnbound authored Mar 10, 2023
1 parent 4c0ceb0 commit d212dcf
Show file tree
Hide file tree
Showing 2 changed files with 203 additions and 0 deletions.
20 changes: 20 additions & 0 deletions DAGs.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,20 @@ The DAGs are shown in two forms:

The following are DAGs grouped by their primary tag:

1. [Data Normalization](#data_normalization)
1. [Data Refresh](#data_refresh)
1. [Database](#database)
1. [Maintenance](#maintenance)
1. [Oauth](#oauth)
1. [Provider](#provider)
1. [Provider Reingestion](#provider-reingestion)

## Data Normalization

| DAG ID | Schedule Interval |
| ------------------------------------- | ----------------- |
| [`add_license_url`](#add_license_url) | `None` |

## Data Refresh

| DAG ID | Schedule Interval |
Expand Down Expand Up @@ -93,6 +100,7 @@ The following are DAGs grouped by their primary tag:

The following is documentation associated with each DAG (where available):

1. [`add_license_url`](#add_license_url)
1. [`airflow_log_cleanup`](#airflow_log_cleanup)
1. [`audio_data_refresh`](#audio_data_refresh)
1. [`check_silenced_dags`](#check_silenced_dags)
Expand Down Expand Up @@ -127,6 +135,18 @@ The following is documentation associated with each DAG (where available):
1. [`wikimedia_reingestion_workflow`](#wikimedia_reingestion_workflow)
1. [`wordpress_workflow`](#wordpress_workflow)

## `add_license_url`

### Add license URL

Add `license_url` to all rows that have `NULL` in their `meta_data` fields. This
PR sets the meta_data value to "{license_url: https://... }", where the url is
constructed from the `license` and `license_version` columns.

This is a maintenance DAG that should be run once. If all the null values in the
`meta_data` column are updated, the DAG will only run the first and the last
step, logging the statistics.

## `airflow_log_cleanup`

### Clean up airflow logs
Expand Down
183 changes: 183 additions & 0 deletions openverse_catalog/dags/maintenance/add_license_url.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
"""
# Add license URL
Add `license_url` to all rows that have `NULL` in their `meta_data` fields.
This PR sets the meta_data value to "{license_url: https://... }", where the
url is constructed from the `license` and `license_version` columns.
This is a maintenance DAG that should be run once. If all the null values in
the `meta_data` column are updated, the DAG will only run the first and the
last step, logging the statistics.
"""
import logging
from datetime import timedelta
from textwrap import dedent
from typing import Literal

from airflow.models import DAG
from airflow.models.abstractoperator import AbstractOperator
from airflow.operators.python import BranchPythonOperator, PythonOperator
from common.constants import DAG_DEFAULT_ARGS, POSTGRES_CONN_ID, XCOM_PULL_TEMPLATE
from common.licenses.constants import get_reverse_license_path_map
from common.loader.sql import RETURN_ROW_COUNT
from common.slack import send_message
from common.sql import PostgresHook
from psycopg2._json import Json


DAG_ID = "add_license_url"
UPDATE_LICENSE_URL = "update_license_url"
FINAL_REPORT = "final_report"

ALERT_EMAIL_ADDRESSES = ""

logger = logging.getLogger(__name__)

base_url = "https://creativecommons.org/"


def get_null_counts(
postgres_conn_id: str,
task: AbstractOperator,
) -> int:
postgres = PostgresHook(
postgres_conn_id=postgres_conn_id,
default_statement_timeout=PostgresHook.get_execution_timeout(task),
)
null_meta_data_count = postgres.get_first(
dedent("SELECT COUNT(*) from image WHERE meta_data IS NULL;")
)[0]
return null_meta_data_count


def get_statistics(
postgres_conn_id: str,
task: AbstractOperator,
) -> Literal[UPDATE_LICENSE_URL, FINAL_REPORT]:
null_meta_data_count = get_null_counts(postgres_conn_id, task)

next_task = UPDATE_LICENSE_URL if null_meta_data_count > 0 else FINAL_REPORT
logger.info(
f"There are {null_meta_data_count} records with NULL in meta_data. \n"
f"Next step: {next_task}"
)

return next_task


def update_license_url(postgres_conn_id: str, task: AbstractOperator) -> dict[str, int]:
"""Add license_url to meta_data batching all records with the same license.
:param task: automatically passed by Airflow, used to set the execution timeout
:param postgres_conn_id: Postgres connection id
"""

logger.info("Getting image records without license_url.")
postgres = PostgresHook(
postgres_conn_id=postgres_conn_id,
default_statement_timeout=PostgresHook.get_execution_timeout(task),
)
license_map = get_reverse_license_path_map()

total_count = 0
total_counts = {}
for license_items, path in license_map.items():
license_name, license_version = license_items
logger.info(f"Processing {license_name} {license_version}, {license_items}.")
license_url = f"{base_url}{path}/"

select_query = dedent(
f"""
SELECT identifier FROM image
WHERE (
meta_data is NULL AND license = '{license_name}'
AND license_version = '{license_version}')
"""
)
result = postgres.get_records(select_query)

if not result:
logger.info(f"No records to update with {license_url}.")
continue
logger.info(f"{len(result)} records to update with {license_url}.")
license_url_col = {"license_url": license_url}
update_license_url_query = dedent(
f"""
UPDATE image
SET meta_data = {Json(license_url_col)}
WHERE identifier IN ({','.join([f"'{r[0]}'" for r in result])});
"""
)

updated_count = postgres.run(
update_license_url_query, autocommit=True, handler=RETURN_ROW_COUNT
)
logger.info(f"{updated_count} records updated with {license_url}.")
total_counts[license_url] = updated_count
total_count += updated_count

logger.info(f"{total_count} image records with missing license_url updated.")
for license_url, count in total_counts.items():
logger.info(f"{count} records with {license_url}.")
return total_counts


def final_report(
postgres_conn_id: str,
item_count: int,
task: AbstractOperator,
):
null_meta_data_count = get_null_counts(postgres_conn_id, task)

message = f"""
Added license_url to *{item_count}* items`
Now, there are {null_meta_data_count} records with NULL meta_data left.
"""
send_message(
message,
username="Airflow DAG Data Normalization - license_url",
dag_id=DAG_ID,
)

logger.info(message)


dag = DAG(
dag_id=DAG_ID,
default_args={
**DAG_DEFAULT_ARGS,
"retries": 0,
"execution_timeout": timedelta(hours=5),
},
schedule_interval=None,
catchup=False,
# Use the docstring at the top of the file as md docs in the UI
doc_md=__doc__,
tags=["data_normalization"],
)

with dag:
get_statistics = BranchPythonOperator(
task_id="get_stats",
python_callable=get_statistics,
op_kwargs={"postgres_conn_id": POSTGRES_CONN_ID},
)
update_license_url = PythonOperator(
task_id=UPDATE_LICENSE_URL,
python_callable=update_license_url,
op_kwargs={"postgres_conn_id": POSTGRES_CONN_ID},
)
final_report = PythonOperator(
task_id=FINAL_REPORT,
python_callable=final_report,
op_kwargs={
"postgres_conn_id": POSTGRES_CONN_ID,
"item_count": XCOM_PULL_TEMPLATE.format(
update_license_url.task_id, "return_value"
),
},
)

# If there are no records with NULL meta_data, skip the update step
# and go straight to the final report
get_statistics >> [update_license_url, final_report]
update_license_url >> final_report

0 comments on commit d212dcf

Please sign in to comment.