Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
Radu Carpa committed Oct 13, 2023
1 parent d139114 commit 75b3eff
Show file tree
Hide file tree
Showing 2 changed files with 239 additions and 2 deletions.
223 changes: 221 additions & 2 deletions lib/rucio/core/transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,17 @@
import datetime
import logging
import re
import threading
import time
import traceback
from collections import defaultdict
from dataclasses import dataclass
from decimal import Decimal, localcontext
from typing import TYPE_CHECKING

from dogpile.cache import make_region
from dogpile.cache.api import NoValue
from sqlalchemy import select, update
from sqlalchemy import delete, func, insert, select, update
from sqlalchemy.exc import IntegrityError

from rucio.common import constants
Expand Down Expand Up @@ -287,6 +289,205 @@ def legacy_sources(self):
return self._legacy_sources


class TransferStatsManager:
    """
    Accumulates transfer outcome counters in memory, keyed by
    (destination RSE, source RSE, activity), and periodically persists them to
    the transfer_stats table at the highest configured resolution.
    Lower-resolution (longer-retention) samples are derived from them by
    downsample_and_cleanup().
    """

    @dataclass
    class _StatsRecord:
        # Counters accumulated for a single (dst_rse_id, src_rse_id, activity) key
        # within the currently open time window.
        files_failed: int = 0
        files_done: int = 0
        bytes_done: int = 0

    def __init__(self):
        # Guards current_timestamp/current_samples against concurrent observe() calls.
        self.lock = threading.Lock()

        retentions = sorted([
            # (resolution, retention): samples at the given resolution are kept
            # for (at least) the given retention period.
            (datetime.timedelta(seconds=5), datetime.timedelta(seconds=10)),
            (datetime.timedelta(seconds=10), datetime.timedelta(seconds=20)),
            (datetime.timedelta(seconds=20), datetime.timedelta(minutes=2)),
            # NOTE(review): intended production retentions, disabled while work is
            # in progress — confirm before merge:
            # (datetime.timedelta(minutes=5), datetime.timedelta(hours=1)),
            # (datetime.timedelta(hours=1), datetime.timedelta(days=1)),
            # (datetime.timedelta(days=1), datetime.timedelta(days=30)),
        ])

        self.retentions = retentions
        # New samples are always recorded at the highest (smallest timedelta) resolution.
        self.observe_resolution = self.retentions[0][0]

        # Upper boundary of the currently accumulating time window. Looking one
        # resolution into the future ensures the first window covers "now".
        self.current_timestamp, _ = next(self.slice_time(
            self.observe_resolution,
            start_time=datetime.datetime.utcnow() + self.observe_resolution))
        self.current_samples = defaultdict(self._StatsRecord)

    def observe(self, src_rse_id: str, dst_rse_id: str, activity: str, state: RequestState, file_size: int, *, session: "Optional[Session]" = None):
        """
        Record the outcome of one transfer. Only DONE and FAILED states are
        counted. When the current time window is over, the accumulated samples
        are flushed to the database.

        :param src_rse_id: id of the source RSE of the transfer.
        :param dst_rse_id: id of the destination RSE of the transfer.
        :param activity: activity of the transfer request.
        :param state: terminal state of the request (DONE/FAILED counted; others ignored).
        :param file_size: size in bytes (only added to bytes_done on DONE).
        """
        now = datetime.datetime.utcnow()
        samples_to_save = None
        with self.lock:
            if now > self.current_timestamp:
                if self.current_samples:
                    samples_to_save = {self.current_timestamp: self.current_samples}
                    # Start a fresh accumulation window. Without this reset, the
                    # counters captured in samples_to_save would keep accumulating
                    # and be persisted again on the next rotation (double-counting).
                    self.current_samples = defaultdict(self._StatsRecord)
                self.current_timestamp, _ = next(self.slice_time(self.observe_resolution, start_time=now + self.observe_resolution))

            if state in (RequestState.DONE, RequestState.FAILED):
                record = self.current_samples[dst_rse_id, src_rse_id, activity]
                if state == RequestState.DONE:
                    record.files_done += 1
                    record.bytes_done += file_size
                else:
                    record.files_failed += 1
        # Perform the database interaction outside the lock.
        if samples_to_save:
            self._save_samples(samples_to_save, session=session)

    @transactional_session
    def _save_samples(self, to_save, *, session: "Session"):
        """
        Persist the given {timestamp: {(dst, src, activity): record}} samples
        to the database at the observation resolution.
        """
        rows_to_insert = []
        for timestamp, samples in to_save.items():
            for (dst_rse_id, src_rse_id, activity), record in samples.items():
                rows_to_insert.append({
                    models.TransferStats.timestamp.name: timestamp,
                    # total_seconds(), not .seconds: .seconds is 0 for timedeltas
                    # of whole days, which would store a wrong resolution.
                    models.TransferStats.resolution.name: int(self.observe_resolution.total_seconds()),
                    models.TransferStats.src_rse_id.name: src_rse_id,
                    models.TransferStats.dest_rse_id.name: dst_rse_id,
                    models.TransferStats.activity.name: activity,
                    models.TransferStats.files_failed.name: record.files_failed,
                    models.TransferStats.files_done.name: record.files_done,
                    models.TransferStats.bytes_done.name: record.bytes_done,
                })
        if rows_to_insert:
            session.execute(insert(models.TransferStats), rows_to_insert)

    @transactional_session
    def downsample_and_cleanup(self, *, session: "Session"):
        """
        Housekeeping of samples in the database:
        - create lower-resolution (but higher-retention) samples from higher-resolution ones;
        - delete the samples which are older than the desired retention time.
        """
        now = datetime.datetime.utcnow()

        # Time span currently covered in the database, per resolution.
        stmt = select(
            models.TransferStats.resolution,
            func.max(models.TransferStats.timestamp),
            func.min(models.TransferStats.timestamp),
        ).group_by(
            models.TransferStats.resolution,
        )
        db_time_ranges = {
            datetime.timedelta(seconds=res): (newest_t, oldest_t)
            for res, newest_t, oldest_t in session.execute(stmt)
        }

        for i in range(1, len(self.retentions)):
            src_resolution, desired_src_retention = self.retentions[i - 1]
            dst_resolution, desired_dst_retention = self.retentions[i]

            # Always keep samples at source resolution aligned to the destination resolution interval.
            # Keep, at least, the amount of samples needed to cover the first interval at
            # destination resolution, but keep more samples if explicitly configured to do so.
            _, oldest_desired_src_timestamp = next(self.slice_time(dst_resolution, start_time=now - desired_src_retention))

            _, oldest_available_src_timestamp = db_time_ranges.get(src_resolution, (None, None))
            newest_available_dst_timestamp, oldest_available_dst_timestamp = db_time_ranges.get(dst_resolution, (None, None))
            # Only generate down-samples at destination resolution for intervals which:
            # - are within the desired retention window
            oldest_time_to_handle = now - desired_dst_retention - dst_resolution
            # - don't already have a corresponding sample at destination resolution
            if newest_available_dst_timestamp:
                oldest_time_to_handle = max(oldest_time_to_handle, newest_available_dst_timestamp)
            # - have samples at source resolution to generate them from
            if oldest_available_src_timestamp:
                oldest_time_to_handle = max(oldest_time_to_handle, oldest_available_src_timestamp - src_resolution)
            else:
                oldest_time_to_handle = now

            # Create samples at lower resolution from samples at higher resolution
            for recent_t, older_t in self.slice_time(dst_resolution, start_time=now, end_time=oldest_time_to_handle):
                stmt = select(
                    models.TransferStats.src_rse_id,
                    models.TransferStats.dest_rse_id,
                    models.TransferStats.activity,
                    func.sum(models.TransferStats.files_failed).label(models.TransferStats.files_failed.name),
                    func.sum(models.TransferStats.files_done).label(models.TransferStats.files_done.name),
                    func.sum(models.TransferStats.bytes_done).label(models.TransferStats.bytes_done.name),
                ).where(
                    models.TransferStats.resolution == int(src_resolution.total_seconds()),
                    models.TransferStats.timestamp > older_t,
                    models.TransferStats.timestamp <= recent_t,
                ).group_by(
                    models.TransferStats.src_rse_id,
                    models.TransferStats.dest_rse_id,
                    models.TransferStats.activity,
                )

                additional_fields = {
                    models.TransferStats.timestamp.name: recent_t,
                    models.TransferStats.resolution.name: int(dst_resolution.total_seconds()),
                }
                downsample_stats = [row._asdict() | additional_fields for row in session.execute(stmt)]
                if downsample_stats:
                    session.execute(insert(models.TransferStats), downsample_stats)
                    # Keep the in-memory view of the destination-resolution range
                    # consistent with what was just inserted.
                    if not oldest_available_dst_timestamp or recent_t < oldest_available_dst_timestamp:
                        oldest_available_dst_timestamp = recent_t
                    if not newest_available_dst_timestamp or recent_t > newest_available_dst_timestamp:
                        newest_available_dst_timestamp = recent_t

            if oldest_available_dst_timestamp and newest_available_dst_timestamp:
                db_time_ranges[dst_resolution] = (newest_available_dst_timestamp, oldest_available_dst_timestamp)

            # Delete from the database the samples which are older than desired
            if oldest_available_src_timestamp and oldest_available_src_timestamp < oldest_desired_src_timestamp:
                stmt = delete(
                    models.TransferStats
                ).where(
                    models.TransferStats.timestamp < oldest_desired_src_timestamp,
                    models.TransferStats.resolution == int(src_resolution.total_seconds()),
                )
                session.execute(stmt)

        # Cleanup samples at the lowest resolution, which were not handled by the previous loop
        last_resolution, last_retention = self.retentions[-1]
        _, oldest_desired_timestamp = next(self.slice_time(last_resolution, start_time=now - last_retention))
        if db_time_ranges.get(last_resolution, (now, now))[1] < oldest_desired_timestamp:
            stmt = delete(
                models.TransferStats
            ).where(
                models.TransferStats.timestamp < oldest_desired_timestamp,
                models.TransferStats.resolution == int(last_resolution.total_seconds()),
            )
            session.execute(stmt)

        # Cleanup all resolutions which exist in the database but are not desired by rucio anymore
        # (probably due to configuration changes).
        for resolution_to_cleanup in set(db_time_ranges).difference(r[0] for r in self.retentions):
            stmt = delete(
                models.TransferStats
            ).where(
                models.TransferStats.resolution == int(resolution_to_cleanup.total_seconds()),
            )
            session.execute(stmt)

    @staticmethod
    def slice_time(
            resolution: datetime.timedelta,
            start_time: "Optional[datetime.datetime]" = None,
            end_time: "Optional[datetime.datetime]" = None
    ):
        """
        Iterates, back in time, over time intervals of length `resolution` which are fully
        included within the input interval (start_time, end_time).
        Intervals are aligned on boundaries divisible by resolution.
        For example: for start_time=17:09:59, end_time=16:20:01 and resolution = 10minutes, it will yield
        (17:00:00, 16:50:00), (16:50:00, 16:40:00), (16:40:00, 16:30:00)
        If end_time is None, the generator is infinite.
        """

        if start_time is None:
            start_time = datetime.datetime.utcnow()
        # Align on the resolution boundary using naive-UTC arithmetic. Going
        # through timestamp()/fromtimestamp() would interpret the naive datetime
        # in the *local* timezone, shifting boundaries across DST changes.
        # total_seconds() (not .seconds, which is 0 for whole-day timedeltas)
        # keeps day-scale resolutions working.
        resolution_seconds = int(resolution.total_seconds())
        epoch = datetime.datetime(1970, 1, 1)
        elapsed = int((start_time - epoch).total_seconds())
        newer_t = epoch + datetime.timedelta(seconds=elapsed - elapsed % resolution_seconds)
        older_t = newer_t - resolution
        while not end_time or older_t >= end_time:
            yield newer_t, older_t
            newer_t = older_t
            older_t = older_t - resolution


class FlowManager:
class EdgeFlow:
def __init__(self, edge):
Expand Down Expand Up @@ -776,6 +977,24 @@ def update_transfer_state(tt_status_report: TransferStatusReport, *, session: "S
if request_core.is_intermediate_hop(request):
request_core.handle_failed_intermediate_hop(request, session=session)

stats_manage = TransferStatsManager()
stats_manage.observe(
src_rse_id=request['source_rse_id'],
dst_rse_id=request['dest_rse_id'],
activity=request['activity'],
state=tt_status_report.state,
file_size=request['bytes'],
)
import time
time.sleep(21)
stats_manage.observe(
src_rse_id=request['source_rse_id'],
dst_rse_id=request['dest_rse_id'],
activity=request['activity'],
state=tt_status_report.state,
file_size=request['bytes'],
)
stats_manage.downsample_and_cleanup(session=session)
request_core.add_monitor_message(
new_state=tt_status_report.state,
request=request,
Expand All @@ -786,7 +1005,7 @@ def update_transfer_state(tt_status_report: TransferStatusReport, *, session: "S
except UnsupportedOperation as error:
logger(logging.WARNING, "Request %s doesn't exist - Error: %s" % (request_id, str(error).replace('\n', '')))
return False
except Exception:
except Exception as e:
logger(logging.CRITICAL, "Exception", exc_info=True)


Expand Down
18 changes: 18 additions & 0 deletions lib/rucio/db/sqla/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -1420,6 +1420,24 @@ class Distance(BASE, ModelBase):
Index('DISTANCES_DEST_RSEID_IDX', 'dest_rse_id'))


class TransferStats(BASE, ModelBase):
    """
    Represents counters for transfer link usage.

    Each row holds aggregated transfer counters for one
    (src_rse_id, dest_rse_id, activity) triple over one time window. The same
    data is stored at multiple resolutions: `timestamp` is the window boundary
    and `resolution` the window length.
    """
    __tablename__ = 'transfer_stats'
    # Surrogate primary key (see TRANSFER_STATS_PK below).
    id: Mapped[uuid.UUID] = mapped_column(GUID(), default=utils.generate_uuid)
    # Boundary of the time window this sample covers.
    timestamp: Mapped[datetime] = mapped_column(DateTime)
    # Length of the time window, in seconds (writers in transfer.py store
    # timedelta seconds here).
    resolution: Mapped[int] = mapped_column(Integer)
    # Destination/source RSEs of the transfer link (FKs to rses.id below).
    dest_rse_id: Mapped[uuid.UUID] = mapped_column(GUID())
    src_rse_id: Mapped[uuid.UUID] = mapped_column(GUID())
    # Activity of the aggregated transfer requests.
    activity: Mapped[Optional[str]] = mapped_column(String(50))
    # Aggregated counters for the window.
    files_done: Mapped[int] = mapped_column(BigInteger)
    bytes_done: Mapped[int] = mapped_column(BigInteger)
    files_failed: Mapped[int] = mapped_column(BigInteger)
    _table_args = (PrimaryKeyConstraint('id', name='TRANSFER_STATS_PK'),
                   ForeignKeyConstraint(['src_rse_id'], ['rses.id'], name='TRANSFER_STATS_SRC_RSE_FK'),
                   ForeignKeyConstraint(['dest_rse_id'], ['rses.id'], name='TRANSFER_STATS_DEST_RSE_FK'),
                   Index('TRANSFER_STATS_KEY_IDX', 'resolution', 'timestamp', 'dest_rse_id', 'src_rse_id', 'activity'))


class Subscription(BASE, ModelBase):
"""Represents a subscription"""
__tablename__ = 'subscriptions'
Expand Down

0 comments on commit 75b3eff

Please sign in to comment.