From fc384170bbace222a35f6df979abb807ad348ec9 Mon Sep 17 00:00:00 2001 From: Aidan McMahon-Smith Date: Tue, 14 Nov 2023 16:46:10 +0100 Subject: [PATCH] Do batching with unit IDs [RHELDST-20725] We want to do garbage collection in batches since doing it all at once can cause OOM exceptions. The original attempt used the limit value in criteria submitted to the unassociation endpoint, which did not work. This change instead creates batches using unit searching instead. The size of the batches can be changed by setting the PULP_GC_UNASSOCIATE_BATCH_LIMIT env-var. --- pubtools/_pulp/tasks/garbage_collect.py | 52 ++++++++++++------ tests/garbage_collect/test_garbage_collect.py | 53 +++++++++++++++++++ 2 files changed, 90 insertions(+), 15 deletions(-) diff --git a/pubtools/_pulp/tasks/garbage_collect.py b/pubtools/_pulp/tasks/garbage_collect.py index 0d66d674..84cf4129 100644 --- a/pubtools/_pulp/tasks/garbage_collect.py +++ b/pubtools/_pulp/tasks/garbage_collect.py @@ -1,5 +1,6 @@ import logging from datetime import datetime, timedelta +import os from pubtools.pulplib import Criteria, Matcher, RpmUnit @@ -10,6 +11,9 @@ LOG = logging.getLogger("pubtools.pulp") step = PulpTask.step +UNASSOCIATE_BATCH_LIMIT = int( + os.getenv("PULP_GC_UNASSOCIATE_BATCH_LIMIT", "10000")) + class GarbageCollect(PulpClientService, PulpTask): """Perform garbage collection on Pulp data. @@ -80,13 +84,6 @@ def clean_all_rpm_content(self): # Clear out old all-rpm-content LOG.info("Start old all-rpm-content deletion") arc_threshold = self.args.arc_threshold - criteria = Criteria.and_( - Criteria.with_unit_type(RpmUnit), - Criteria.with_field( - "cdn_published", - Matcher.less_than(datetime.utcnow() - timedelta(days=arc_threshold)), - ), - ) clean_repos = list( self.pulp_client.search_repository( Criteria.with_field("id", "all-rpm-content") @@ -96,15 +93,40 @@ def clean_all_rpm_content(self): LOG.info("No repos found for cleaning.") return arc_repo = clean_repos[0] - deleted_arc = list(arc_repo.remove_content(criteria=criteria)) - deleted_content = [] - for task in deleted_arc: - if task.repo_id == "all-rpm-content": - for unit in task.units: - LOG.info("Old all-rpm-content deleted: %s", unit.name) - deleted_content.append(unit) - if not deleted_content: + + search_criteria = Criteria.and_( + Criteria.with_unit_type(RpmUnit, unit_fields=["unit_id"]), + Criteria.with_field( + "cdn_published", + Matcher.less_than( + datetime.utcnow() - timedelta(days=arc_threshold)), + ), + ) + + LOG.info("Collecting unit batches for deletion") + units = list(arc_repo.search_content(criteria=search_criteria)) + if not units: LOG.info("No all-rpm-content found older than %s", arc_threshold) + return + + unit_batches = [units[i:i + UNASSOCIATE_BATCH_LIMIT] for i in + range(0, len(units), UNASSOCIATE_BATCH_LIMIT)] + for batch in unit_batches: + LOG.info("Submitting batch for deletion") + deletion_criteria = Criteria.and_( + Criteria.with_unit_type(RpmUnit), + Criteria.with_field( + "unit_id", + Matcher.in_([unit.unit_id for unit in batch]), + ), + ) + LOG.debug("Submitting batch for deletion") + deletion_task = arc_repo.remove_content( + criteria=deletion_criteria).result() + for task in deletion_task: + if task.repo_id == "all-rpm-content": + for unit in task.units: + LOG.info("Old all-rpm-content deleted: %s", unit.name) def entry_point(): diff --git a/tests/garbage_collect/test_garbage_collect.py b/tests/garbage_collect/test_garbage_collect.py index 39eda4e1..1095df2a 100644 --- a/tests/garbage_collect/test_garbage_collect.py +++ b/tests/garbage_collect/test_garbage_collect.py @@ -1,3 +1,5 @@ +import os + import pytest import datetime from mock import Mock, patch @@ -227,11 +229,62 @@ def test_arc_garbage_collect(mock_logger): with patch("sys.argv", arg): with _patch_pulp_client(controller.client): gc.main() + updated_rpm = list(client.get_repository("all-rpm-content").search_content()) assert len(updated_rpm) == 1 mock_logger.info.assert_any_call("Old all-rpm-content deleted: %s", rpm2.name) + +def test_arc_garbage_collect_in_batches(mock_logger): + """deletes relevant all-rpm-content content in batches""" + os.environ["PULP_GC_UNASSOCIATE_BATCH_LIMIT"] = "5" + repo = Repository( + id="all-rpm-content", + created=_get_created(7), + ) + controller = _get_fake_controller(repo) + client = controller.client + assert list(client.search_content()) == [] + + all_rpm_content = client.get_repository("all-rpm-content").result() + new_rpms = [RpmUnit( + cdn_published=datetime.datetime.utcnow(), + arch="src", + filename="test-arc-new%02d-1.0-1.src.rpm" % i, + name="test-arc-new%02d" % i, + version="1.0", + release="1", + content_type_id="rpm", + unit_id="gc_arc_new%02d" % i, + ) for i in range(0, 10)] + old_rpms = [RpmUnit( + cdn_published=datetime.datetime.utcnow() - datetime.timedelta( + days=190), + arch="src", + filename="test-arc-old%02d-1.0-1.src.rpm" % i, + name="test-arc-old%02d" % i, + version="1.0", + release="1", + content_type_id="rpm", + unit_id="gc_arc_old%02d" % i, + ) for i in range(0, 23)] + + controller.insert_units(all_rpm_content, new_rpms + old_rpms) + updated_rpm = list(client.get_repository("all-rpm-content").search_content()) + assert len(updated_rpm) == 33 + gc = GarbageCollect() + arg = ["", "--pulp-url", "http://some.url"] + + with patch("sys.argv", arg): + with _patch_pulp_client(controller.client): + gc.main() + updated_rpm = list(client.get_repository("all-rpm-content").search_content()) + assert len(updated_rpm) == 10 + assert len([call for call in mock_logger.debug.call_args_list + if 'Submitting batch for deletion' in call.args]) == 5 + + def test_arc_garbage_collect_0items(mock_logger): """no content deleted from all-rpm-content""" repo = Repository(