From 17fa1fcf6e232a7938258d1d4a452f6fa853095b Mon Sep 17 00:00:00 2001 From: Aidan McMahon-Smith Date: Tue, 14 Nov 2023 16:46:10 +0100 Subject: [PATCH] Do batching with unit IDs [RHELDST-20725] We want to do garbage collection in batches since doing it all at once can cause OOM exceptions. The original attempt used the limit value in criteria submitted to the unassociation endpoint, which did not work. This change instead builds the batches by first searching for the matching units and then unassociating them by unit ID, one batch at a time. The size of the batches can be changed by setting the PULP_GC_UNASSOCIATE_BATCH_LIMIT environment variable (default: 10000). --- pubtools/_pulp/tasks/garbage_collect.py | 52 ++++++++++++------ tests/garbage_collect/test_garbage_collect.py | 54 +++++++++++++++++++ 2 files changed, 91 insertions(+), 15 deletions(-) diff --git a/pubtools/_pulp/tasks/garbage_collect.py b/pubtools/_pulp/tasks/garbage_collect.py index 0d66d674..84cf4129 100644 --- a/pubtools/_pulp/tasks/garbage_collect.py +++ b/pubtools/_pulp/tasks/garbage_collect.py @@ -1,5 +1,6 @@ import logging from datetime import datetime, timedelta +import os from pubtools.pulplib import Criteria, Matcher, RpmUnit @@ -10,6 +11,9 @@ LOG = logging.getLogger("pubtools.pulp") step = PulpTask.step +UNASSOCIATE_BATCH_LIMIT = int( + os.getenv("PULP_GC_UNASSOCIATE_BATCH_LIMIT", "10000")) + class GarbageCollect(PulpClientService, PulpTask): """Perform garbage collection on Pulp data. 
@@ -80,13 +84,6 @@ def clean_all_rpm_content(self): # Clear out old all-rpm-content LOG.info("Start old all-rpm-content deletion") arc_threshold = self.args.arc_threshold - criteria = Criteria.and_( - Criteria.with_unit_type(RpmUnit), - Criteria.with_field( - "cdn_published", - Matcher.less_than(datetime.utcnow() - timedelta(days=arc_threshold)), - ), - ) clean_repos = list( self.pulp_client.search_repository( Criteria.with_field("id", "all-rpm-content") @@ -96,15 +93,40 @@ def clean_all_rpm_content(self): LOG.info("No repos found for cleaning.") return arc_repo = clean_repos[0] - deleted_arc = list(arc_repo.remove_content(criteria=criteria)) - deleted_content = [] - for task in deleted_arc: - if task.repo_id == "all-rpm-content": - for unit in task.units: - LOG.info("Old all-rpm-content deleted: %s", unit.name) - deleted_content.append(unit) - if not deleted_content: + + search_criteria = Criteria.and_( + Criteria.with_unit_type(RpmUnit, unit_fields=["unit_id"]), + Criteria.with_field( + "cdn_published", + Matcher.less_than( + datetime.utcnow() - timedelta(days=arc_threshold)), + ), + ) + + LOG.info("Collecting unit batches for deletion") + units = list(arc_repo.search_content(criteria=search_criteria)) + if not units: LOG.info("No all-rpm-content found older than %s", arc_threshold) + return + + unit_batches = [units[i:i + UNASSOCIATE_BATCH_LIMIT] for i in + range(0, len(units), UNASSOCIATE_BATCH_LIMIT)] + for batch in unit_batches: + LOG.info("Submitting batch for deletion") + deletion_criteria = Criteria.and_( + Criteria.with_unit_type(RpmUnit), + Criteria.with_field( + "unit_id", + Matcher.in_([unit.unit_id for unit in batch]), + ), + ) + LOG.debug("Submitting batch for deletion") + deletion_task = arc_repo.remove_content( + criteria=deletion_criteria).result() + for task in deletion_task: + if task.repo_id == "all-rpm-content": + for unit in task.units: + LOG.info("Old all-rpm-content deleted: %s", unit.name) def entry_point(): diff --git 
a/tests/garbage_collect/test_garbage_collect.py b/tests/garbage_collect/test_garbage_collect.py index 39eda4e1..b7a041d2 100644 --- a/tests/garbage_collect/test_garbage_collect.py +++ b/tests/garbage_collect/test_garbage_collect.py @@ -1,3 +1,5 @@ +import os + import pytest import datetime from mock import Mock, patch @@ -14,6 +16,7 @@ ) from pubtools._pulp.tasks.garbage_collect import GarbageCollect, entry_point +import pubtools._pulp.tasks.garbage_collect as gc_module @pytest.fixture @@ -227,11 +230,62 @@ def test_arc_garbage_collect(mock_logger): with patch("sys.argv", arg): with _patch_pulp_client(controller.client): gc.main() + updated_rpm = list(client.get_repository("all-rpm-content").search_content()) assert len(updated_rpm) == 1 mock_logger.info.assert_any_call("Old all-rpm-content deleted: %s", rpm2.name) + +def test_arc_garbage_collect_in_batches(mock_logger, monkeypatch): + """deletes relevant all-rpm-content content in batches""" + monkeypatch.setattr(gc_module, 'UNASSOCIATE_BATCH_LIMIT', 5) + repo = Repository( + id="all-rpm-content", + created=_get_created(7), + ) + controller = _get_fake_controller(repo) + client = controller.client + assert list(client.search_content()) == [] + + all_rpm_content = client.get_repository("all-rpm-content").result() + new_rpms = [RpmUnit( + cdn_published=datetime.datetime.utcnow(), + arch="src", + filename="test-arc-new%02d-1.0-1.src.rpm" % i, + name="test-arc-new%02d" % i, + version="1.0", + release="1", + content_type_id="rpm", + unit_id="gc_arc_new%02d" % i, + ) for i in range(0, 10)] + old_rpms = [RpmUnit( + cdn_published=datetime.datetime.utcnow() - datetime.timedelta( + days=190), + arch="src", + filename="test-arc-old%02d-1.0-1.src.rpm" % i, + name="test-arc-old%02d" % i, + version="1.0", + release="1", + content_type_id="rpm", + unit_id="gc_arc_old%02d" % i, + ) for i in range(0, 23)] + + controller.insert_units(all_rpm_content, new_rpms + old_rpms) + updated_rpm = 
list(client.get_repository("all-rpm-content").search_content()) + assert len(updated_rpm) == 33 + gc = GarbageCollect() + arg = ["", "--pulp-url", "http://some.url"] + + with patch("sys.argv", arg): + with _patch_pulp_client(controller.client): + gc.main() + updated_rpm = list(client.get_repository("all-rpm-content").search_content()) + assert len(updated_rpm) == 10 + assert len([call for call in mock_logger.debug.call_args_list + if 'Submitting batch for deletion' in call.args]) == 5 + + def test_arc_garbage_collect_0items(mock_logger): """no content deleted from all-rpm-content""" repo = Repository(