From 2d01d1b867cc4172c90c93d8f6a1256b02731ab9 Mon Sep 17 00:00:00 2001 From: Radu Carpa Date: Tue, 21 Nov 2023 12:43:53 +0100 Subject: [PATCH] wip --- lib/rucio/api/request.py | 22 +- lib/rucio/core/permission/atlas.py | 13 ++ lib/rucio/core/request.py | 234 +++++++++++++++++---- lib/rucio/core/rse.py | 22 +- lib/rucio/web/rest/flaskapi/v1/requests.py | 45 ++-- tests/test_request.py | 81 ++++++- 6 files changed, 346 insertions(+), 71 deletions(-) diff --git a/lib/rucio/api/request.py b/lib/rucio/api/request.py index c4eaf57ec51..98a229dfd3b 100644 --- a/lib/rucio/api/request.py +++ b/lib/rucio/api/request.py @@ -17,7 +17,7 @@ Interface for the requests abstraction layer """ -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Optional from rucio.api import permission from rucio.common import exception @@ -219,19 +219,25 @@ def list_requests_history(src_rses, dst_rses, states, issuer, vo='def', offset=N yield api_update_return_dict(req, session=session) -@stream_session -def get_request_stats(state, issuer, vo='def', *, session: "Session"): +@read_session +def get_request_metrics(src_rse: Optional[str], dst_rse: Optional[str], activity: Optional[str], issuer, vo='def', *, session: "Session"): """ Get statistics of requests in a specific state grouped by source RSE, destination RSE, and activity. - :param state: request state. + :param src_rse: source RSE. + :param dst_rse: destination RSE. + :param activity: activity :param issuer: Issuing account as a string. :param session: The database session in use. """ + src_rse_id = None + if src_rse: + src_rse_id = get_rse_id(rse=src_rse, vo=vo, session=session) + dst_rse_id = None + if dst_rse: + dst_rse_id = get_rse_id(rse=dst_rse, vo=vo, session=session) kwargs = {'issuer': issuer} - if not permission.has_permission(issuer=issuer, vo=vo, action='get_request_stats', kwargs=kwargs, session=session): + if not permission.has_permission(issuer=issuer, vo=vo, action='get_request_metrics', kwargs=kwargs, session=session): raise exception.AccessDenied(f'{issuer} cannot get request statistics') - for req in request.get_request_stats(state, session=session): - req = req.to_dict() - yield api_update_return_dict(req, session=session) + return request.get_request_metrics(dest_rse_id=dst_rse_id, src_rse_id=src_rse_id, activity=activity, session=session) diff --git a/lib/rucio/core/permission/atlas.py b/lib/rucio/core/permission/atlas.py index 9c96ef1a112..4acf2bcae62 100644 --- a/lib/rucio/core/permission/atlas.py +++ b/lib/rucio/core/permission/atlas.py @@ -87,6 +87,7 @@ def has_permission(issuer, action, kwargs, *, session: "Optional[Session]" = Non 'set_rse_usage': perm_set_rse_usage, 'set_rse_limits': perm_set_rse_limits, 'get_request_by_did': perm_get_request_by_did, + 'get_request_metrics': perm_get_request_metrics, 'cancel_request': perm_cancel_request, 'get_next': perm_get_next, 'set_local_account_limit': perm_set_local_account_limit, @@ -940,6 +941,18 @@ def perm_get_request_by_did(issuer, kwargs, *, session: "Optional[Session]" = No return True +def perm_get_request_metrics(issuer, kwargs, *, session: "Optional[Session]" = None): + """ + Checks if an account can get the request stats + + :param issuer: Account identifier which issues the command. + :param kwargs: List of arguments for the action. 
+ :param session: The DB session to use + :returns: True if account is allowed, otherwise False + """ + return _is_root(issuer) or has_account_attribute(account=issuer, key='admin', session=session) + + def perm_cancel_request(issuer, kwargs, *, session: "Optional[Session]" = None): """ Checks if an account can cancel a request. diff --git a/lib/rucio/core/request.py b/lib/rucio/core/request.py index 3f740bd93fa..0cf24728dbb 100644 --- a/lib/rucio/core/request.py +++ b/lib/rucio/core/request.py @@ -14,6 +14,7 @@ # limitations under the License. import datetime +import itertools import json import logging import math @@ -35,9 +36,10 @@ from rucio.common.exception import RequestNotFound, RucioException, UnsupportedOperation, InvalidRSEExpression from rucio.common.types import InternalAccount, InternalScope from rucio.common.utils import generate_uuid, chunks +from rucio.core.distance import get_distances from rucio.core.message import add_message, add_messages from rucio.core.monitor import MetricManager -from rucio.core.rse import get_rse_attribute, get_rse_name, get_rse_vo, RseData +from rucio.core.rse import get_rse_attribute, get_rse_name, get_rse_vo, RseData, RseCollection from rucio.core.rse_expression_parser import parse_expression from rucio.db.sqla import models, filter_thread_work from rucio.db.sqla.constants import RequestState, RequestType, LockState, RequestErrMsg, ReplicaState, TransferLimitDirection @@ -47,7 +49,6 @@ RequestAndState = namedtuple('RequestAndState', ['request_id', 'request_state']) if TYPE_CHECKING: - from rucio.core.rse import RseCollection from sqlalchemy.orm import Session @@ -1337,21 +1338,29 @@ def __init__(self): self._rollover_samples(rollover_time=datetime.datetime.utcnow()) self.record_stats = True - self.timer = None + self.save_timer = None + self.downsample_timer = None self.downsample_period = math.ceil(raw_retention.total_seconds()) self.next_downsample = None def __enter__(self): self.record_stats = config_get_bool('transfers', 'stats_enabled', default=self.record_stats) self.downsample_period = config_get_int('transfers', 'stats_downsample_period', default=self.downsample_period) + # Introduce some voluntary jitter to reduce the likely-hood of performing this database + # operation multiple times in parallel. 
+ self.downsample_period = random.randint(self.downsample_period, 2 * self.downsample_period) if self.record_stats: - self.timer = threading.Timer(self.raw_resolution.total_seconds(), self.save) - self.timer.start() + self.save_timer = threading.Timer(self.raw_resolution.total_seconds(), self.save) + self.save_timer.start() + self.downsample_timer = threading.Timer(self.downsample_period, self.periodic_downsample_and_cleanup) + self.downsample_timer.start() return self def __exit__(self, exc_type, exc_val, exc_tb): - if self.timer is not None: - self.timer.cancel() + if self.save_timer is not None: + self.save_timer.cancel() + if self.downsample_timer is not None: + self.downsample_timer.cancel() if self.record_stats: self.force_save() @@ -1373,7 +1382,9 @@ def observe( return now = datetime.datetime.utcnow() with self.lock: - save_timestamp, save_samples = self._rollover_samples(now) + save_timestamp, save_samples = now, {} + if now >= self.current_timestamp + self.raw_resolution: + save_timestamp, save_samples = self._rollover_samples(now) if state in (RequestState.DONE, RequestState.FAILED): record = self.current_samples[dst_rse_id, src_rse_id, activity] @@ -1393,7 +1404,9 @@ def save(self, *, session: "Session") -> None: """ now = datetime.datetime.utcnow() with self.lock: - save_timestamp, save_samples = self._rollover_samples(now) + save_timestamp, save_samples = now, {} + if now >= self.current_timestamp + self.raw_resolution: + save_timestamp, save_samples = self._rollover_samples(now) if save_samples: self._save_samples(timestamp=save_timestamp, samples=save_samples, session=session) @@ -1413,11 +1426,9 @@ def force_save(self, *, session: "Session") -> None: self._save_samples(timestamp=save_timestamp, samples=save_samples, session=session) def _rollover_samples(self, rollover_time: datetime.datetime) -> "tuple[datetime.datetime, Mapping[tuple[str, str, str], TransferStatsManager._StatsRecord]]": - previous_samples = (rollover_time, {}) - if rollover_time >= self.current_timestamp + self.raw_resolution: - previous_samples = (self.current_timestamp, self.current_samples) - self.current_samples = defaultdict(lambda: self._StatsRecord()) - _, self.current_timestamp = next(self.slice_time(self.raw_resolution, start_time=rollover_time + self.raw_resolution)) + previous_samples = (self.current_timestamp, self.current_samples) + self.current_samples = defaultdict(lambda: self._StatsRecord()) + _, self.current_timestamp = next(self.slice_time(self.raw_resolution, start_time=rollover_time + self.raw_resolution)) return previous_samples @transactional_session @@ -1450,11 +1461,9 @@ def _save_samples( def periodic_downsample_and_cleanup(self, *, session: "Session") -> None: """ Periodically create lower resolution samples from higher resolution ones. - Introduce some voluntary jitter to reduce the likely-hood of performing this database - operation multiple times in parallel. """ now = datetime.datetime.utcnow() - timestamp = now + datetime.timedelta(seconds=random.randint(self.downsample_period, 2 * self.downsample_period)) + timestamp = now + datetime.timedelta(seconds=self.downsample_period) if not self.next_downsample: # Wait for a whole period before first downsample operation. Otherwise we'll perform it in each daemon on startup. 
self.next_downsample = timestamp @@ -1464,21 +1473,25 @@ def periodic_downsample_and_cleanup(self, *, session: "Session") -> None: return self.next_downsample = timestamp - self.downsample_and_cleanup(session=session) + while self.downsample_and_cleanup(session=session): + continue @transactional_session - def downsample_and_cleanup(self, *, session: "Session") -> None: + def downsample_and_cleanup(self, *, session: "Session") -> bool: """ Housekeeping of samples in the database: - create lower-resolution (but higher-retention) samples from higher-resolution ones; - delete the samples which are older than the desired retention time. + Return True if it thinks there is still more cleanup. This function handles safely to be executed in parallel from multiple daemons at the same time. However, this is achieved at the cost of introducing duplicate samples at lower resolution into the database. The possibility of having duplicates at lower resolutions must be considered during work with those sample. Code must tolerate duplicates and avoid double-counting. """ - now = datetime.datetime.utcnow() + + # Delay processing to leave time for all raw metrics to be correctly saved to the database + now = datetime.datetime.utcnow() - 4 * self.raw_resolution stmt = select( models.TransferStats.resolution, @@ -1492,6 +1505,8 @@ def downsample_and_cleanup(self, *, session: "Session") -> None: for res, newest_t, oldest_t in session.execute(stmt) } + more_to_delete = False + id_temp_table = temp_table_mngr(session).create_id_table() for i in range(1, len(self.retentions)): src_resolution, desired_src_retention = self.retentions[i - 1] dst_resolution, desired_dst_retention = self.retentions[i] @@ -1534,18 +1549,34 @@ def downsample_and_cleanup(self, *, session: "Session") -> None: db_time_ranges[dst_resolution] = (newest_available_dst_timestamp, oldest_available_dst_timestamp) # Delete from the database the samples which are older than desired - self._cleanup(resolution=src_resolution, timestamp=oldest_desired_src_timestamp, session=session) + more_to_delete |= self._cleanup( + id_temp_table=id_temp_table, + resolution=src_resolution, + timestamp=oldest_desired_src_timestamp, + session=session + ) # Cleanup samples at the lowest resolution, which were not handled by the previous loop last_resolution, last_retention = self.retentions[-1] _, oldest_desired_timestamp = next(self.slice_time(last_resolution, start_time=now - last_retention)) if db_time_ranges.get(last_resolution, (now, now))[1] < oldest_desired_timestamp: - self._cleanup(resolution=last_resolution, timestamp=oldest_desired_timestamp, session=session) + more_to_delete |= self._cleanup( + id_temp_table=id_temp_table, + resolution=last_resolution, + timestamp=oldest_desired_timestamp, + session=session + ) # Cleanup all resolutions which exist in the database but are not desired by rucio anymore # (probably due to configuration changes). 
for resolution_to_cleanup in set(db_time_ranges).difference(r[0] for r in self.retentions): - self._cleanup(resolution=resolution_to_cleanup, timestamp=now, session=session) + more_to_delete |= self._cleanup( + id_temp_table=id_temp_table, + resolution=resolution_to_cleanup, + timestamp=now, + session=session + ) + return more_to_delete @stream_session def load_totals( @@ -1553,9 +1584,12 @@ def load_totals( resolution: "datetime.timedelta", recent_t: "datetime.datetime", older_t: "datetime.datetime", + dest_rse_id: "Optional[str]" = None, + src_rse_id: "Optional[str]" = None, + activity: "Optional[str]" = None, *, session: "Session" - ) -> Iterator[Mapping[str, str]]: + ) -> "Iterator[Mapping[str, str | int]]": """ Load aggregated totals for the given resolution and time interval. @@ -1595,7 +1629,21 @@ def load_totals( models.TransferStats.resolution == resolution.total_seconds(), models.TransferStats.timestamp >= older_t, models.TransferStats.timestamp < recent_t - ).subquery() + ) + if dest_rse_id: + sub_query = sub_query.where( + models.TransferStats.dest_rse_id == dest_rse_id + ) + if src_rse_id: + sub_query = sub_query.where( + models.TransferStats.src_rse_id == src_rse_id + ) + if activity: + sub_query = sub_query.where( + models.TransferStats.activity == activity + ) + + sub_query = sub_query.subquery() stmt = select( sub_query.c.src_rse_id, @@ -1615,12 +1663,13 @@ def load_totals( @staticmethod def _cleanup( + id_temp_table, resolution: "datetime.timedelta", timestamp: "datetime.datetime", - limit: "Optional[int]" = None, + limit: "Optional[int]" = 10000, *, session: "Session" - ) -> None: + ) -> bool: """ Delete, from the database, the stats older than the given time. Skip locked rows, to tolerate parallel executions by multiple daemons. 
@@ -1648,13 +1697,18 @@ def _cleanup( else: stmt = stmt.with_for_update(skip_locked=True) - for stats in session.execute(stmt).scalars().partitions(10): - stmt = delete( - models.TransferStats - ).where( - models.TransferStats.id.in_(stats) - ) - session.execute(stmt) + session.execute(delete(id_temp_table)) + session.execute(insert(id_temp_table).from_select(['id'], stmt)) + + stmt = delete( + models.TransferStats + ).where( + exists(select(1).where(models.TransferStats.id == id_temp_table.id)) + ).execution_options( + synchronize_session=False + ) + res = session.execute(stmt) + return res.rowcount > 0 @staticmethod def slice_time( @@ -1682,13 +1736,103 @@ def slice_time( @read_session -def get_request_stats(state, *, session: "Session"): +def get_request_metrics( + dest_rse_id: "Optional[str]" = None, + src_rse_id: "Optional[str]" = None, + activity: "Optional[str]" = None, + *, + session: "Session" +): + metrics = {} + now = datetime.datetime.utcnow() + + # Add the current queues + db_stats = get_request_stats( + state=[ + RequestState.QUEUED, + ], + src_rse_id=src_rse_id, + dest_rse_id=dest_rse_id, + activity=activity, + session=session, + ) + for stat in db_stats: + if not stat.source_rse_id: + continue + + resp_elem = metrics.setdefault((stat.source_rse_id, stat.dest_rse_id), {}) + + files_elem = resp_elem.setdefault('files', {}) + files_elem.setdefault('queued', {})[stat.activity] = stat.counter + files_elem['queued-total'] = files_elem.get('queued-total', 0) + stat.counter + + bytes_elem = resp_elem.setdefault('bytes', {}) + bytes_elem.setdefault('queued', {})[stat.activity] = stat.bytes + bytes_elem['queued-total'] = bytes_elem.get('queued-total', 0) + stat.bytes + + # Add the historical data + for duration, duration_label, src_resolution in ( + (datetime.timedelta(hours=1), '1h', datetime.timedelta(minutes=5)), + (datetime.timedelta(hours=6), '6h', datetime.timedelta(hours=1)) + ): + db_stats = TransferStatsManager().load_totals( + resolution=src_resolution, + recent_t=now, + older_t=now - duration, + dest_rse_id=dest_rse_id, + src_rse_id=src_rse_id, + activity=activity, + session=session, + ) + + for stat in db_stats: + resp_elem = metrics.setdefault((stat['src_rse_id'], stat['dest_rse_id']), {}) + + files_elem = resp_elem.setdefault('files', {}) + if stat['files_done']: + files_elem.setdefault('done', {}).setdefault(stat['activity'], {})[duration_label] = stat['files_done'] + files_elem[f'done-total-{duration_label}'] = files_elem.get(f'done-total-{duration_label}', 0) + stat['files_done'] + if stat['files_failed']: + files_elem.setdefault('failed', {}).setdefault(stat['activity'], {})[duration_label] = stat['files_failed'] + files_elem[f'failed-total-{duration_label}'] = files_elem.get(f'failed-total-{duration_label}', 0) + stat['files_failed'] + + bytes_elem = resp_elem.setdefault('bytes', {}) + if stat['bytes_done']: + bytes_elem.setdefault('done', {}).setdefault(stat['activity'], {})[duration_label] = stat['bytes_done'] + bytes_elem[f'done-total-{duration_label}'] = bytes_elem.get(f'done-total-{duration_label}', 0) + stat['bytes_done'] + + # Add distances + for distance in get_distances(dest_rse_id=dest_rse_id, src_rse_id=src_rse_id): + resp_elem = metrics.setdefault((distance['src_rse_id'], distance['dest_rse_id']), {}) + + resp_elem['distance'] = distance['distance'] + + # Fill RSE names + rses = RseCollection(rse_ids=itertools.chain.from_iterable(metrics)) + rses.ensure_loaded(load_name=True, include_deleted=True) + response = {} + for (src_id, dst_id), metric in 
metrics.items(): + src_rse = rses[src_id] + dst_rse = rses[dst_id] + metric['src'] = src_rse.name + metric['dst'] = dst_rse.name + + response[f'{src_rse.name}:{dst_rse.name}'] = metric + + return response + + +@read_session +def get_request_stats( + state: "RequestState | list[RequestState]", + dest_rse_id: "Optional[str]" = None, + src_rse_id: "Optional[str]" = None, + activity: "Optional[str]" = None, + *, + session: "Session" +): """ Retrieve statistics about requests by destination, activity and state. - - :param state: Request state. - :param session: Database session to use. - :returns: List of (activity, dest_rse_id, state, counter). """ if type(state) is not list: @@ -1715,6 +1859,18 @@ def get_request_stats(state, *, session: "Session"): models.Request.source_rse_id, models.Request.activity, ) + if src_rse_id: + stmt = stmt.where( + models.Request.source_rse_id == src_rse_id + ) + if dest_rse_id: + stmt = stmt.where( + models.Request.dest_rse_id == dest_rse_id + ) + if activity: + stmt = stmt.where( + models.Request.activity == activity + ) return session.execute(stmt).all() diff --git a/lib/rucio/core/rse.py b/lib/rucio/core/rse.py index 4b47d22f473..40ebefc6350 100644 --- a/lib/rucio/core/rse.py +++ b/lib/rucio/core/rse.py @@ -153,8 +153,18 @@ def ensure_loaded(self, load_name=False, load_columns=False, load_attributes=Fal @staticmethod @read_session - def bulk_load(rse_id_to_data: "dict[str, RseData]", load_name=False, load_columns=False, load_attributes=False, - load_info=False, load_usage=False, load_limits=False, *, session: "Session"): + def bulk_load( + rse_id_to_data: "dict[str, RseData]", + load_name: bool = False, + load_columns: bool = False, + load_attributes: bool = False, + load_info: bool = False, + load_usage: bool = False, + load_limits: bool = False, + include_deleted: bool = False, + *, + session: "Session" + ): """ Given a dict of RseData objects indexed by rse_id, ensure that the desired fields are initialised in all objects from the input. @@ -203,9 +213,11 @@ def bulk_load(rse_id_to_data: "dict[str, RseData]", load_name=False, load_column temp_table, models.RSE, models.RSE.id == temp_table.id - ).where( - models.RSE.deleted == false() ) + if not include_deleted: + stmt = stmt.where( + models.RSE.deleted == false() + ) db_rses_by_id = {str(db_rse.id): db_rse for db_rse in session.execute(stmt).scalars()} if len(db_rses_by_id) != len(rse_ids_to_load): @@ -331,6 +343,7 @@ def ensure_loaded( load_info: bool = False, load_usage: bool = False, load_limits: bool = False, + include_deleted: bool = False, *, session: "Session", ): @@ -342,6 +355,7 @@ def ensure_loaded( load_info=load_info, load_usage=load_usage, load_limits=load_limits, + include_deleted=include_deleted, session=session, ) diff --git a/lib/rucio/web/rest/flaskapi/v1/requests.py b/lib/rucio/web/rest/flaskapi/v1/requests.py index a5f77d4d153..48323263080 100644 --- a/lib/rucio/web/rest/flaskapi/v1/requests.py +++ b/lib/rucio/web/rest/flaskapi/v1/requests.py @@ -818,7 +818,7 @@ def generate(issuer, vo): return try_stream(generate(issuer=flask.request.environ.get('issuer'), vo=flask.request.environ.get('vo'))) -class RequestStatsGet(ErrorHandlingMethodView): +class RequestMetricsGet(ErrorHandlingMethodView): """ REST API to get request stats. 
""" @check_accept_header_wrapper_flask(['application/x-json-stream']) @@ -830,9 +830,19 @@ def get(self): tags: - Requests parameters: - - name: state + - name: dest_rse + in: query + description: The destination RSE name + schema: + type: string + - name: source_rse + in: query + description: The source RSE name + schema: + type: string + - name: activity in: query - description: The accepted request state. + description: The activity schema: type: string responses: @@ -882,21 +892,18 @@ def get(self): 406: description: Not acceptable """ - state = flask.request.args.get('state', default=None) - - if not state: - return generate_http_error_flask(400, 'MissingParameter', 'Request state is missing') - - try: - request_state = RequestState(state) - except ValueError: - return generate_http_error_flask(400, 'Invalid', 'Request state value is invalid') - - def generate(issuer, vo): - for result in request.get_request_stats(request_state, issuer=issuer, vo=vo): - yield render_json(**result) + '\n' + dst_rse = flask.request.args.get('dst_rse', default=None) + src_rse = flask.request.args.get('src_rse', default=None) + activity = flask.request.args.get('activity', default=None) - return try_stream(generate(issuer=flask.request.environ.get('issuer'), vo=flask.request.environ.get('vo'))) + metrics = request.get_request_metrics( + dst_rse=dst_rse, + src_rse=src_rse, + activity=activity, + issuer=flask.request.environ.get('issuer'), + vo=flask.request.environ.get('vo') + ) + return Response(json.dumps(metrics, cls=APIEncoder), content_type='application/json') def blueprint(): @@ -910,8 +917,8 @@ def blueprint(): bp.add_url_rule('/list', view_func=request_list_view, methods=['get', ]) request_history_list_view = RequestHistoryList.as_view('request_history_list') bp.add_url_rule('/history/list', view_func=request_history_list_view, methods=['get', ]) - request_stats_view = RequestStatsGet.as_view('request_stats') - bp.add_url_rule('/stats', view_func=request_stats_view, methods=['get', ]) + request_metrics_view = RequestMetricsGet.as_view('request_metrics_get') + bp.add_url_rule('/metrics', view_func=request_metrics_view, methods=['get', ]) bp.after_request(response_headers) return bp diff --git a/tests/test_request.py b/tests/test_request.py index 7f811a6b0fd..664393b5e92 100644 --- a/tests/test_request.py +++ b/tests/test_request.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+import json from datetime import datetime from typing import Union @@ -20,8 +21,9 @@ from rucio.common.config import config_get_bool from rucio.common.utils import generate_uuid, parse_response +from rucio.core.distance import add_distance from rucio.core.replica import add_replica -from rucio.core.request import queue_requests, get_request_by_did, list_requests, list_requests_history, set_transfer_limit +from rucio.core.request import queue_requests, get_request_by_did, list_requests, list_requests_history, set_transfer_limit, TransferStatsManager from rucio.core.rse import add_rse_attribute from rucio.db.sqla import models, constants from rucio.db.sqla.constants import RequestType, RequestState @@ -286,3 +288,80 @@ def check_error_api(params, exception_class, exception_message, code): params = {'request_states': 'S', 'src_site': source_site, 'dst_site': 'unknown'} check_error_api(params, 'NotFound', 'Could not resolve site name unknown to RSE', 404) + + +@pytest.mark.parametrize("file_config_mock", [{"overrides": [ + ('transfers', 'stats_enabled', 'True'), +]}], indirect=True) +def test_api_metrics(vo, rest_client, auth_token, rse_factory, did_factory, root_account, file_config_mock): + + src_rse, src_rse_id = rse_factory.make_mock_rse() + dst_rse, dst_rse_id = rse_factory.make_mock_rse() + add_distance(src_rse_id, dst_rse_id, distance=10) + + replica_bytes = 20 + + did1 = did_factory.random_file_did() + activity1 = 'User Subscription' + add_replica(rse_id=src_rse_id, bytes_=replica_bytes, adler32='beefdead', account=root_account, **did1) + + did2 = did_factory.random_file_did() + activity2 = 'Test' + add_replica(rse_id=src_rse_id, bytes_=replica_bytes, adler32='beefdead', account=root_account, **did2) + + requests = [ + { + 'dest_rse_id': dst_rse_id, + 'source_rse_id': src_rse_id, + 'request_type': RequestType.TRANSFER, + 'request_id': generate_uuid(), + 'name': did1['name'], + 'scope': did1['scope'], + 'rule_id': generate_uuid(), + 'retry_count': 1, + 'attributes': { + 'activity': activity, + 'bytes': replica_bytes, + 'md5': '', + 'adler32': '' + } + } + for did, activity in ((did1, activity1), (did2, activity2)) + ] + queue_requests(requests) + + stats_manager = TransferStatsManager() + stats_manager.observe( + src_rse_id=src_rse_id, + dst_rse_id=dst_rse_id, + activity=activity1, + state=RequestState.DONE, + file_size=367, + ) + stats_manager.observe( + src_rse_id=src_rse_id, + dst_rse_id=dst_rse_id, + activity=activity2, + state=RequestState.FAILED, + file_size=1020, + ) + stats_manager.force_save() + stats_manager.downsample_and_cleanup() + + api_endpoint = '/requests/metrics' + params = {'dst_rse': dst_rse, 'src_rse': src_rse} + headers_dict = {'X-Rucio-Type': 'user', 'X-Rucio-Account': root_account.external} + response = rest_client.get(api_endpoint, query_string=params, headers=headers(auth(auth_token), vohdr(vo), hdrdict(headers_dict))) + response = json.loads(response.get_data(as_text=True)) + metric = response.get(f'{src_rse}:{dst_rse}') + assert metric is not None + assert metric['distance'] == 10 + assert metric['bytes']['queued'][activity1] == replica_bytes + assert metric['bytes']['queued'][activity2] == replica_bytes + assert metric['bytes']['queued-total'] == 2 * replica_bytes + assert metric['files']['queued'][activity1] == 1 + assert metric['files']['queued'][activity2] == 1 + assert metric['files']['queued-total'] == 2 + assert metric['files']['done'][activity1]['1h'] == 1 + assert metric['bytes']['done'][activity1]['1h'] == 367 + assert 
metric['files']['failed'][activity2]['1h'] == 1
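
A minimal sketch of how the new /requests/metrics endpoint introduced by this patch could be queried. The path and the src_rse/dst_rse/activity query parameters come from the diff above; the host name, RSE names, and token value below are placeholders, and the keys read from the response follow the shape exercised by test_api_metrics (this is an illustration, not part of the patch).

import json

import requests

RUCIO_HOST = "https://rucio-server.example.com"  # placeholder host
TOKEN = "..."                                     # placeholder X-Rucio-Auth-Token value

# Query transfer metrics for one source/destination RSE pair (both filters optional).
resp = requests.get(
    f"{RUCIO_HOST}/requests/metrics",
    headers={"X-Rucio-Auth-Token": TOKEN},
    params={"src_rse": "XRD1", "dst_rse": "XRD2"},  # placeholder RSE names
    timeout=30,
)
resp.raise_for_status()

# The response is a JSON object keyed by "<src>:<dst>"; each value carries the
# configured 'distance' plus 'files'/'bytes' breakdowns of queued requests and
# of transfers done/failed over the recent time windows (e.g. '1h', '6h').
metrics = json.loads(resp.text)
for link, metric in metrics.items():
    print(
        link,
        metric.get("distance"),
        metric.get("files", {}).get("queued-total", 0),
        metric.get("bytes", {}).get("queued-total", 0),
    )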