Skip to content

Commit

Permalink
Transfers: implement preliminary support for packet marking. rucio#5856
Browse files Browse the repository at this point in the history
Fetch the marking configuration once, on startup, then every 2 days.
If fetching fails, re-fetch again more rapidly to recover from
transient failures.

For now, the marking is added as a file_metadata field. Waiting for
the final decision from FTS developers about the correct way of
passing this parameter
  • Loading branch information
Radu Carpa committed Oct 5, 2023
1 parent fc3973a commit 835d6e2
Show file tree
Hide file tree
Showing 7 changed files with 582 additions and 46 deletions.
63 changes: 62 additions & 1 deletion lib/rucio/transfertool/fts3.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
from requests.packages.urllib3 import disable_warnings # pylint: disable=import-error

from rucio.common.cache import make_region_memcached
from rucio.common.config import config_get, config_get_bool
from rucio.common.config import config_get, config_get_bool, config_get_int
from rucio.common.constants import FTS_JOB_TYPE, FTS_STATE, FTS_COMPLETE_STATE
from rucio.common.exception import TransferToolTimeout, TransferToolWrongAnswer, DuplicateFileTransferSubmission
from rucio.common.stopwatch import Stopwatch
Expand Down Expand Up @@ -104,6 +104,61 @@
('none', 'none'): 'none',
}

_SCITAGS_NEXT_REFRESH = datetime.datetime.utcnow()
_SCITAGS_EXP_ID = None
_SCITAGS_ACTIVITY_IDS = {}


def _scitags_ids(logger: Callable[..., Any] = logging.log) -> "tuple[int | None, dict[str, int]]":
"""
Re-fetch if needed and return the scitags ids
"""
enabled = config_get_bool('packet-marking', 'enabled', default=False)
if not enabled:
return None, {}

now = datetime.datetime.utcnow()
global _SCITAGS_ACTIVITY_IDS
global _SCITAGS_EXP_ID
global _SCITAGS_NEXT_REFRESH
if _SCITAGS_NEXT_REFRESH < now:
exp_name = config_get('packet-marking', 'exp_name', default='')
fetch_url = config_get('packet-marking', 'fetch_url', default='https://www.scitags.org/api.json')
fetch_interval = config_get_int('packet-marking', 'fetch_interval', default=datetime.timedelta(hours=48).seconds)
fetch_timeout = config_get_int('packet-marking', 'fetch_timeout', default=5)

_SCITAGS_NEXT_REFRESH = now + datetime.timedelta(seconds=fetch_interval)

if exp_name:
had_exception = False
exp_id = None
activity_ids = {}
try:
result = requests.get(fetch_url, timeout=fetch_timeout)
if result and result.status_code == 200:
marks = result.json()
for experiment in marks.get('experiments', []):
if experiment.get('expName') == exp_name:
exp_id = experiment.get('expId')
for activity_dict in experiment.get('activities', []):
activity_name = activity_dict.get('activityName')
activity_id = activity_dict.get('activityId')
if activity_name and activity_id:
activity_ids[activity_name] = int(activity_id)
break
except (requests.exceptions.RequestException, TypeError, ValueError):
had_exception = True
logger(logging.WARNING, 'Failed to fetch the scitags markings', exc_info=True)

if had_exception:
# Retry quicker after fetch errors
_SCITAGS_NEXT_REFRESH = min(_SCITAGS_NEXT_REFRESH, now + datetime.timedelta(minutes=5))
else:
_SCITAGS_EXP_ID = exp_id
_SCITAGS_ACTIVITY_IDS = activity_ids

return _SCITAGS_EXP_ID, _SCITAGS_ACTIVITY_IDS


def _pick_cert_file(vo: "Optional[str]") -> "Optional[str]":
cert = None
Expand Down Expand Up @@ -791,6 +846,8 @@ def __init__(self, external_host, oidc_account=None, vo=None, group_bulk=1, grou
self.cert = None
self.verify = True # True is the default setting of a requests.* method

self.scitags_exp_id, self.scitags_activity_ids = _scitags_ids(logger=logger)

@classmethod
def _pick_fts_servers(cls, source_rse: "RseData", dest_rse: "RseData"):
"""
Expand Down Expand Up @@ -882,6 +939,10 @@ def _file_from_transfer(self, transfer, job_params):
'selection_strategy': self.source_strategy if self.source_strategy else _configured_source_strategy(transfer.rws.activity, logger=self.logger),
'activity': rws.activity
}
if isinstance(self.scitags_exp_id, int):
activity_id = self.scitags_activity_ids.get(rws.activity)
if isinstance(activity_id, int):
t_file['metadata']['scitags_id'] = self.scitags_exp_id << 6 | activity_id
return t_file

def submit(self, transfers, job_params, timeout=None):
Expand Down
2 changes: 1 addition & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ def db_session():

def __get_fixture_param(request):
fixture_param = getattr(request, "param", None)
if not fixture_param:
if not fixture_param and request.instance:
# Parametrize support is incomplete for legacy unittest test cases
# Manually retrieve the parameters from the list of marks:
mark = next(iter(filter(lambda m: m.name == 'parametrize', request.instance.pytestmark)), None)
Expand Down
7 changes: 6 additions & 1 deletion tests/inputs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,12 @@
import os
from pathlib import Path

DIRECTORY = Path(os.path.abspath(__file__)).parent

# GeoLite2-City-Test.mmdb downloaded on 26/Jan/2022 from ./test-data/ in
# https://github.com/maxmind/MaxMind-DB/tree/2f0ef0249245c7f19feffa366793a6fffd529701/
# Check ./source-data/GeoLite2-City-Test.json in this repository for IPs to use in tests
GEOIP_LITE2_CITY_TEST_DB = Path(os.path.abspath(__file__)).parent / 'GeoLite2-City-Test.tar.gz'
GEOIP_LITE2_CITY_TEST_DB = DIRECTORY / 'GeoLite2-City-Test.tar.gz'

# Downloaded on 05/oct/2023 from https://www.scitags.org/api.json
SCITAGS_JSON = DIRECTORY / 'scitags.json'
Loading

0 comments on commit 835d6e2

Please sign in to comment.