Skip to content

Commit

Permalink
Transfers: gather and store transfer statistics. rucio#6189
Browse files Browse the repository at this point in the history
Each time the poller or receiver transitions a transfer to a different
state, use the information to update the transfer statistic.
The goal is to be able to answer following questions in rucio:
 - how many successful transfers happened towards RSE1 yesterday
 - what was the average transfer rate from RSE1 to RSE2 last hour
Short-term it will allow to visualize the transfer status. Longer-term,
we may use this data to take better scheduling decisions (selection of
destination RSE during rule evaluation; selection of source RSE for
transfers).

Each poller/receiver will gather statistics over a certain time window
before committing the aggregated stats gathered over this time interval
into the database. This reduces the number of database writes at the
cost of potentially losing more metrics in case of a crash. We keep
track of the total number of failed/successful transfers in the given
time window; plus the number of transferred bytes.

Metrics will be aggregated regularly into lower-resolution samples.
Aggregation is done by summing higher resolution metrics. Lower
resolution samples will be stored for a longer period in the database.
Downsampling is performed automatically by each involved daemon. As
a result, multiple downsample operations could potentially be done in
parallel. Some basic mitigation was put in place to reduce the likely-
hood of parallel downsampling. Moreover, the code tolerates
parallel execution at the cost of allowing duplicate values at lower
resolution in the database. These duplicate values must be explicitly
handled at query time. Special care is needed to avoid double-counting.

The behavior was heavily inspired by existing time series databases,
like prometheus and influxdb. The possibility to use an external tool
for this job was discussed, but rejected.
  • Loading branch information
Radu Carpa committed Nov 27, 2023
1 parent 356b4d1 commit 2092f6c
Show file tree
Hide file tree
Showing 15 changed files with 962 additions and 69 deletions.
23 changes: 23 additions & 0 deletions etc/sql/oracle/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -1996,3 +1996,26 @@ CREATE GLOBAL TEMPORARY TABLE TEMPORARY_ID_4
id RAW(16),
CONSTRAINT TEMPORARY_ID_4_PK PRIMARY KEY (id)
) ON COMMIT DELETE ROWS;

-- 73 ) ========================================= TRANSFER_STATS table ===================================

CREATE TABLE TRANSFER_STATS
(
ID RAW(16),
RESOLUTION INTEGER,
TIMESTAMP DATE,
DEST_RSE_ID RAW(16),
SRC_RSE_ID RAW(16),
ACTIVITY VARCHAR2(50),
FILES_DONE NUMBER(19,0),
BYTES_DONE NUMBER(19,0),
FILES_FAILED NUMBER(19,0),
CREATED_AT DATE,
UPDATED_AT DATE,
CONSTRAINT TRANSFER_STATS_PK PRIMARY KEY (ID),
CONSTRAINT TRANSFER_STATS_DEST_RSE_FK FOREIGN KEY(DEST_RSE_ID) REFERENCES RSES (ID),
CONSTRAINT TRANSFER_STATS_SRC_RSE_FK FOREIGN KEY(SRC_RSE_ID) REFERENCES RSES (ID),
CONSTRAINT TRANSFER_STATS_CREATED_NN CHECK (CREATED_AT IS NOT NULL),
CONSTRAINT TRANSFER_STATS_UPDATED_NN CHECK (UPDATED_AT IS NOT NULL)
);
CREATE INDEX TRANSFER_STATS_KEY_IDX ON TRANSFER_STATS (RESOLUTION, TIMESTAMP, DEST_RSE_ID, SRC_RSE_ID, ACTIVITY);
2 changes: 1 addition & 1 deletion lib/rucio/alembicrevision.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.

ALEMBIC_REVISION = '4df2c5ddabc0' # the current alembic head revision
ALEMBIC_REVISION = 'a08fa8de1545' # the current alembic head revision
22 changes: 14 additions & 8 deletions lib/rucio/api/request.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
Interface for the requests abstraction layer
"""

from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Optional

from rucio.api import permission
from rucio.common import exception
Expand Down Expand Up @@ -219,19 +219,25 @@ def list_requests_history(src_rses, dst_rses, states, issuer, vo='def', offset=N
yield api_update_return_dict(req, session=session)


@stream_session
def get_request_stats(state, issuer, vo='def', *, session: "Session"):
@read_session
def get_request_metrics(src_rse: Optional[str], dst_rse: Optional[str], activity: Optional[str], issuer, vo='def', *, session: "Session"):
"""
Get statistics of requests in a specific state grouped by source RSE, destination RSE, and activity.
:param state: request state.
:param src_rse: source RSE.
:param dst_rse: destination RSE.
:param activity: activity
:param issuer: Issuing account as a string.
:param session: The database session in use.
"""
src_rse_id = None
if src_rse:
src_rse_id = get_rse_id(rse=src_rse, vo=vo, session=session)
dst_rse_id = None
if dst_rse:
dst_rse_id = get_rse_id(rse=dst_rse, vo=vo, session=session)
kwargs = {'issuer': issuer}
if not permission.has_permission(issuer=issuer, vo=vo, action='get_request_stats', kwargs=kwargs, session=session):
if not permission.has_permission(issuer=issuer, vo=vo, action='get_request_metrics', kwargs=kwargs, session=session):
raise exception.AccessDenied(f'{issuer} cannot get request statistics')

for req in request.get_request_stats(state, session=session):
req = req.to_dict()
yield api_update_return_dict(req, session=session)
return request.get_request_metrics(dest_rse_id=dst_rse_id, src_rse_id=src_rse_id, activity=activity, session=session)
13 changes: 13 additions & 0 deletions lib/rucio/core/permission/atlas.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ def has_permission(issuer, action, kwargs, *, session: "Optional[Session]" = Non
'set_rse_usage': perm_set_rse_usage,
'set_rse_limits': perm_set_rse_limits,
'get_request_by_did': perm_get_request_by_did,
'get_request_metrics': perm_get_request_metrics,
'cancel_request': perm_cancel_request,
'get_next': perm_get_next,
'set_local_account_limit': perm_set_local_account_limit,
Expand Down Expand Up @@ -940,6 +941,18 @@ def perm_get_request_by_did(issuer, kwargs, *, session: "Optional[Session]" = No
return True


def perm_get_request_metrics(issuer, kwargs, *, session: "Optional[Session]" = None):
"""
Checks if an account can get the request stats
:param issuer: Account identifier which issues the command.
:param kwargs: List of arguments for the action.
:param session: The DB session to use
:returns: True if account is allowed, otherwise False
"""
return _is_root(issuer) or has_account_attribute(account=issuer, key='admin', session=session)


def perm_cancel_request(issuer, kwargs, *, session: "Optional[Session]" = None):
"""
Checks if an account can cancel a request.
Expand Down
Loading

0 comments on commit 2092f6c

Please sign in to comment.