Skip to content

Commit

Permalink
Do batching with unit IDs [RHELDST-20725]
Browse files Browse the repository at this point in the history
We want to do garbage collection in batches since doing it all at once
can cause OOM exceptions. The original attempt used the limit value in
criteria submitted to the unassociation endpoint, which did not work.
This change instead creates batches using unit searching instead. The
size of the batches can be changed by setting the
PUBTOOLS_PULPLIB_PAGE_SIZE env-var.
  • Loading branch information
amcmahon-rh committed Nov 21, 2023
1 parent 1baf9f7 commit f8582c3
Showing 1 changed file with 41 additions and 13 deletions.
54 changes: 41 additions & 13 deletions pubtools/_pulp/tasks/garbage_collect.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from pubtools._pulp.task import PulpTask
from pubtools._pulp.services import PulpClientService

from more_executors.futures import f_flat_map, f_return

LOG = logging.getLogger("pubtools.pulp")
step = PulpTask.step
Expand Down Expand Up @@ -80,13 +81,6 @@ def clean_all_rpm_content(self):
# Clear out old all-rpm-content
LOG.info("Start old all-rpm-content deletion")
arc_threshold = self.args.arc_threshold
criteria = Criteria.and_(
Criteria.with_unit_type(RpmUnit),
Criteria.with_field(
"cdn_published",
Matcher.less_than(datetime.utcnow() - timedelta(days=arc_threshold)),
),
)
clean_repos = list(
self.pulp_client.search_repository(
Criteria.with_field("id", "all-rpm-content")
Expand All @@ -96,13 +90,47 @@ def clean_all_rpm_content(self):
LOG.info("No repos found for cleaning.")
return
arc_repo = clean_repos[0]
deleted_arc = list(arc_repo.remove_content(criteria=criteria))

search_criteria = Criteria.and_(
Criteria.with_unit_type(RpmUnit, unit_fields=["unit_id"]),
Criteria.with_field(
"cdn_published",
Matcher.less_than(
datetime.utcnow() - timedelta(days=arc_threshold)),
),
)
deleted_content = []
for task in deleted_arc:
if task.repo_id == "all-rpm-content":
for unit in task.units:
LOG.info("Old all-rpm-content deleted: %s", unit.name)
deleted_content.append(unit)

LOG.info("Collecting unit batches for deletion")
first_page = arc_repo.search_content(criteria=search_criteria)

def handle_pages(page):
batch_accumulator = [[unit.unit_id for unit in page.data]]
if page.next:
batch_accumulator.extend(
f_flat_map(page.next, handle_pages).result())
return f_return(batch_accumulator)

batches = f_flat_map(first_page, handle_pages).result()

for batch in batches:
if not batch:
continue
LOG.info("Submitting batch for deletion")
deletion_criteria = Criteria.and_(
Criteria.with_unit_type(RpmUnit),
Criteria.with_field(
"unit_id",
Matcher.in_(batch),
),
)
deletion_task = arc_repo.remove_content(criteria=deletion_criteria).result()
for task in deletion_task:
if task.repo_id == "all-rpm-content":
for unit in task.units:
LOG.info("Old all-rpm-content deleted: %s", unit.name)
deleted_content.append(unit)

if not deleted_content:
LOG.info("No all-rpm-content found older than %s", arc_threshold)

Expand Down

0 comments on commit f8582c3

Please sign in to comment.