From 402fd406e3d2060522f2e891832401e29fa2ebb0 Mon Sep 17 00:00:00 2001
From: Taylor McKinnon
Date: Thu, 6 Jun 2024 12:06:46 -0700
Subject: [PATCH 1/6] impr(UTAPI-103): Add small LRU cache to BucketDClient._get_bucket_attributes

---
 lib/reindex/s3_bucketd.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/lib/reindex/s3_bucketd.py b/lib/reindex/s3_bucketd.py
index b76ed4eb..0fc0002d 100644
--- a/lib/reindex/s3_bucketd.py
+++ b/lib/reindex/s3_bucketd.py
@@ -1,5 +1,6 @@
 import argparse
 import concurrent.futures as futures
+import functools
 import itertools
 import json
 import logging
@@ -141,6 +142,7 @@ def _list_bucket(self, bucket, **kwargs):
             else:
                 is_truncated = len(payload) > 0
 
+    @functools.lru_cache(maxsize=16)
     def _get_bucket_attributes(self, name):
         url = self.__url_attribute_format.format(addr=self._bucketd_addr, bucket=name)
         try:
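Note on PATCH 1/6: functools.lru_cache on an instance method keys the cache on (self, name), so entries are per-client instance and the wrapper keeps each client alive until its entries are evicted. A minimal standalone sketch of the behaviour (FakeClient and its return value are illustrative, not part of the patch):

    import functools

    class FakeClient:
        @functools.lru_cache(maxsize=16)  # cache key is (self, name)
        def get_attributes(self, name):
            print('bucketd request for %s' % name)  # printed only on a cache miss
            return {'owner': 'canonical-id', 'objectLockEnabled': False}

    client = FakeClient()
    client.get_attributes('mybucket')   # miss: the simulated request runs
    client.get_attributes('mybucket')   # hit: served from the cache, no request
    print(FakeClient.get_attributes.cache_info())
    # CacheInfo(hits=1, misses=1, maxsize=16, currsize=1)

One caveat worth knowing: every cache hit returns the same dict object, so callers should treat the cached attributes as read-only.
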
From 34af848b939770cf5c208ba8eaf5bf0c518ce4b2 Mon Sep 17 00:00:00 2001
From: Taylor McKinnon
Date: Thu, 6 Jun 2024 12:08:40 -0700
Subject: [PATCH 2/6] impr(UTAPI-103): Add BucketNotFound exception for _get_bucket_attributes

---
 lib/reindex/s3_bucketd.py | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/lib/reindex/s3_bucketd.py b/lib/reindex/s3_bucketd.py
index 0fc0002d..5df0af6b 100644
--- a/lib/reindex/s3_bucketd.py
+++ b/lib/reindex/s3_bucketd.py
@@ -63,6 +63,10 @@ class InvalidListing(Exception):
     def __init__(self, bucket):
         super().__init__('Invalid contents found while listing bucket %s'%bucket)
 
+class BucketNotFound(Exception):
+    def __init__(self, bucket):
+        super().__init__('Bucket %s not found'%bucket)
+
 class BucketDClient:
 
     '''Performs Listing calls against bucketd'''
@@ -151,7 +155,7 @@ def _get_bucket_attributes(self, name):
                 return resp.json()
             else:
                 _log.error('Error getting bucket attributes bucket:%s status_code:%s'%(name, resp.status_code))
-                raise InvalidListing(name)
+                raise BucketNotFound(name)
         except ValueError as e:
             _log.exception(e)
             _log.error('Invalid attributes response body! bucket:%s'%name)
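Note on PATCH 2/6: raising the dedicated BucketNotFound instead of the generic InvalidListing lets callers treat a bucket that disappeared between listing and attribute lookup as skippable rather than fatal (PATCH 5/6 relies on exactly this). A hedged sketch of the calling pattern; the helper name is invented and BucketNotFound is assumed to be in scope:

    def get_attributes_or_none(client, name):
        '''Return bucket attributes, or None if the bucket no longer exists.'''
        try:
            return client._get_bucket_attributes(name)
        except BucketNotFound:
            # Deleted between listing and lookup: skip it instead of aborting.
            return None
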
From 210ba2fd824b468f5463bbb66083baa58f2b0477 Mon Sep 17 00:00:00 2001
From: Taylor McKinnon
Date: Thu, 6 Jun 2024 12:10:40 -0700
Subject: [PATCH 3/6] impr(UTAPI-103): Add BucketDClient.get_bucket_md()

---
 lib/reindex/s3_bucketd.py | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/lib/reindex/s3_bucketd.py b/lib/reindex/s3_bucketd.py
index 5df0af6b..ff122587 100644
--- a/lib/reindex/s3_bucketd.py
+++ b/lib/reindex/s3_bucketd.py
@@ -168,6 +168,14 @@ def _get_bucket_attributes(self, name):
             _log.error('Unhandled exception getting bucket attributes bucket:%s'%name)
             raise
 
+    def get_bucket_md(self, name):
+        md = self._get_bucket_attributes(name)
+        canonId = md.get('owner')
+        if canonId is None:
+            _log.error('No owner found for bucket %s'%name)
+            raise InvalidListing(name)
+        return Bucket(canonId, name, md.get('objectLockEnabled', False))
+
     def list_buckets(self, name = None):
 
         def get_next_marker(p):
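Note on PATCH 3/6: get_bucket_md() converts a raw attributes document into the same Bucket namedtuple the listing path yields, which is what later lets explicitly named buckets bypass the full users-bucket scan. A rough illustration of the transformation; the namedtuple field names and the response shape are assumptions for illustration, not taken from the patch:

    from collections import namedtuple

    # Assumed to mirror the module's Bucket namedtuple: owner id, name, lock flag.
    Bucket = namedtuple('Bucket', ['userid', 'name', 'object_lock_enabled'])

    md = {'owner': 'a1b2c3d4e5f6', 'objectLockEnabled': True}  # illustrative response
    bucket = Bucket(md.get('owner'), 'mybucket', md.get('objectLockEnabled', False))
    print(bucket)
    # Bucket(userid='a1b2c3d4e5f6', name='mybucket', object_lock_enabled=True)
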
From f87a65065ad4e7ad325a556ff2042c39e9794ad8 Mon Sep 17 00:00:00 2001
From: Taylor McKinnon
Date: Thu, 6 Jun 2024 12:13:25 -0700
Subject: [PATCH 4/6] impr(UTAPI-103): Add --dry-run option

---
 lib/reindex/s3_bucketd.py | 71 ++++++++++++++++++++++++++-------------
 1 file changed, 47 insertions(+), 24 deletions(-)

diff --git a/lib/reindex/s3_bucketd.py b/lib/reindex/s3_bucketd.py
index ff122587..c788a95f 100644
--- a/lib/reindex/s3_bucketd.py
+++ b/lib/reindex/s3_bucketd.py
@@ -36,6 +36,7 @@ def get_options():
     parser.add_argument("-r", "--max-retries", default=2, type=int, help="Max retries before failing a bucketd request")
     parser.add_argument("--only-latest-when-locked", action='store_true', help="Only index the latest version of a key when the bucket has a default object lock policy")
     parser.add_argument("--debug", action='store_true', help="Enable debug logging")
+    parser.add_argument("--dry-run", action="store_true", help="Do not update redis")
     return parser.parse_args()
 
 def chunks(iterable, size):
@@ -443,11 +444,19 @@ def log_report(resource, name, obj_count, total_size):
             update_report(account_reports, total.bucket.userid, total.obj_count, total.total_size)
 
         # Bucket reports can be updated as we get them
-        pipeline = redis_client.pipeline(transaction=False) # No transaction to reduce redis load
-        for bucket, report in bucket_reports.items():
-            update_redis(pipeline, 'buckets', bucket, report['obj_count'], report['total_size'])
-            log_report('buckets', bucket, report['obj_count'], report['total_size'])
-        pipeline.execute()
+        if options.dry_run:
+            for bucket, report in bucket_reports.items():
+                _log.warning(
+                    "DryRun: resource buckets [%s] will not be updated with obj_count %i and total_size %i" % (
+                        bucket, report['obj_count'], report['total_size']
+                    )
+                )
+        else:
+            pipeline = redis_client.pipeline(transaction=False) # No transaction to reduce redis load
+            for bucket, report in bucket_reports.items():
+                update_redis(pipeline, 'buckets', bucket, report['obj_count'], report['total_size'])
+                log_report('buckets', bucket, report['obj_count'], report['total_size'])
+            pipeline.execute()
 
     recorded_buckets = set(get_resources_from_redis(redis_client, 'buckets'))
     if options.bucket is None:
@@ -459,24 +468,35 @@ def log_report(resource, name, obj_count, total_size):
         stale_buckets = set()
 
     _log.info('Found %s stale buckets' % len(stale_buckets))
-    for chunk in chunks(stale_buckets, ACCOUNT_UPDATE_CHUNKSIZE):
-        pipeline = redis_client.pipeline(transaction=False) # No transaction to reduce redis load
-        for bucket in chunk:
-            update_redis(pipeline, 'buckets', bucket, 0, 0)
-            log_report('buckets', bucket, 0, 0)
-        pipeline.execute()
+    if options.dry_run:
+        _log.warning("DryRun: not updating stale buckets")
+    else:
+        for chunk in chunks(stale_buckets, ACCOUNT_UPDATE_CHUNKSIZE):
+            pipeline = redis_client.pipeline(transaction=False) # No transaction to reduce redis load
+            for bucket in chunk:
+                update_redis(pipeline, 'buckets', bucket, 0, 0)
+                log_report('buckets', bucket, 0, 0)
+            pipeline.execute()
 
     # Account metrics are not updated if a bucket is specified
     if options.bucket is None:
         # Don't update any accounts with failed listings
         without_failed = filter(lambda x: x[0] not in failed_accounts, account_reports.items())
-        # Update total account reports in chunks
-        for chunk in chunks(without_failed, ACCOUNT_UPDATE_CHUNKSIZE):
-            pipeline = redis_client.pipeline(transaction=False) # No transaction to reduce redis load
-            for userid, report in chunk:
-                update_redis(pipeline, 'accounts', userid, report['obj_count'], report['total_size'])
-                log_report('accounts', userid, report['obj_count'], report['total_size'])
-            pipeline.execute()
+        if options.dry_run:
+            for userid, report in account_reports.items():
+                _log.warning(
+                    "DryRun: resource account [%s] will not be updated with obj_count %i and total_size %i" % (
+                        userid, report['obj_count'], report['total_size']
+                    )
+                )
+        else:
+            # Update total account reports in chunks
+            for chunk in chunks(without_failed, ACCOUNT_UPDATE_CHUNKSIZE):
+                pipeline = redis_client.pipeline(transaction=False) # No transaction to reduce redis load
+                for userid, report in chunk:
+                    update_redis(pipeline, 'accounts', userid, report['obj_count'], report['total_size'])
+                    log_report('accounts', userid, report['obj_count'], report['total_size'])
+                pipeline.execute()
 
         # Include failed_accounts in observed_accounts to avoid clearing metrics
         observed_accounts = failed_accounts.union(set(account_reports.keys()))
@@ -485,9 +505,12 @@ def log_report(resource, name, obj_count, total_size):
     # Stale accounts and buckets are ones that do not appear in the listing, but have recorded values
     stale_accounts = recorded_accounts.difference(observed_accounts)
     _log.info('Found %s stale accounts' % len(stale_accounts))
-    for chunk in chunks(stale_accounts, ACCOUNT_UPDATE_CHUNKSIZE):
-        pipeline = redis_client.pipeline(transaction=False) # No transaction to reduce redis load
-        for account in chunk:
-            update_redis(pipeline, 'accounts', account, 0, 0)
-            log_report('accounts', account, 0, 0)
-        pipeline.execute()
+    if options.dry_run:
+        _log.warning("DryRun: not updating stale accounts")
+    else:
+        for chunk in chunks(stale_accounts, ACCOUNT_UPDATE_CHUNKSIZE):
+            pipeline = redis_client.pipeline(transaction=False) # No transaction to reduce redis load
+            for account in chunk:
+                update_redis(pipeline, 'accounts', account, 0, 0)
+                log_report('accounts', account, 0, 0)
+            pipeline.execute()
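Note on PATCH 4/6: with --dry-run every write path logs a warning instead of touching Redis, while the real path keeps batching writes through redis_client.pipeline(transaction=False), which buffers commands client-side and flushes them in one round trip without wrapping them in MULTI/EXEC. A minimal redis-py sketch of that pipeline pattern, assuming a reachable local Redis and illustrative key names:

    import redis

    r = redis.Redis(host='127.0.0.1', port=6379)
    pipe = r.pipeline(transaction=False)  # batch commands without MULTI/EXEC
    pipe.set('demo:buckets:mybucket:obj_count', 42)
    pipe.set('demo:buckets:mybucket:total_size', 1024)
    pipe.execute()  # nothing is sent to Redis until this call

The dry-run branches log exactly the values these batched writes would carry, so a run with --dry-run previews the update before committing to it.
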
From 496ff61a210ab8632f0b6a969f74bbc56944feda Mon Sep 17 00:00:00 2001
From: Taylor McKinnon
Date: Thu, 6 Jun 2024 12:22:33 -0700
Subject: [PATCH 5/6] impr(UTAPI-103): Support multiple specified buckets and prep for account support

---
 lib/reindex/s3_bucketd.py | 80 ++++++++++++++++++++++++++++++---------
 1 file changed, 62 insertions(+), 18 deletions(-)

diff --git a/lib/reindex/s3_bucketd.py b/lib/reindex/s3_bucketd.py
index c788a95f..ed313b8c 100644
--- a/lib/reindex/s3_bucketd.py
+++ b/lib/reindex/s3_bucketd.py
@@ -9,6 +9,7 @@
 import sys
 import time
 import urllib
+from pathlib import Path
 
 from collections import defaultdict, namedtuple
 from concurrent.futures import ThreadPoolExecutor
@@ -32,12 +33,33 @@ def get_options():
     parser.add_argument("-n", "--sentinel-cluster-name", default='scality-s3', help="Redis cluster name")
     parser.add_argument("-s", "--bucketd-addr", default='http://127.0.0.1:9000', help="URL of the bucketd server")
     parser.add_argument("-w", "--worker", default=10, type=int, help="Number of workers")
-    parser.add_argument("-b", "--bucket", default=None, help="Bucket to be processed")
     parser.add_argument("-r", "--max-retries", default=2, type=int, help="Max retries before failing a bucketd request")
     parser.add_argument("--only-latest-when-locked", action='store_true', help="Only index the latest version of a key when the bucket has a default object lock policy")
     parser.add_argument("--debug", action='store_true', help="Enable debug logging")
     parser.add_argument("--dry-run", action="store_true", help="Do not update redis")
-    return parser.parse_args()
+    group = parser.add_mutually_exclusive_group()
+    group.add_argument("-b", "--bucket", default=[], help="bucket name", action="append", type=nonempty_string('bucket'))
+    group.add_argument("--bucket-file", default=None, help="file containing bucket names", type=existing_file)
+
+    options = parser.parse_args()
+    if options.bucket_file:
+        with open(options.bucket_file) as f:
+            options.bucket = [line.strip() for line in f if line.strip()]
+
+    return options
+
+def nonempty_string(flag):
+    def inner(value):
+        if not value.strip():
+            raise argparse.ArgumentTypeError("%s: value must not be empty"%flag)
+        return value
+    return inner
+
+def existing_file(path):
+    path = Path(path).resolve()
+    if not path.exists():
+        raise argparse.ArgumentTypeError("File does not exist: %s"%path)
+    return path
 
 def chunks(iterable, size):
     it = iter(iterable)
@@ -177,7 +199,7 @@ def get_bucket_md(self, name):
             raise InvalidListing(name)
         return Bucket(canonId, name, md.get('objectLockEnabled', False))
 
-    def list_buckets(self, name = None):
+    def list_buckets(self, names = None):
 
         def get_next_marker(p):
             if p is None:
@@ -189,12 +211,15 @@ def get_next_marker(p):
             'maxKeys': 1000,
             'marker': get_next_marker
         }
+
+        seen_buckets = set()
+
         for _, payload in self._list_bucket(USERS_BUCKET, **params):
             buckets = []
             for result in payload.get('Contents', []):
                 match = re.match("(\w+)..\|..(\w+.*)", result['key'])
                 bucket = Bucket(*match.groups(), False)
-                if name is None or bucket.name == name:
+                if names is None or bucket.name in names:
                     # We need to get the attributes for each bucket to determine if it is locked
                     if self._only_latest_when_locked:
                         bucket_attrs = self._get_bucket_attributes(bucket.name)
@@ -204,9 +229,11 @@ def get_next_marker(p):
 
             if buckets:
                 yield buckets
-                if name is not None:
-                    # Break on the first matching bucket if a name is given
-                    break
+                if names:
+                    seen_buckets.update(b.name for b in buckets)
+                    # Break if we have seen all the buckets we are looking for
+                    if all(b in seen_buckets for b in names):
+                        break
 
     def list_mpus(self, bucket):
 
@@ -339,6 +366,19 @@ def get_next_marker(p):
         total_size=total_size
     )
 
+def list_all_buckets(bucket_client):
+    return bucket_client.list_buckets()
+
+def list_specific_buckets(bucket_client, buckets):
+    batch = []
+    for bucket in buckets:
+        try:
+            batch.append(bucket_client.get_bucket_md(bucket))
+        except BucketNotFound:
+            _log.error('Failed to list bucket %s. Removing from results.'%bucket)
+            continue
+
+    yield batch
 
 def index_bucket(client, bucket):
     '''
@@ -414,18 +454,22 @@ def log_report(resource, name, obj_count, total_size):
 
 if __name__ == '__main__':
     options = get_options()
-    if options.bucket is not None and not options.bucket.strip():
-        print('You must provide a bucket name with the --bucket flag')
-        sys.exit(1)
     if options.debug:
         _log.setLevel(logging.DEBUG)
 
+    bucket_client = BucketDClient(options.bucketd_addr, options.max_retries, options.only_latest_when_locked)
     redis_client = get_redis_client(options)
     account_reports = {}
    observed_buckets = set()
     failed_accounts = set()
+
+    if options.bucket:
+        batch_generator = list_specific_buckets(bucket_client, options.bucket)
+    else:
+        batch_generator = list_all_buckets(bucket_client)
+
     with ThreadPoolExecutor(max_workers=options.worker) as executor:
-        for batch in bucket_client.list_buckets(options.bucket):
+        for batch in batch_generator:
             bucket_reports = {}
             jobs = { executor.submit(index_bucket, bucket_client, b): b for b in batch }
             for job in futures.as_completed(jobs.keys()):
@@ -458,14 +502,12 @@ def log_report(resource, name, obj_count, total_size):
                 log_report('buckets', bucket, report['obj_count'], report['total_size'])
             pipeline.execute()
 
+    stale_buckets = set()
     recorded_buckets = set(get_resources_from_redis(redis_client, 'buckets'))
-    if options.bucket is None:
+    if not options.bucket:
         stale_buckets = recorded_buckets.difference(observed_buckets)
-    elif observed_buckets and options.bucket not in recorded_buckets:
-        # The provided bucket does not exist, so clean up any metrics
-        stale_buckets = { options.bucket }
-    else:
-        stale_buckets = set()
+    elif options.bucket:
+        stale_buckets = { b for b in options.bucket if b not in observed_buckets }
 
     _log.info('Found %s stale buckets' % len(stale_buckets))
     if options.dry_run:
@@ -479,7 +521,9 @@ def log_report(resource, name, obj_count, total_size):
             pipeline.execute()
 
     # Account metrics are not updated if a bucket is specified
-    if options.bucket is None:
+    if options.bucket:
+        _log.warning('Account metrics will not be updated when using the --bucket or --bucket-file flags')
+    else:
         # Don't update any accounts with failed listings
         without_failed = filter(lambda x: x[0] not in failed_accounts, account_reports.items())
         if options.dry_run:
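Note on PATCH 5/6: --bucket is now repeatable and validated, --bucket-file loads one name per line, the two are mutually exclusive, and both funnel into the same options.bucket list; the listing loop also stops early once every requested bucket has been seen. A standalone sketch of that argparse shape (flag names mirror the patch; the sample values are invented):

    import argparse

    parser = argparse.ArgumentParser()
    group = parser.add_mutually_exclusive_group()
    group.add_argument("-b", "--bucket", default=[], action="append")
    group.add_argument("--bucket-file", default=None)

    opts = parser.parse_args(["-b", "bucket-one", "-b", "bucket-two"])
    print(opts.bucket)  # ['bucket-one', 'bucket-two']: repeated flags accumulate

    # Supplying both flags at once makes argparse exit with a usage error:
    # parser.parse_args(["-b", "x", "--bucket-file", "names.txt"])
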
From 6806dc94fb26d9b116231cb178b1f206eb93fb18 Mon Sep 17 00:00:00 2001
From: Taylor McKinnon
Date: Thu, 6 Jun 2024 12:23:18 -0700
Subject: [PATCH 6/6] impr(UTAPI-103): Support reindexing by account

---
 lib/reindex/s3_bucketd.py | 35 ++++++++++++++++++++++++++++++-----
 1 file changed, 30 insertions(+), 5 deletions(-)

diff --git a/lib/reindex/s3_bucketd.py b/lib/reindex/s3_bucketd.py
index ed313b8c..9f26b47d 100644
--- a/lib/reindex/s3_bucketd.py
+++ b/lib/reindex/s3_bucketd.py
@@ -38,6 +38,8 @@ def get_options():
     parser.add_argument("--debug", action='store_true', help="Enable debug logging")
     parser.add_argument("--dry-run", action="store_true", help="Do not update redis")
     group = parser.add_mutually_exclusive_group()
+    group.add_argument("-a", "--account", default=[], help="account canonical ID (all account buckets will be processed)", action="append", type=nonempty_string('account'))
+    group.add_argument("--account-file", default=None, help="file containing account canonical IDs", type=existing_file)
     group.add_argument("-b", "--bucket", default=[], help="bucket name", action="append", type=nonempty_string('bucket'))
     group.add_argument("--bucket-file", default=None, help="file containing bucket names", type=existing_file)
 
@@ -45,6 +47,9 @@ def get_options():
     if options.bucket_file:
         with open(options.bucket_file) as f:
             options.bucket = [line.strip() for line in f if line.strip()]
+    elif options.account_file:
+        with open(options.account_file) as f:
+            options.account = [line.strip() for line in f if line.strip()]
 
     return options
 
@@ -199,7 +204,7 @@ def get_bucket_md(self, name):
             raise InvalidListing(name)
         return Bucket(canonId, name, md.get('objectLockEnabled', False))
 
-    def list_buckets(self, names = None):
+    def list_buckets(self, names = None, account = None):
 
         def get_next_marker(p):
             if p is None:
@@ -212,6 +217,9 @@ def get_next_marker(p):
             'marker': get_next_marker
         }
 
+        if account is not None:
+            params['prefix'] = '%s..|..' % account
+
         seen_buckets = set()
 
         for _, payload in self._list_bucket(USERS_BUCKET, **params):
@@ -369,6 +377,10 @@ def get_next_marker(p):
 def list_all_buckets(bucket_client):
     return bucket_client.list_buckets()
 
+def list_specific_accounts(bucket_client, accounts):
+    for account in accounts:
+        yield from bucket_client.list_buckets(account=account)
+
 def list_specific_buckets(bucket_client, buckets):
     batch = []
     for bucket in buckets:
@@ -463,7 +475,9 @@ def log_report(resource, name, obj_count, total_size):
     observed_buckets = set()
     failed_accounts = set()
 
-    if options.bucket:
+    if options.account:
+        batch_generator = list_specific_accounts(bucket_client, options.account)
+    elif options.bucket:
         batch_generator = list_specific_buckets(bucket_client, options.bucket)
     else:
         batch_generator = list_all_buckets(bucket_client)
@@ -504,10 +518,12 @@ def log_report(resource, name, obj_count, total_size):
 
     stale_buckets = set()
     recorded_buckets = set(get_resources_from_redis(redis_client, 'buckets'))
-    if not options.bucket:
+    if not options.bucket and not options.account:
         stale_buckets = recorded_buckets.difference(observed_buckets)
     elif options.bucket:
         stale_buckets = { b for b in options.bucket if b not in observed_buckets }
+    elif options.account:
+        _log.warning('Stale buckets will not be cleared when using the --account or --account-file flags')
 
     _log.info('Found %s stale buckets' % len(stale_buckets))
     if options.dry_run:
@@ -542,12 +558,21 @@ def log_report(resource, name, obj_count, total_size):
                 log_report('accounts', userid, report['obj_count'], report['total_size'])
             pipeline.execute()
 
+    if options.account:
+        for account in options.account:
+            if account in failed_accounts:
+                _log.error("No metrics updated for %s, one or more buckets failed" % account)
+
     # Include failed_accounts in observed_accounts to avoid clearing metrics
     observed_accounts = failed_accounts.union(set(account_reports.keys()))
     recorded_accounts = set(get_resources_from_redis(redis_client, 'accounts'))
 
-    # Stale accounts and buckets are ones that do not appear in the listing, but have recorded values
-    stale_accounts = recorded_accounts.difference(observed_accounts)
+    if options.account:
+        stale_accounts = { a for a in options.account if a not in observed_accounts }
+    else:
+        # Stale accounts and buckets are ones that do not appear in the listing, but have recorded values
+        stale_accounts = recorded_accounts.difference(observed_accounts)
+
     _log.info('Found %s stale accounts' % len(stale_accounts))
     if options.dry_run:
         _log.warning("DryRun: not updating stale accounts")
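Note on PATCH 6/6: account filtering works because keys in the users bucket follow the layout <canonical-id>..|..<bucket-name> (the same layout the existing regex in this file parses), so restricting the listing to one account is a plain prefix query against bucketd. A hedged sketch of how the new entry points compose, meant to run inside this module with a reachable bucketd; the canonical IDs are placeholders:

    # Client construction mirrors __main__: bucketd address, max retries, lock behaviour.
    client = BucketDClient('http://127.0.0.1:9000', 2, False)

    # Each yielded batch is a list of Bucket namedtuples belonging to that account.
    for batch in list_specific_accounts(client, ['canonical-id-1', 'canonical-id-2']):
        for bucket in batch:
            print(bucket.name)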