Add a DAG for backfilling license_url when meta_data is null #1005
Conversation
This is super straightforward! I really like your approach of 1) select identifiers to update 2) determine update 3) issue update query. I wonder if it might be a good idea to create a more generic interface for that set of actions, since it's one I can imagine us doing many times in the future 😄 I don't think we need to do it as part of this PR, but it's something to consider as we make more of these down the line.
I have a few suggestions for ways we can simplify this!
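Purely as a sketch of what that generic interface could look like (all names here are hypothetical, not code from this PR):

```python
from typing import Callable

from airflow.providers.postgres.hooks.postgres import PostgresHook


def run_batched_update(
    postgres_conn_id: str,
    select_query: str,
    build_update_query: Callable[[tuple], str],
) -> int:
    """Select the rows to fix, derive an UPDATE for each, and run it."""
    postgres = PostgresHook(postgres_conn_id=postgres_conn_id)
    updated_count = 0
    for row in postgres.get_records(select_query):
        updated_count += postgres.run(
            build_update_query(row),
            autocommit=True,
            handler=lambda cursor: cursor.rowcount,
        )
    return updated_count
```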
```python
    postgres_conn_id: str,
) -> Literal["update_license_url", "final_report"]:
    logger.info("Getting image records without license_url in meta_data.")
    postgres = PostgresHook(postgres_conn_id=postgres_conn_id)
```
Just noting that if this gets in after #717 we'll want to add timeouts to this!
We can add it in the `DAG` call, right?
We don't need to make any changes to the DAG; the `task` variable will just need to be handed into the `PostgresHook`, like some of the examples modified in that PR.
For the `PostgresHook` directly, we pass a `default_statement_timeout` instead of just the `task`, like:

```python
postgres = PostgresHook(
    postgres_conn_id=postgres_conn_id,
    default_statement_timeout=PostgresHook.get_execution_timeout(task),
)
```

If not, it'll fall back to the 1-hr default timeout from `DAG_DEFAULT_ARGS`.
I wonder if this 1-hr default timeout is OK here, @stacimc?
I suspect it would be, but I don't really have a good guess either! If for some reason it's not long enough, we only lose an hour + the time to open a new PR.
I actually have a PR open (which just needs one more review!) that would let us override specific task timeouts with an Airflow variable, so we can adjust them without having to make a code change. Would be useful for this sort of thing 😃
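For context, the idea in that PR is roughly the following (a sketch under assumptions — the Variable name and its shape are hypothetical, not the PR's actual implementation):

```python
from airflow.models import Variable

# Hypothetical shape: a JSON Variable mapping "dag_id.task_id" to a timeout
# in seconds, checked before falling back to the task's execution_timeout.
overrides = Variable.get(
    "task_timeout_overrides", deserialize_json=True, default_var={}
)
timeout_seconds = overrides.get("add_license_url.update_license_url")
```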
I love the idea of creating a generic interface for the actions to update the catalog. I think that in the future we will have several updates that will not need the first step, the selection of identifiers, in the catalog. They will probably be provided by the API (from the cleanup step, as in WordPress/openverse-api#1126) or the Polite crawler, as TSV or some other format. Is it easy to deploy such one-off DAGs in Airflow? Maybe we could even use something else (like Kafka?) in the future for updating the upstream database?
That's such a good point! It'd definitely be a case to consider as part of this. For something dynamic like you're describing, having to deploy a new DAG could be cumbersome. But it does also provide us a "paper trail" of sorts for migrations, so that has its own advantages. I could see a case where we use Variables or DAG run config to entirely dictate the set of operations, which would remove the need to add a new DAG each time. Something to consider for the RFC there!
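As a rough illustration of the run-config idea (not proposed code — the conf shape is made up):

```python
from airflow.decorators import task


@task
def run_configured_updates(dag_run=None) -> int:
    """Run UPDATE statements supplied in the DAG run config.

    Triggering with conf like {"updates": ["UPDATE image SET ..."]} would
    drive a one-off migration without deploying a new DAG file.
    """
    updates = (dag_run.conf or {}).get("updates", [])
    for statement in updates:
        ...  # validate against an allow-list, then execute via PostgresHook
    return len(updates)
```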
Ran this locally and it worked exactly as expected! I have one note but otherwise this is good to go IMO ⭐
This worked great for me! Thank you for the testing instructions, and the refresher on the context behind this. I really like the approach and think it'll be a really interesting template for a generic interface sometime down the road :)
No blocking feedback, this looks good to me!
```python
)

get_statistics >> [update_license_url, final_report]
update_license_url >> final_report
```
I think this could be written more simply as `get_statistics >> update_license_url >> final_report`.
If we simplify it, would it skip the `update_license_url` step if `get_statistics` says that we don't need to update anything? The intention was to skip the update step if we ever run this on a fully cleaned dataset, but I'm not very good with branching operators :)
Ahh, that's right! Do you mind adding a comment to that effect, Olga? You're totally right, the current setup is required for the branching operator, but I could see making that change in the future and accidentally breaking something 😅
You're totally correct, I missed that there was a branching operator! 😅
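For future readers, here's a minimal sketch of the branching shape (the task names match this DAG, but the bodies and the trigger rule are assumptions, not the actual DAG code):

```python
from pendulum import datetime

from airflow.decorators import dag, task
from airflow.utils.trigger_rule import TriggerRule


@dag(start_date=datetime(2023, 1, 1), schedule=None)
def branching_sketch():
    @task.branch
    def get_statistics() -> str:
        count = 0  # stand-in for the real "rows needing license_url" query
        return "update_license_url" if count else "final_report"

    @task
    def update_license_url():
        ...

    # A trigger rule such as NONE_FAILED lets final_report run even when the
    # update branch is skipped (the real DAG may configure this differently).
    @task(trigger_rule=TriggerRule.NONE_FAILED)
    def final_report():
        ...

    update, report = update_license_url(), final_report()
    get_statistics() >> [update, report]
    update >> report


branching_sketch()
```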
@obulat this could be merged if you suspect the default timeout (1h) will be sufficient for each postgres operation made by the hook, but otherwise we'll need to add the task timeout to the hook (or something more appropriate) per Staci's comment.
@stacimc, thank you for the instructions for setting the timeout. We have 366,974 images that should be affected. I have no idea what timeout value we should use. 5 hours, maybe? What do you think?
The new timeout looks good to me 👍 I think this can be merged and enabled in production. The DAG will be off by default so you'll need to enable it through the Airflow UI.
I triggered this query at 05:53 UTC today (2023-03-13). Here are some notes on the run:

The first step reported a different number of records with missing `license_url` than the count in the related issue.
The second step takes around 30 minutes for each license pair (license name and license version), so it will probably time out. If it does, then to improve performance next time, instead of running a select query for every license pair, we could select all of the records in one query and group their `identifier`s by license pair in Python:

```python
from textwrap import dedent

from psycopg2.extras import Json

# postgres, RETURN_ROW_COUNT, and get_license_info_from_license_pair come
# from the DAG's existing context.
license_dict = {}
select_query = (
    "SELECT identifier, license, license_version FROM image "
    "WHERE meta_data IS NULL;"
)
for identifier, license_, version in postgres.get_records(select_query):
    license_dict.setdefault(f"{license_},{version}", []).append(identifier)

for license_pair, identifiers in license_dict.items():
    license_, license_version = license_pair.split(",")
    license_url = get_license_info_from_license_pair(license_, license_version)
    update_query = dedent(
        f"""
        UPDATE image
        SET meta_data = {Json({"license_url": license_url})}
        WHERE identifier IN ({",".join(f"'{i}'" for i in identifiers)});
        """
    )
    updated_count = postgres.run(
        update_query, autocommit=True, handler=RETURN_ROW_COUNT
    )
```
Fixes
Related to WordPress/openverse#1565 by @obulat
Description
This PR adds `license_url` to the `meta_data` jsonb field where the `meta_data` field is `NULL`. To make the query faster, we split it into several steps:

1. Get the count of records where `meta_data` is `NULL`.
2. Update the `meta_data`:
   a. Select all `identifier`s of the items where `meta_data` is `NULL`. Having the `identifier`s will enable us not to do a sequential scan through all of the database (i.e., not to test each row individually), and to only work with the 366,974 items identified in the related issue.
   b. For each `license` and `license_version` pair, we run the `UPDATE` query setting the `meta_data` to `{"license_url": <URL>}`. Note: I'm using `Json` here although the column is `jsonb`. I'm not sure if it's correct, but it works locally.
3. Get the count of records where `meta_data` is still `NULL`. Normally, the count should be 0, but due to the possible lack of `license_version` in some of the rows (see Possible problems below), it might not be.
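As a concrete illustration of step 2b (not the DAG's exact code — the URL and identifier below are placeholders), the per-pair update could look like:

```python
from psycopg2.extras import Json

# Placeholder inputs; in the DAG these come from step 2a and the license pair.
license_url = "https://creativecommons.org/licenses/by-sa/4.0/"
identifiers = ["d876532b-01da-4e14-aa73-237c0a160d1f"]

# Json(...) renders as a quoted JSON literal, so meta_data becomes
# {"license_url": <URL>} for every selected identifier.
update_query = f"""
UPDATE image
SET meta_data = {Json({"license_url": license_url})}
WHERE identifier IN ({",".join(f"'{i}'" for i in identifiers)});
"""
```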
Possible problems

There is an issue in the CC repository about missing `license_version` in some items. If there are still items without a `license_version` in the database, their count will be reported in the last step.

Limitations

This PR only solves part of the problem, for items where `meta_data` is `NULL`. I have confirmed that there are some items with a non-null `meta_data` that don't have a `license_url` field in `meta_data` (e.g., the item with identifier `d876532b-01da-4e14-aa73-237c0a160d1f`). For those, the possible solution is to add a `license_url` check in the ingestion server, adding it if necessary, and then propagate that data up to the catalog. This will be much faster than doing it on the catalog side because here, we would need to sequentially scan hundreds of millions of records.

I'm also not sure about the reporting step, and I'm open to suggestions about it. Then again, I'm not sure how much we need to report for a one-off maintenance DAG.
Testing Instructions

Run some provider API DAGs if you have no data in your local database.

Set `meta_data` to `NULL`:

```sql
UPDATE image SET meta_data = null
```

You can also set some of the `license_version` values to `NULL` (see the sketch after these instructions).

Run the `add_license_url` DAG and check that it adds a json object to the `meta_data` field with the license URL in it. Also, if you set the `license_version` to `NULL`, you should see that reported in the last step.
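The statement for nulling `license_version` could be something like this (a hypothetical helper — the connection id and the sampling condition are assumptions):

```python
from airflow.providers.postgres.hooks.postgres import PostgresHook

# Connection id is an assumption; use whatever your local catalog uses.
postgres = PostgresHook(postgres_conn_id="postgres_openledger_testing")
postgres.run(
    """
    UPDATE image SET license_version = null
    WHERE identifier IN (SELECT identifier FROM image LIMIT 10);
    """
)
```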
Checklist

- [ ] My pull request has a descriptive title (not a vague title like `Update index.md`).
- [ ] My pull request targets the default branch of the repository (`main`) or a parent feature branch.
- [ ] I ran the project locally and verified that there are no visible errors.