Skip to content

Commit

Permalink
Do batching with unit IDs [RHELDST-20725]
Browse files Browse the repository at this point in the history
We want to do garbage collection in batches since doing it all at once
can cause OOM exceptions. The original attempt used the limit value in
criteria submitted to the unassociation endpoint, which did not work.
This change creates the batches by searching for units instead. The
size of the batches can be changed by setting the
PULP_GC_UNASSOCIATE_BATCH_LIMIT env-var.
  • Loading branch information
amcmahon-rh committed Nov 22, 2023
1 parent 1baf9f7 commit e3fd37a
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 15 deletions.
52 changes: 37 additions & 15 deletions pubtools/_pulp/tasks/garbage_collect.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging
from datetime import datetime, timedelta
import os

from pubtools.pulplib import Criteria, Matcher, RpmUnit

Expand All @@ -10,6 +11,9 @@
# Logger registered under the "pubtools.pulp" name.
LOG = logging.getLogger("pubtools.pulp")
# Module-level alias for PulpTask.step.
step = PulpTask.step

# Maximum number of units placed in a single deletion batch when cleaning
# all-rpm-content. Read once, when this module is imported, from the
# PULP_GC_UNASSOCIATE_BATCH_LIMIT environment variable; defaults to 10000.
UNASSOCIATE_BATCH_LIMIT = int(
os.getenv("PULP_GC_UNASSOCIATE_BATCH_LIMIT", "10000"))


class GarbageCollect(PulpClientService, PulpTask):
"""Perform garbage collection on Pulp data.
Expand Down Expand Up @@ -80,13 +84,6 @@ def clean_all_rpm_content(self):
# Clear out old all-rpm-content
LOG.info("Start old all-rpm-content deletion")
arc_threshold = self.args.arc_threshold
criteria = Criteria.and_(
Criteria.with_unit_type(RpmUnit),
Criteria.with_field(
"cdn_published",
Matcher.less_than(datetime.utcnow() - timedelta(days=arc_threshold)),
),
)
clean_repos = list(
self.pulp_client.search_repository(
Criteria.with_field("id", "all-rpm-content")
Expand All @@ -96,15 +93,40 @@ def clean_all_rpm_content(self):
LOG.info("No repos found for cleaning.")
return
arc_repo = clean_repos[0]
deleted_arc = list(arc_repo.remove_content(criteria=criteria))
deleted_content = []
for task in deleted_arc:
if task.repo_id == "all-rpm-content":
for unit in task.units:
LOG.info("Old all-rpm-content deleted: %s", unit.name)
deleted_content.append(unit)
if not deleted_content:

search_criteria = Criteria.and_(
Criteria.with_unit_type(RpmUnit, unit_fields=["unit_id"]),
Criteria.with_field(
"cdn_published",
Matcher.less_than(
datetime.utcnow() - timedelta(days=arc_threshold)),
),
)

LOG.info("Collecting unit batches for deletion")
units = list(arc_repo.search_content(criteria=search_criteria))
if not units:
LOG.info("No all-rpm-content found older than %s", arc_threshold)
return

unit_batches = [units[i:i + UNASSOCIATE_BATCH_LIMIT] for i in
range(0, len(units), UNASSOCIATE_BATCH_LIMIT)]
for batch in unit_batches:
LOG.info("Submitting batch for deletion")
deletion_criteria = Criteria.and_(
Criteria.with_unit_type(RpmUnit),
Criteria.with_field(
"unit_id",
Matcher.in_([unit.unit_id for unit in batch]),
),
)
LOG.debug("Submitting batch for deletion")
deletion_task = arc_repo.remove_content(
criteria=deletion_criteria).result()
for task in deletion_task:
if task.repo_id == "all-rpm-content":
for unit in task.units:
LOG.info("Old all-rpm-content deleted: %s", unit.name)


def entry_point():
Expand Down
53 changes: 53 additions & 0 deletions tests/garbage_collect/test_garbage_collect.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import os

import pytest
import datetime
from mock import Mock, patch
Expand Down Expand Up @@ -227,11 +229,62 @@ def test_arc_garbage_collect(mock_logger):
with patch("sys.argv", arg):
with _patch_pulp_client(controller.client):
gc.main()

updated_rpm = list(client.get_repository("all-rpm-content").search_content())
assert len(updated_rpm) == 1
mock_logger.info.assert_any_call("Old all-rpm-content deleted: %s", rpm2.name)



def test_arc_garbage_collect_in_batches(mock_logger):
    """Delete old all-rpm-content units in batches.

    Inserts 10 freshly published RPMs and 23 RPMs published 190 days ago
    into the "all-rpm-content" repository, runs garbage collection, and
    checks that only the 10 fresh units remain and that the
    "Submitting batch for deletion" debug message was logged 5 times.
    """
    # patch.dict restores os.environ when the block exits, so the page-size
    # override cannot leak into tests that run after this one.
    with patch.dict(os.environ, {"PUBTOOLS_PULPLIB_PAGE_SIZE": "5"}):
        repo = Repository(
            id="all-rpm-content",
            created=_get_created(7),
        )
        controller = _get_fake_controller(repo)
        client = controller.client
        assert list(client.search_content()) == []

        all_rpm_content = client.get_repository("all-rpm-content").result()

        # Units published just now.
        new_rpms = [
            RpmUnit(
                cdn_published=datetime.datetime.utcnow(),
                arch="src",
                filename="test-arc-new%02d-1.0-1.src.rpm" % i,
                name="test-arc-new%02d" % i,
                version="1.0",
                release="1",
                content_type_id="rpm",
                unit_id="gc_arc_new%02d" % i,
            )
            for i in range(10)
        ]
        # Units published 190 days ago; the test expects all 23 of these
        # to be removed by garbage collection.
        old_rpms = [
            RpmUnit(
                cdn_published=(
                    datetime.datetime.utcnow() - datetime.timedelta(days=190)
                ),
                arch="src",
                filename="test-arc-old%02d-1.0-1.src.rpm" % i,
                name="test-arc-old%02d" % i,
                version="1.0",
                release="1",
                content_type_id="rpm",
                unit_id="gc_arc_old%02d" % i,
            )
            for i in range(23)
        ]

        controller.insert_units(all_rpm_content, new_rpms + old_rpms)
        updated_rpm = list(
            client.get_repository("all-rpm-content").search_content()
        )
        assert len(updated_rpm) == 33

        gc = GarbageCollect()
        arg = ["", "--pulp-url", "http://some.url"]

        with patch("sys.argv", arg):
            with _patch_pulp_client(controller.client):
                gc.main()

        updated_rpm = list(
            client.get_repository("all-rpm-content").search_content()
        )
        assert len(updated_rpm) == 10

        # NOTE(review): 23 old units in batches of 5 would give 5 batches,
        # but this test sets PUBTOOLS_PULPLIB_PAGE_SIZE, not
        # PULP_GC_UNASSOCIATE_BATCH_LIMIT - confirm which setting is
        # expected to drive the batch count asserted here.
        batch_calls = [
            call
            for call in mock_logger.debug.call_args_list
            if 'Submitting batch for deletion' in call.args
        ]
        assert len(batch_calls) == 5


def test_arc_garbage_collect_0items(mock_logger):
"""no content deleted from all-rpm-content"""
repo = Repository(
Expand Down

0 comments on commit e3fd37a

Please sign in to comment.