diff --git a/pubtools/_pulp/tasks/garbage_collect.py b/pubtools/_pulp/tasks/garbage_collect.py index 0d66d674..84cf4129 100644 --- a/pubtools/_pulp/tasks/garbage_collect.py +++ b/pubtools/_pulp/tasks/garbage_collect.py @@ -1,5 +1,6 @@ import logging from datetime import datetime, timedelta +import os from pubtools.pulplib import Criteria, Matcher, RpmUnit @@ -10,6 +11,9 @@ LOG = logging.getLogger("pubtools.pulp") step = PulpTask.step +UNASSOCIATE_BATCH_LIMIT = int( + os.getenv("PULP_GC_UNASSOCIATE_BATCH_LIMIT", "10000")) + class GarbageCollect(PulpClientService, PulpTask): """Perform garbage collection on Pulp data. @@ -80,13 +84,6 @@ def clean_all_rpm_content(self): # Clear out old all-rpm-content LOG.info("Start old all-rpm-content deletion") arc_threshold = self.args.arc_threshold - criteria = Criteria.and_( - Criteria.with_unit_type(RpmUnit), - Criteria.with_field( - "cdn_published", - Matcher.less_than(datetime.utcnow() - timedelta(days=arc_threshold)), - ), - ) clean_repos = list( self.pulp_client.search_repository( Criteria.with_field("id", "all-rpm-content") @@ -96,15 +93,40 @@ def clean_all_rpm_content(self): LOG.info("No repos found for cleaning.") return arc_repo = clean_repos[0] - deleted_arc = list(arc_repo.remove_content(criteria=criteria)) - deleted_content = [] - for task in deleted_arc: - if task.repo_id == "all-rpm-content": - for unit in task.units: - LOG.info("Old all-rpm-content deleted: %s", unit.name) - deleted_content.append(unit) - if not deleted_content: + + search_criteria = Criteria.and_( + Criteria.with_unit_type(RpmUnit, unit_fields=["unit_id"]), + Criteria.with_field( + "cdn_published", + Matcher.less_than( + datetime.utcnow() - timedelta(days=arc_threshold)), + ), + ) + + LOG.info("Collecting unit batches for deletion") + units = list(arc_repo.search_content(criteria=search_criteria)) + if not units: LOG.info("No all-rpm-content found older than %s", arc_threshold) + return + + unit_batches = [units[i:i + UNASSOCIATE_BATCH_LIMIT] for i in + range(0, len(units), UNASSOCIATE_BATCH_LIMIT)] + for batch in unit_batches: + LOG.info("Submitting batch for deletion") + deletion_criteria = Criteria.and_( + Criteria.with_unit_type(RpmUnit), + Criteria.with_field( + "unit_id", + Matcher.in_([unit.unit_id for unit in batch]), + ), + ) + LOG.debug("Submitting batch for deletion") + deletion_task = arc_repo.remove_content( + criteria=deletion_criteria).result() + for task in deletion_task: + if task.repo_id == "all-rpm-content": + for unit in task.units: + LOG.info("Old all-rpm-content deleted: %s", unit.name) def entry_point(): diff --git a/tests/garbage_collect/test_garbage_collect.py b/tests/garbage_collect/test_garbage_collect.py index 39eda4e1..9f310db4 100644 --- a/tests/garbage_collect/test_garbage_collect.py +++ b/tests/garbage_collect/test_garbage_collect.py @@ -1,3 +1,5 @@ +import os + import pytest import datetime from mock import Mock, patch @@ -14,6 +16,7 @@ ) from pubtools._pulp.tasks.garbage_collect import GarbageCollect, entry_point +import pubtools._pulp.tasks.garbage_collect as gc_module @pytest.fixture @@ -227,11 +230,62 @@ def test_arc_garbage_collect(mock_logger): with patch("sys.argv", arg): with _patch_pulp_client(controller.client): gc.main() + updated_rpm = list(client.get_repository("all-rpm-content").search_content()) assert len(updated_rpm) == 1 mock_logger.info.assert_any_call("Old all-rpm-content deleted: %s", rpm2.name) + +def test_arc_garbage_collect_in_batches(mock_logger): + """deletes relevant all-rpm-content content in batches""" + gc_module.UNASSOCIATE_BATCH_LIMIT = 5 + repo = Repository( + id="all-rpm-content", + created=_get_created(7), + ) + controller = _get_fake_controller(repo) + client = controller.client + assert list(client.search_content()) == [] + + all_rpm_content = client.get_repository("all-rpm-content").result() + new_rpms = [RpmUnit( + cdn_published=datetime.datetime.utcnow(), + arch="src", + filename="test-arc-new%02d-1.0-1.src.rpm" % i, + name="test-arc-new%02d" % i, + version="1.0", + release="1", + content_type_id="rpm", + unit_id="gc_arc_new%02d" % i, + ) for i in range(0, 10)] + old_rpms = [RpmUnit( + cdn_published=datetime.datetime.utcnow() - datetime.timedelta( + days=190), + arch="src", + filename="test-arc-old%02d-1.0-1.src.rpm" % i, + name="test-arc-old%02d" % i, + version="1.0", + release="1", + content_type_id="rpm", + unit_id="gc_arc_old%02d" % i, + ) for i in range(0, 23)] + + controller.insert_units(all_rpm_content, new_rpms + old_rpms) + updated_rpm = list(client.get_repository("all-rpm-content").search_content()) + assert len(updated_rpm) == 33 + gc = GarbageCollect() + arg = ["", "--pulp-url", "http://some.url"] + + with patch("sys.argv", arg): + with _patch_pulp_client(controller.client): + gc.main() + updated_rpm = list(client.get_repository("all-rpm-content").search_content()) + assert len(updated_rpm) == 10 + assert len([call for call in mock_logger.debug.call_args_list + if 'Submitting batch for deletion' in call.args]) == 5 + + def test_arc_garbage_collect_0items(mock_logger): """no content deleted from all-rpm-content""" repo = Repository(