This repository has been archived by the owner on Aug 4, 2023. It is now read-only.

Add a DAG for backfilling license_url when meta_data is null #1005

Merged: 13 commits merged into main from add/license_url on Mar 10, 2023

Conversation

obulat (Contributor) commented Feb 17, 2023

Fixes

Related to WordPress/openverse#1565 by @obulat

Description

This PR adds license_url to the meta_data jsonb field for rows where meta_data is NULL.
To make the query faster, we split it into several steps:

  1. Count the number of rows where meta_data is NULL.
  2. Update the meta_data:
    a. Select all identifiers of the items where meta_data is NULL. Having the identifiers lets us avoid a sequential scan of the whole table (i.e., testing each row individually) and work only with the 366,974 items identified in the related issue.
    b. For each license and license_version pair, run an UPDATE query setting meta_data to {"license_url": <URL>} (see the sketch after this list). Note: I'm using Json here although the column is jsonb. I'm not sure if it's correct, but it works locally.
  3. The final report checks for rows where meta_data is NULL. Normally, the count should be 0, but due to the possible lack of license_version in some rows (see Possible problems below), it might not be.
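
To make step 2b concrete, here is a minimal sketch using plain psycopg2 (the real DAG goes through Airflow's PostgresHook; license_groups, cur, and the CC-style URL construction are illustrative assumptions, not the PR's code):

from psycopg2.extras import Json

# Assumed: license_groups maps (license, license_version) pairs to the lists
# of identifiers collected in step 2a, and cur is an open psycopg2 cursor.
for (license_, version), identifiers in license_groups.items():
    if not identifiers:
        continue
    # Assumed CC-style URL construction, for illustration only.
    license_url = f"https://creativecommons.org/licenses/{license_}/{version}/"
    cur.execute(
        "UPDATE image SET meta_data = %s WHERE identifier IN %s;",
        # Json adapts the dict to a jsonb literal; the tuple becomes the IN list.
        (Json({"license_url": license_url}), tuple(identifiers)),
    )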

Possible problems

There is an issue in the CC repository about missing license_version in some items. If there are still items without license_version in the database, their count will be reported in the last step.

Limitations

This PR only solves a part of the problem for items where meta_data is NULL.

I have confirmed that there are some items with a non-null meta_data that don't have a license_url field in meta_data (e.g., the item with identifier d876532b-01da-4e14-aa73-237c0a160d1f). For those, a possible solution is to add a license_url check in the ingestion server (adding the field if necessary) and then propagate that data up to the catalog. This would be much faster than doing it on the catalog side, where we would need to sequentially scan hundreds of millions of records.
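
As a rough illustration of that direction, an ingestion-side check might look something like this (the function names and record shape are hypothetical, not the ingestion server's actual API):

def build_license_url(license_: str, version: str) -> str:
    # Hypothetical helper; the real mapping lives in the catalog's license utilities.
    return f"https://creativecommons.org/licenses/{license_}/{version}/"

def ensure_license_url(record: dict) -> dict:
    # Add license_url to meta_data when it is missing, leaving other keys intact.
    meta_data = record.get("meta_data") or {}
    if "license_url" not in meta_data:
        meta_data["license_url"] = build_license_url(
            record["license"], record["license_version"]
        )
    record["meta_data"] = meta_data
    return record

record = {"license": "by-sa", "license_version": "4.0", "meta_data": None}
print(ensure_license_url(record))  # meta_data now contains a license_url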

I'm also not sure about the reporting step and am open to suggestions, though I'm not sure how much reporting a one-off maintenance DAG needs.

Testing Instructions

Run some provider API DAGs if you have no data in your local database.
Set meta_data to NULL:
UPDATE image SET meta_data = null;
You can also set some of the license_version values to NULL:

WITH rows AS (SELECT identifier FROM image LIMIT 10)
UPDATE image SET license_version = null
WHERE identifier IN (SELECT identifier FROM rows);

Run the add_license_url DAG and check that it adds a JSON object with the license URL to the meta_data field. Also, if you set license_version to NULL, you should see that reported in the last step.
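
To spot-check the result afterwards, something along these lines can be run in a Python shell inside the Airflow container (the connection ID here is an assumption; substitute whatever your local setup uses):

from airflow.providers.postgres.hooks.postgres import PostgresHook

# Assumed connection ID for the upstream catalog database.
postgres = PostgresHook(postgres_conn_id="postgres_openledger_upstream")
rows = postgres.get_records(
    "SELECT identifier, meta_data FROM image WHERE meta_data IS NOT NULL LIMIT 10;"
)
for identifier, meta_data in rows:
    # Each meta_data should now contain a license_url key.
    print(identifier, meta_data.get("license_url"))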

Checklist

  • My pull request has a descriptive title (not a vague title like
    Update index.md).
  • My pull request targets the default branch of the repository (main) or
    a parent feature branch.
  • My commit messages follow best practices.
  • My code follows the established code style of the repository.
  • I added or updated tests for the changes I made (if applicable).
  • I added or updated documentation (if applicable).
  • I tried running the project locally and verified that there are no visible
    errors.
  • I ran the DAG documentation generator (if applicable).

Developer Certificate of Origin

Developer Certificate of Origin
Version 1.1

Copyright (C) 2004, 2006 The Linux Foundation and its contributors.
1 Letterman Drive
Suite D4700
San Francisco, CA, 94129

Everyone is permitted to copy and distribute verbatim copies of this
license document, but changing it is not allowed.


Developer's Certificate of Origin 1.1

By making a contribution to this project, I certify that:

(a) The contribution was created in whole or in part by me and I
    have the right to submit it under the open source license
    indicated in the file; or

(b) The contribution is based upon previous work that, to the best
    of my knowledge, is covered under an appropriate open source
    license and I have the right under that license to submit that
    work with modifications, whether created in whole or in part
    by me, under the same open source license (unless I am
    permitted to submit under a different license), as indicated
    in the file; or

(c) The contribution was provided directly to me by some other
    person who certified (a), (b) or (c) and I have not modified
    it.

(d) I understand and agree that this project and the contribution
    are public and that a record of the contribution (including all
    personal information I submit with it, including my sign-off) is
    maintained indefinitely and may be redistributed consistent with
    this project or the open source license(s) involved.

@obulat obulat requested a review from a team as a code owner February 17, 2023 07:05
@github-actions github-actions bot added the '🚦 status: awaiting triage' label Feb 17, 2023
@obulat obulat added the '🟨 priority: medium', '✨ goal: improvement', and '💻 aspect: code' labels and removed the '🚦 status: awaiting triage' label Feb 17, 2023
@obulat obulat changed the title from 'Add a DAG for backfilling license_url' to 'Add a DAG for backfilling license_url when meta_data is null' Feb 23, 2023
AetherUnbound (Contributor) left a comment:

This is super straightforward! I really like your approach of 1) select identifiers to update 2) determine update 3) issue update query. I wonder if it might be a good idea to create a more generic interface for that set of actions, since it's one I can imagine us doing many times in the future 😄 I don't think we need to do it as part of this PR, but it's something to consider as we make more of these down the line.

I have a few suggestions for ways we can simplify this!

postgres_conn_id: str,
) -> Literal["update_license_url", "final_report"]:
logger.info("Getting image records without license_url in meta_data.")
postgres = PostgresHook(postgres_conn_id=postgres_conn_id)
Contributor:

Just noting that if this gets in after #717 we'll want to add timeouts to this!

obulat (Author):

We can add it in the DAG call, right?

Contributor:

We don't need to make any changes to the DAG, the task variable will just need to be handed into the PostgresHook like some of the examples modified in that PR.

stacimc (Contributor):

For the PostgresHook directly we pass a default_statement_timeout instead of just the task, like:

postgres = PostgresHook(
    postgres_conn_id=postgres_conn_id,
    default_statement_timeout=PostgresHook.get_execution_timeout(task),
)

If not, it'll fall back to the 1-hr default timeout from DAG_DEFAULT_ARGS.

obulat (Author):

I wonder if this 1-hr default timeout is OK here, @stacimc?

stacimc (Contributor):

I suspect it would be, but I don't really have a good guess either! If for some reason it's not long enough, we only lose an hour + the time to open a new PR.

I actually have a PR open (which just needs one more review!) that would let us override specific task timeouts with an Airflow variable, so we can adjust them without having to make a code change. Would be useful for this sort of thing 😃

obulat (Author) commented Feb 28, 2023

This is super straightforward! I really like your approach of 1) select identifiers to update 2) determine update 3) issue update query. I wonder if it might be a good idea to create a more generic interface for that set of actions, since it's one I can imagine us doing many times in the future 😄 I don't think we need to do it as part of this PR, but it's something to consider as we make more of these down the line.

I love the idea of creating a generic interface for the actions to update the catalog. I think that in the future we will have several updates that will not need the first step, the selection of identifiers, in the catalog. They will probably be provided by the API (from the cleanup step, as in WordPress/openverse-api#1126) or the Polite crawler, as TSV or some other format. Is it easy to deploy such one-off DAGs in Airflow? Maybe we could even use something else (like Kafka?) in the future for updating the upstream database?

AetherUnbound (Contributor):
I love the idea of creating a generic interface for the actions to update the catalog. I think that in the future we will have several updates that will not need the first step, the selection of identifiers, in the catalog. They will probably be provided by the API (from the cleanup step, as in WordPress/openverse-api#1126) or the Polite crawler, as TSV or some other format. Is it easy to deploy such one-off DAGs in Airflow? Maybe we could even use something else (like Kafka?) in the future for updating the upstream database?

That's such a good point! It'd definitely be a case to consider as part of this. For something dynamic like you're describing, having to deploy a new DAG could be cumbersome. But it does also provide us a "paper trail" of sorts for migrations, so that has its own advantages. I could see a case where we use Variables or DAG run config to entirely dictate the set of operations, which would remove the need to add a new DAG each time. Something to consider for the RFC there!

AetherUnbound (Contributor) left a comment:

Ran this locally and it worked exactly as expected! I have one note but otherwise this is good to go IMO ⭐

stacimc (Contributor) left a comment:

This worked great for me! Thank you for the testing instructions, and the refresher on the context behind this. I really like the approach and think it'll be a really interesting template for a generic interface sometime down the road :)

No blocking feedback, this looks good to me!

)

get_statistics >> [update_license_url, final_report]
update_license_url >> final_report
Contributor:

I think this could be written more simply as get_statistics >> update_license_url >> final_report

obulat (Author):

If we simplify it, would it skip the update_license_url step if the get_statistics says that we don't need to update anything? The intention was to skip the update step if we ever run this on a fully cleaned dataset, but I'm not very good with branching operators :)

Contributor:

Ahh, that's right! Do you mind adding a comment to that effect, Olga? You're totally right, the current setup is required for the branching operator, but I could see making that change in the future and accidentally breaking something 😅

Contributor:

You're totally correct, I missed that there was a branching operator! 😅
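
For context, here is a minimal sketch of the branching pattern being discussed, assuming Airflow 2's BranchPythonOperator (the task names mirror the snippet above, but the bodies are stand-ins, not the PR's code):

from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import BranchPythonOperator

def choose_next_step() -> str:
    # Stand-in for the real count of rows with NULL meta_data.
    null_count = 42
    # The returned task_id runs; the other direct downstream task is skipped.
    return "update_license_url" if null_count else "final_report"

with DAG(dag_id="branching_sketch", start_date=datetime(2023, 1, 1), schedule=None):
    get_statistics = BranchPythonOperator(
        task_id="get_statistics", python_callable=choose_next_step
    )
    update_license_url = EmptyOperator(task_id="update_license_url")
    # final_report must still run when update_license_url is skipped.
    final_report = EmptyOperator(task_id="final_report", trigger_rule="none_failed")

    # Both branch targets must be direct downstreams of the branching task;
    # collapsing this into a single chain would break the skip behavior.
    get_statistics >> [update_license_url, final_report]
    update_license_url >> final_report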


AetherUnbound (Contributor):
@obulat this could be merged if you suspect the default timeout (1h) will be sufficient for each Postgres operation made by the hook, but otherwise we'll need to add the task timeout to the hook (or something more appropriate) per Staci's comment.

obulat (Author) commented Mar 7, 2023

@stacimc, thank you for your instructions for setting the timeout. We have 366,974 images that should be affected. I have no idea what timeout value we should use. 5 hours? What do you think?

@obulat obulat self-assigned this Mar 9, 2023
@obulat obulat added the '🧱 stack: catalog' label Mar 9, 2023
stacimc (Contributor) commented Mar 10, 2023

The new timeout looks good to me 👍 I think this can be merged and enabled in production. The DAG will be off by default so you'll need to enable it through the Airflow UI.

@obulat obulat merged commit d212dcf into main Mar 10, 2023
@obulat obulat deleted the add/license_url branch March 10, 2023 17:23
obulat (Author) commented Mar 13, 2023

I triggered this query at 05:53 UTC today (2023-03-13). Here are some notes on the run:

The first step reported a different number of records with NULL meta_data: only 28,824, compared with more than 300,000 records the last time we ran the query:

[2023-03-13, 06:19:11 UTC] {add_license_url.py:60} INFO - There are 28824 records with NULL in meta_data.

The second step takes around 30 minutes for each license pair (license name and license version), so it will probably time out. If it does, to improve performance next time, instead of running a separate query for every license pair, we could select identifier, license, and license_version from the rows where meta_data is NULL.
Then, in Python, we group the results into a dictionary keyed by the (license, license_version) pair, with the identifiers as values:

from textwrap import dedent

from psycopg2.extras import Json

license_dict = {}
select_query = (
    "SELECT identifier, license, license_version FROM image WHERE meta_data IS NULL;"
)
results = postgres.get_records(select_query)

# Group the identifiers by their (license, license_version) pair.
for identifier, license_, license_version in results:
    license_dict.setdefault((license_, license_version), []).append(identifier)

for (license_, license_version), identifiers in license_dict.items():
    license_url = get_license_info_from_license_pair(license_, license_version)
    # The column is jsonb; Json adapts the dict to the {"license_url": <URL>} shape.
    meta_data = Json({"license_url": license_url})
    identifier_list = ",".join(f"'{i}'" for i in identifiers)
    update_query = dedent(
        f"""
        UPDATE image
        SET meta_data = {meta_data}
        WHERE identifier IN ({identifier_list});
        """
    )
    updated_count = postgres.run(update_query, autocommit=True, handler=RETURN_ROW_COUNT)

obulat (Author) commented Mar 13, 2023

I just realized that each Postgres statement has a timeout of 5 hours: there is a task timeout of 5 hours and a statement timeout of the same length. Since each separate license-pair query runs about 30 minutes, the whole DAG should complete successfully, but in the future it's better not to make many separate queries when it's possible to group the items in Python.

Update: the DAG failed after 5 hours :( I am going to open a new PR with the suggestions from the previous comment.


Successfully merging this pull request may close these issues.

Ensure that all media have license_url in meta_data field