diff --git a/lib/rucio/api/request.py b/lib/rucio/api/request.py index c4eaf57ec51..0286f348436 100644 --- a/lib/rucio/api/request.py +++ b/lib/rucio/api/request.py @@ -219,19 +219,16 @@ def list_requests_history(src_rses, dst_rses, states, issuer, vo='def', offset=N yield api_update_return_dict(req, session=session) -@stream_session -def get_request_stats(state, issuer, vo='def', *, session: "Session"): +@read_session +def get_request_metrics(issuer, vo='def', *, session: "Session"): """ Get statistics of requests in a specific state grouped by source RSE, destination RSE, and activity. - :param state: request state. :param issuer: Issuing account as a string. :param session: The database session in use. """ kwargs = {'issuer': issuer} - if not permission.has_permission(issuer=issuer, vo=vo, action='get_request_stats', kwargs=kwargs, session=session): + if not permission.has_permission(issuer=issuer, vo=vo, action='get_request_metrics', kwargs=kwargs, session=session): raise exception.AccessDenied(f'{issuer} cannot get request statistics') - for req in request.get_request_stats(state, session=session): - req = req.to_dict() - yield api_update_return_dict(req, session=session) + return request.get_request_metrics(session=session) diff --git a/lib/rucio/core/permission/atlas.py b/lib/rucio/core/permission/atlas.py index 9c96ef1a112..4acf2bcae62 100644 --- a/lib/rucio/core/permission/atlas.py +++ b/lib/rucio/core/permission/atlas.py @@ -87,6 +87,7 @@ def has_permission(issuer, action, kwargs, *, session: "Optional[Session]" = Non 'set_rse_usage': perm_set_rse_usage, 'set_rse_limits': perm_set_rse_limits, 'get_request_by_did': perm_get_request_by_did, + 'get_request_metrics': perm_get_request_metrics, 'cancel_request': perm_cancel_request, 'get_next': perm_get_next, 'set_local_account_limit': perm_set_local_account_limit, @@ -940,6 +941,18 @@ def perm_get_request_by_did(issuer, kwargs, *, session: "Optional[Session]" = No return True +def perm_get_request_metrics(issuer, kwargs, *, session: "Optional[Session]" = None): + """ + Checks if an account can get the request stats + + :param issuer: Account identifier which issues the command. + :param kwargs: List of arguments for the action. + :param session: The DB session to use + :returns: True if account is allowed, otherwise False + """ + return _is_root(issuer) or has_account_attribute(account=issuer, key='admin', session=session) + + def perm_cancel_request(issuer, kwargs, *, session: "Optional[Session]" = None): """ Checks if an account can cancel a request. diff --git a/lib/rucio/core/request.py b/lib/rucio/core/request.py index 3f740bd93fa..9663da4fa3e 100644 --- a/lib/rucio/core/request.py +++ b/lib/rucio/core/request.py @@ -14,6 +14,7 @@ # limitations under the License. import datetime +import itertools import json import logging import math @@ -35,9 +36,10 @@ from rucio.common.exception import RequestNotFound, RucioException, UnsupportedOperation, InvalidRSEExpression from rucio.common.types import InternalAccount, InternalScope from rucio.common.utils import generate_uuid, chunks +from rucio.core.distance import get_distances from rucio.core.message import add_message, add_messages from rucio.core.monitor import MetricManager -from rucio.core.rse import get_rse_attribute, get_rse_name, get_rse_vo, RseData +from rucio.core.rse import get_rse_attribute, get_rse_name, get_rse_vo, RseData, RseCollection from rucio.core.rse_expression_parser import parse_expression from rucio.db.sqla import models, filter_thread_work from rucio.db.sqla.constants import RequestState, RequestType, LockState, RequestErrMsg, ReplicaState, TransferLimitDirection @@ -47,7 +49,6 @@ RequestAndState = namedtuple('RequestAndState', ['request_id', 'request_state']) if TYPE_CHECKING: - from rucio.core.rse import RseCollection from sqlalchemy.orm import Session @@ -1337,21 +1338,29 @@ def __init__(self): self._rollover_samples(rollover_time=datetime.datetime.utcnow()) self.record_stats = True - self.timer = None + self.save_timer = None + self.downsample_timer = None self.downsample_period = math.ceil(raw_retention.total_seconds()) self.next_downsample = None def __enter__(self): self.record_stats = config_get_bool('transfers', 'stats_enabled', default=self.record_stats) self.downsample_period = config_get_int('transfers', 'stats_downsample_period', default=self.downsample_period) + # Introduce some voluntary jitter to reduce the likely-hood of performing this database + # operation multiple times in parallel. + self.downsample_period = random.randint(self.downsample_period, 2 * self.downsample_period) if self.record_stats: - self.timer = threading.Timer(self.raw_resolution.total_seconds(), self.save) - self.timer.start() + self.save_timer = threading.Timer(self.raw_resolution.total_seconds(), self.save) + self.save_timer.start() + self.downsample_timer = threading.Timer(self.downsample_period, self.periodic_downsample_and_cleanup) + self.downsample_timer.start() return self def __exit__(self, exc_type, exc_val, exc_tb): - if self.timer is not None: - self.timer.cancel() + if self.save_timer is not None: + self.save_timer.cancel() + if self.downsample_timer is not None: + self.downsample_timer.cancel() if self.record_stats: self.force_save() @@ -1373,7 +1382,9 @@ def observe( return now = datetime.datetime.utcnow() with self.lock: - save_timestamp, save_samples = self._rollover_samples(now) + save_timestamp, save_samples = now, {} + if now >= self.current_timestamp + self.raw_resolution: + save_timestamp, save_samples = self._rollover_samples(now) if state in (RequestState.DONE, RequestState.FAILED): record = self.current_samples[dst_rse_id, src_rse_id, activity] @@ -1393,7 +1404,9 @@ def save(self, *, session: "Session") -> None: """ now = datetime.datetime.utcnow() with self.lock: - save_timestamp, save_samples = self._rollover_samples(now) + save_timestamp, save_samples = now, {} + if now >= self.current_timestamp + self.raw_resolution: + save_timestamp, save_samples = self._rollover_samples(now) if save_samples: self._save_samples(timestamp=save_timestamp, samples=save_samples, session=session) @@ -1413,11 +1426,9 @@ def force_save(self, *, session: "Session") -> None: self._save_samples(timestamp=save_timestamp, samples=save_samples, session=session) def _rollover_samples(self, rollover_time: datetime.datetime) -> "tuple[datetime.datetime, Mapping[tuple[str, str, str], TransferStatsManager._StatsRecord]]": - previous_samples = (rollover_time, {}) - if rollover_time >= self.current_timestamp + self.raw_resolution: - previous_samples = (self.current_timestamp, self.current_samples) - self.current_samples = defaultdict(lambda: self._StatsRecord()) - _, self.current_timestamp = next(self.slice_time(self.raw_resolution, start_time=rollover_time + self.raw_resolution)) + previous_samples = (self.current_timestamp, self.current_samples) + self.current_samples = defaultdict(lambda: self._StatsRecord()) + _, self.current_timestamp = next(self.slice_time(self.raw_resolution, start_time=rollover_time + self.raw_resolution)) return previous_samples @transactional_session @@ -1450,11 +1461,9 @@ def _save_samples( def periodic_downsample_and_cleanup(self, *, session: "Session") -> None: """ Periodically create lower resolution samples from higher resolution ones. - Introduce some voluntary jitter to reduce the likely-hood of performing this database - operation multiple times in parallel. """ now = datetime.datetime.utcnow() - timestamp = now + datetime.timedelta(seconds=random.randint(self.downsample_period, 2 * self.downsample_period)) + timestamp = now + datetime.timedelta(seconds=self.downsample_period) if not self.next_downsample: # Wait for a whole period before first downsample operation. Otherwise we'll perform it in each daemon on startup. self.next_downsample = timestamp @@ -1464,14 +1473,16 @@ def periodic_downsample_and_cleanup(self, *, session: "Session") -> None: return self.next_downsample = timestamp - self.downsample_and_cleanup(session=session) + while self.downsample_and_cleanup(session=session): + continue @transactional_session - def downsample_and_cleanup(self, *, session: "Session") -> None: + def downsample_and_cleanup(self, *, session: "Session") -> bool: """ Housekeeping of samples in the database: - create lower-resolution (but higher-retention) samples from higher-resolution ones; - delete the samples which are older than the desired retention time. + Return True if it thinks there is still more cleanup. This function handles safely to be executed in parallel from multiple daemons at the same time. However, this is achieved at the cost of introducing duplicate samples at lower @@ -1492,6 +1503,8 @@ def downsample_and_cleanup(self, *, session: "Session") -> None: for res, newest_t, oldest_t in session.execute(stmt) } + more_to_delete = False + id_temp_table = temp_table_mngr(session).create_id_table() for i in range(1, len(self.retentions)): src_resolution, desired_src_retention = self.retentions[i - 1] dst_resolution, desired_dst_retention = self.retentions[i] @@ -1534,18 +1547,33 @@ def downsample_and_cleanup(self, *, session: "Session") -> None: db_time_ranges[dst_resolution] = (newest_available_dst_timestamp, oldest_available_dst_timestamp) # Delete from the database the samples which are older than desired - self._cleanup(resolution=src_resolution, timestamp=oldest_desired_src_timestamp, session=session) + more_to_delete |= self._cleanup( + id_temp_table=id_temp_table, + resolution=src_resolution, + timestamp=oldest_desired_src_timestamp, + session=session + ) # Cleanup samples at the lowest resolution, which were not handled by the previous loop last_resolution, last_retention = self.retentions[-1] _, oldest_desired_timestamp = next(self.slice_time(last_resolution, start_time=now - last_retention)) if db_time_ranges.get(last_resolution, (now, now))[1] < oldest_desired_timestamp: - self._cleanup(resolution=last_resolution, timestamp=oldest_desired_timestamp, session=session) + more_to_delete |= self._cleanup( + id_temp_table=id_temp_table, + resolution=last_resolution, + timestamp=oldest_desired_timestamp, + session=session + ) # Cleanup all resolutions which exist in the database but are not desired by rucio anymore # (probably due to configuration changes). for resolution_to_cleanup in set(db_time_ranges).difference(r[0] for r in self.retentions): - self._cleanup(resolution=resolution_to_cleanup, timestamp=now, session=session) + more_to_delete |= self._cleanup( + id_temp_table=id_temp_table, + resolution=resolution_to_cleanup, + timestamp=now, + session=session + ) @stream_session def load_totals( @@ -1553,9 +1581,12 @@ def load_totals( resolution: "datetime.timedelta", recent_t: "datetime.datetime", older_t: "datetime.datetime", + dest_rse_id: "Optional[str]" = None, + src_rse_id: "Optional[str]" = None, + activity: "Optional[str]" = None, *, session: "Session" - ) -> Iterator[Mapping[str, str]]: + ) -> "Iterator[Mapping[str, str | int]]": """ Load aggregated totals for the given resolution and time interval. @@ -1595,7 +1626,21 @@ def load_totals( models.TransferStats.resolution == resolution.total_seconds(), models.TransferStats.timestamp >= older_t, models.TransferStats.timestamp < recent_t - ).subquery() + ) + if dest_rse_id: + sub_query = sub_query.where( + models.Request.dest_rse_id == dest_rse_id + ) + if src_rse_id: + sub_query = sub_query.where( + models.Request.source_rse_id == src_rse_id + ) + if activity: + sub_query = sub_query.where( + models.Request.dest_rse_id == activity + ) + + sub_query = sub_query.subquery() stmt = select( sub_query.c.src_rse_id, @@ -1615,12 +1660,13 @@ def load_totals( @staticmethod def _cleanup( + id_temp_table, resolution: "datetime.timedelta", timestamp: "datetime.datetime", - limit: "Optional[int]" = None, + limit: "Optional[int]" = 10000, *, session: "Session" - ) -> None: + ) -> bool: """ Delete, from the database, the stats older than the given time. Skip locked rows, to tolerate parallel executions by multiple daemons. @@ -1648,13 +1694,19 @@ def _cleanup( else: stmt = stmt.with_for_update(skip_locked=True) - for stats in session.execute(stmt).scalars().partitions(10): - stmt = delete( - models.TransferStats - ).where( - models.TransferStats.id.in_(stats) - ) - session.execute(stmt) + session.execute(delete(id_temp_table)) + session.execute(insert(id_temp_table).from_select(['id'], stmt)) + + stmt = delete( + models.TransferStats + ).where( + exists(select(1).where(models.TransferStats.id == id_temp_table.id)) + ).execution_options( + synchronize_session=False + ) + res = session.execute(stmt) + return res.rowcount > 0 + @staticmethod def slice_time( @@ -1682,13 +1734,101 @@ def slice_time( @read_session -def get_request_stats(state, *, session: "Session"): +def get_request_metrics( + dest_rse_id: "Optional[str]" = None, + src_rse_id: "Optional[str]" = None, + activity: "Optional[str]" = None, + *, + session: "Session" +): + metrics = {} + now = datetime.datetime.utcnow() + now_str = now.strftime('%Y-%m-%dT%H:%M:%S') + + # Add the current queues + db_stats = get_request_stats( + state=[ + RequestState.QUEUED, + ], + src_rse_id=src_rse_id, + dest_rse_id=dest_rse_id, + activity=activity, + session=session, + ) + for stat in db_stats: + if not stat.source_rse_id: + continue + + resp_elem = metrics.setdefault((stat.source_rse_id, stat.dest_rse_id), {}) + + bytes_elem = resp_elem.setdefault('bytes', {}) + bytes_elem.setdefault('queued', {})[stat.activity] = {'latest': stat.bytes, 'timestamp': now_str} + bytes_elem['queued-total'] = bytes_elem.get('queued-total', 0) + stat.bytes + + files_elem = resp_elem.setdefault('files', {}) + files_elem.setdefault('queued', {})[stat.activity] = {'latest': stat.counter, 'timestamp': now_str} + files_elem['queued-total'] = files_elem.get('queued-total', 0) + stat.counter + + # Add the historical data + for duration, duration_label, src_resolution in ( + (datetime.timedelta(hours=1), '1h', datetime.timedelta(minutes=5)), + (datetime.timedelta(hours=6), '6h', datetime.timedelta(hours=1)) + ): + db_stats = TransferStatsManager().load_totals( + resolution=src_resolution, + recent_t=now, + older_t=now - duration, + dest_rse_id=dest_rse_id, + src_rse_id=src_rse_id, + activity=activity, + session=session, + ) + + for stat in db_stats: + resp_elem = metrics.setdefault((stat['src_rse_id'], stat['dest_rse_id']), {}) + + bytes_elem = resp_elem.setdefault('bytes', {}) + bytes_elem.setdefault('done', {})[stat['activity']] = {'latest': stat['bytes_done'], 'timestamp': now_str} + bytes_elem[f'done-total-{duration_label}'] = bytes_elem.get(f'done-total-{duration_label}', 0) + stat['bytes_done'] + + files_elem = resp_elem.setdefault('files', {}) + files_elem.setdefault('done', {})[stat['activity']] = {'latest': stat['files_done'], 'timestamp': now_str} + files_elem.setdefault('failed', {})[stat['activity']] = {'latest': stat['files_failed'], 'timestamp': now_str} + files_elem[f'done-total-{duration_label}'] = files_elem.get(f'done-total-{duration_label}', 0) + stat['files_done'] + files_elem[f'failed-total-{duration_label}'] = files_elem.get(f'failed-total-{duration_label}', 0) + stat['files_failed'] + + # Add distances + for distance in get_distances(dest_rse_id=dest_rse_id, src_rse_id=src_rse_id): + resp_elem = metrics.setdefault((distance['src_rse_id'], distance['dest_rse_id']), {}) + + resp_elem['distance'] = {'latest': distance['distance'], 'timestamp': now_str} + + # Fill RSE names + rses = RseCollection(rse_ids=itertools.chain.from_iterable(metrics)) + rses.ensure_loaded(load_name=True) + response = {} + for (src_id, dst_id), metric in metrics.items(): + src_rse = rses[src_id] + dst_rse = rses[dst_id] + metric['src'] = src_rse.name + metric['dst'] = dst_rse.name + + response[f'{src_rse.name}:{dst_rse.name}'] = metric + + return response + + +@read_session +def get_request_stats( + state: "RequestState | list[RequestState]", + dest_rse_id: "Optional[str]" = None, + src_rse_id: "Optional[str]" = None, + activity: "Optional[str]" = None, + *, + session: "Session" +): """ Retrieve statistics about requests by destination, activity and state. - - :param state: Request state. - :param session: Database session to use. - :returns: List of (activity, dest_rse_id, state, counter). """ if type(state) is not list: @@ -1715,6 +1855,18 @@ def get_request_stats(state, *, session: "Session"): models.Request.source_rse_id, models.Request.activity, ) + if src_rse_id: + stmt = stmt.where( + models.Request.source_rse_id == src_rse_id + ) + if dest_rse_id: + stmt = stmt.where( + models.Request.dest_rse_id == dest_rse_id + ) + if activity: + stmt = stmt.where( + models.Request.dest_rse_id == activity + ) return session.execute(stmt).all() diff --git a/lib/rucio/web/rest/flaskapi/v1/requests.py b/lib/rucio/web/rest/flaskapi/v1/requests.py index a5f77d4d153..7b7b5fbdc76 100644 --- a/lib/rucio/web/rest/flaskapi/v1/requests.py +++ b/lib/rucio/web/rest/flaskapi/v1/requests.py @@ -818,7 +818,7 @@ def generate(issuer, vo): return try_stream(generate(issuer=flask.request.environ.get('issuer'), vo=flask.request.environ.get('vo'))) -class RequestStatsGet(ErrorHandlingMethodView): +class RequestMetricsGet(ErrorHandlingMethodView): """ REST API to get request stats. """ @check_accept_header_wrapper_flask(['application/x-json-stream']) @@ -830,9 +830,19 @@ def get(self): tags: - Requests parameters: - - name: state + - name: dest_rse in: query - description: The accepted request state. + description: The destination RSE name + schema: + type: string + - name: source_rse + in: query + description: The source RSE name + schema: + type: string + - name: activity + in: query + description: The activity schema: type: string responses: @@ -882,21 +892,12 @@ def get(self): 406: description: Not acceptable """ - state = flask.request.args.get('state', default=None) - - if not state: - return generate_http_error_flask(400, 'MissingParameter', 'Request state is missing') + dst_rse = flask.request.args.get('dest_rse', default=None) + src_rse = flask.request.args.get('source_rse', default=None) + activity = flask.request.args.get('activity', default=None) - try: - request_state = RequestState(state) - except ValueError: - return generate_http_error_flask(400, 'Invalid', 'Request state value is invalid') - - def generate(issuer, vo): - for result in request.get_request_stats(request_state, issuer=issuer, vo=vo): - yield render_json(**result) + '\n' - - return try_stream(generate(issuer=flask.request.environ.get('issuer'), vo=flask.request.environ.get('vo'))) + metrics = request.get_request_metrics(issuer=flask.request.environ.get('issuer'), vo=flask.request.environ.get('vo')) + return Response(json.dumps(metrics, cls=APIEncoder), content_type='application/json') def blueprint(): @@ -910,7 +911,7 @@ def blueprint(): bp.add_url_rule('/list', view_func=request_list_view, methods=['get', ]) request_history_list_view = RequestHistoryList.as_view('request_history_list') bp.add_url_rule('/history/list', view_func=request_history_list_view, methods=['get', ]) - request_stats_view = RequestStatsGet.as_view('request_stats') + request_stats_view = RequestMetricsGet.as_view('request_metrics_get') bp.add_url_rule('/stats', view_func=request_stats_view, methods=['get', ]) bp.after_request(response_headers) diff --git a/tests/test_request.py b/tests/test_request.py index 7f811a6b0fd..7f01b445b0e 100644 --- a/tests/test_request.py +++ b/tests/test_request.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import json from datetime import datetime from typing import Union @@ -20,8 +21,9 @@ from rucio.common.config import config_get_bool from rucio.common.utils import generate_uuid, parse_response +from rucio.core.distance import add_distance from rucio.core.replica import add_replica -from rucio.core.request import queue_requests, get_request_by_did, list_requests, list_requests_history, set_transfer_limit +from rucio.core.request import queue_requests, get_request_by_did, list_requests, list_requests_history, set_transfer_limit, TransferStatsManager from rucio.core.rse import add_rse_attribute from rucio.db.sqla import models, constants from rucio.db.sqla.constants import RequestType, RequestState @@ -286,3 +288,80 @@ def check_error_api(params, exception_class, exception_message, code): params = {'request_states': 'S', 'src_site': source_site, 'dst_site': 'unknown'} check_error_api(params, 'NotFound', 'Could not resolve site name unknown to RSE', 404) + + +@pytest.mark.parametrize("file_config_mock", [{"overrides": [ + ('transfers', 'stats_enabled', 'True'), +]}], indirect=True) +def test_api_metrics(vo, rest_client, auth_token, rse_factory, did_factory, root_account, file_config_mock): + + src_rse, src_rse_id = rse_factory.make_mock_rse() + dst_rse, dst_rse_id = rse_factory.make_mock_rse() + add_distance(src_rse_id, dst_rse_id, distance=10) + + replica_bytes = 20 + + did1 = did_factory.random_file_did() + activity1 = 'User Subscription' + add_replica(rse_id=src_rse_id, bytes_=replica_bytes, adler32='beefdead', account=root_account, **did1) + + did2 = did_factory.random_file_did() + activity2 = 'Test' + add_replica(rse_id=src_rse_id, bytes_=replica_bytes, adler32='beefdead', account=root_account, **did2) + + requests = [ + { + 'dest_rse_id': dst_rse_id, + 'source_rse_id': src_rse_id, + 'request_type': RequestType.TRANSFER, + 'request_id': generate_uuid(), + 'name': did1['name'], + 'scope': did1['scope'], + 'rule_id': generate_uuid(), + 'retry_count': 1, + 'attributes': { + 'activity': activity, + 'bytes': replica_bytes, + 'md5': '', + 'adler32': '' + } + } + for did, activity in ((did1, activity1), (did2, activity2)) + ] + queue_requests(requests) + + stats_manager = TransferStatsManager() + stats_manager.observe( + src_rse_id=src_rse_id, + dst_rse_id=dst_rse_id, + activity=activity1, + state=RequestState.DONE, + file_size=367, + ) + stats_manager.observe( + src_rse_id=src_rse_id, + dst_rse_id=dst_rse_id, + activity=activity2, + state=RequestState.FAILED, + file_size=1020, + ) + stats_manager.force_save() + stats_manager.downsample_and_cleanup() + + api_endpoint = '/requests/stats' + params = {'state': 'Q', 'dst_rse': dst_rse, 'src_rse': src_rse} + headers_dict = {'X-Rucio-Type': 'user', 'X-Rucio-Account': root_account.external} + response = rest_client.get(api_endpoint, query_string=params, headers=headers(auth(auth_token), vohdr(vo), hdrdict(headers_dict))) + response = json.loads(response.get_data(as_text=True)) + metric = response.get(f'{src_rse}:{dst_rse}') + assert metric is not None + assert metric['distance']['latest'] == 10 + assert metric['bytes']['queued'][activity1]['latest'] == replica_bytes + assert metric['bytes']['queued'][activity2]['latest'] == replica_bytes + assert metric['bytes']['queued-total'] == 2 * replica_bytes + assert metric['files']['queued'][activity1]['latest'] == 1 + assert metric['files']['queued'][activity2]['latest'] == 1 + assert metric['files']['queued-total'] == 2 + assert metric['files']['done'][activity1]['latest'] == 1 + assert metric['bytes']['done'][activity1]['latest'] == 367 + assert metric['files']['failed'][activity2]['latest'] == 1