Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
Radu Carpa committed Sep 28, 2023
1 parent 8ab749b commit 847212f
Show file tree
Hide file tree
Showing 2 changed files with 177 additions and 2 deletions.
161 changes: 159 additions & 2 deletions lib/rucio/core/transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@
import time
import traceback
from collections import defaultdict
from dataclasses import dataclass
from decimal import Decimal, localcontext
from typing import TYPE_CHECKING

from dogpile.cache import make_region
from dogpile.cache.api import NoValue
from sqlalchemy import select, update
from sqlalchemy import delete, exists, func, insert, select, update
from sqlalchemy.exc import IntegrityError

from rucio.common import constants
Expand Down Expand Up @@ -287,6 +288,149 @@ def legacy_sources(self):
return self._legacy_sources


class TransferStatsManager:

@dataclass
class _StatsRecord:
files_failed: int = 0
files_done: int = 0
bytes_done: int = 0

def __init__(self):
self.last_saved = datetime.datetime.utcnow()
self.pending_metrics = defaultdict(lambda: self._StatsRecord())

retentions = sorted([
# resolution, retention
(datetime.timedelta(seconds=1), datetime.timedelta(seconds=10)),
(datetime.timedelta(seconds=5), datetime.timedelta(seconds=20)),
(datetime.timedelta(seconds=20), datetime.timedelta(minutes=2)),
#(datetime.timedelta(minutes=5), datetime.timedelta(hours=1)),
#(datetime.timedelta(hours=1), datetime.timedelta(days=1)),
#(datetime.timedelta(days=1), datetime.timedelta(days=30)),
])

self.retentions = retentions
self.observe_resolution = self.retentions[0][0]

def observe(self, src_rse_id: str, dst_rse_id: str, activity: str, state: RequestState, file_size: int):
if state in (RequestState.DONE, RequestState.FAILED):
record = self.pending_metrics[dst_rse_id, src_rse_id, activity]
if state == RequestState.DONE:
record.files_done += 1
record.bytes_done += file_size
else:
record.files_failed += 1

if self.last_saved < datetime.datetime.utcnow() - self.observe_resolution:
self.commit_stats()

@transactional_session
def commit_stats(self, *, session: "Session"):
rows_to_insert = []
for (dst_rse_id, src_rse_id, activity), record in self.pending_metrics.items():
rows_to_insert.append({
models.TransferStats.timestamp.name: self.last_saved,
models.TransferStats.resolution.name: self.observe_resolution.seconds,
models.TransferStats.src_rse_id.name: src_rse_id,
models.TransferStats.dest_rse_id.name: dst_rse_id,
models.TransferStats.activity.name: activity,
models.TransferStats.files_failed.name: record.files_failed,
models.TransferStats.files_done.name: record.files_done,
models.TransferStats.bytes_done.name: record.bytes_done,
})

if rows_to_insert:
session.execute(insert(models.TransferStats), rows_to_insert)

@transactional_session
def downsample_and_cleanup(self, *, session: "Session"):
now = datetime.datetime.utcnow()

stmt = select(
models.TransferStats.resolution,
func.max(models.TransferStats.timestamp),
# ).where(
# models.TransferStats.resolution.in_([r[0].seconds for r in self.retentions]),
).group_by(
models.TransferStats.resolution,
)
most_recent_db_timestamps = {datetime.timedelta(seconds=r): t for r, t in session.execute(stmt)}

for i in range(1, len(self.retentions)):
src_resolution, src_duration = self.retentions[i - 1]
dst_resolution, dst_duration = self.retentions[i]
oldest_time_to_handle = max(most_recent_db_timestamps.get(dst_resolution, now - dst_duration), now - dst_duration)
for recent_t, older_t in self._slice_time(dst_resolution, start_time=now, end_time=oldest_time_to_handle):
stmt = select(
models.TransferStats.src_rse_id,
models.TransferStats.dest_rse_id,
models.TransferStats.activity,
func.sum(models.TransferStats.files_failed).label(models.TransferStats.files_failed.name),
func.sum(models.TransferStats.files_done).label(models.TransferStats.files_done.name),
func.sum(models.TransferStats.bytes_done).label(models.TransferStats.bytes_done.name),
).where(
models.TransferStats.resolution == src_resolution.seconds,
models.TransferStats.timestamp > older_t,
models.TransferStats.timestamp <= recent_t,
).group_by(
models.TransferStats.src_rse_id,
models.TransferStats.dest_rse_id,
models.TransferStats.activity,
)

additional_fields = {
models.TransferStats.timestamp.name: recent_t,
models.TransferStats.resolution.name: dst_resolution.seconds,
}
downsample_stats = [row._asdict() | additional_fields for row in session.execute(stmt)]
if downsample_stats:
session.execute(insert(models.TransferStats), downsample_stats)

stmt = delete(
models.TransferStats
).where(
models.TransferStats.timestamp < now - src_duration,
models.TransferStats.resolution == src_resolution.seconds
)
session.execute(stmt)

stmt = delete(
models.TransferStats
).where(
models.TransferStats.resolution == self.retentions[-1][0].seconds,
models.TransferStats.timestamp < now - self.retentions[-1][1],
)
session.execute(stmt)

# Cleanup all resolutions which exist in the database but are not desired by rucio anymore
# Probably due to configuration changes.
for resolution_to_cleanup in set(most_recent_db_timestamps).difference(r[0] for r in self.retentions):
stmt = delete(
models.TransferStats
).where(
models.TransferStats.resolution == resolution_to_cleanup.seconds
)
session.execute(stmt)

@staticmethod
def _slice_time(
resolution: datetime.timedelta,
start_time: "Optional[datetime.datetime]" = None,
end_time: "Optional[datetime.datetime]" = None
):
if start_time is None:
start_time = datetime.datetime.utcnow()
newer_t = datetime.datetime.fromtimestamp(int(start_time.timestamp()) // resolution.seconds * resolution.seconds)
older_t = newer_t - resolution
while True:
if end_time and newer_t <= end_time:
break
yield newer_t, older_t
newer_t = older_t
older_t = older_t - resolution


class FlowManager:
class EdgeFlow:
def __init__(self, edge):
Expand Down Expand Up @@ -776,6 +920,19 @@ def update_transfer_state(tt_status_report: TransferStatusReport, *, session: "S
if request_core.is_intermediate_hop(request):
request_core.handle_failed_intermediate_hop(request, session=session)

stats_manage = TransferStatsManager()
stats_manage.observe(
src_rse_id=request['source_rse_id'],
dst_rse_id=request['dest_rse_id'],
activity=request['activity'],
state=tt_status_report.state,
file_size=request['bytes'],
)
stats_manage.commit_stats(session=session)

import time
time.sleep(21)
stats_manage.downsample_and_cleanup(session=session)
request_core.add_monitor_message(
new_state=tt_status_report.state,
request=request,
Expand All @@ -786,7 +943,7 @@ def update_transfer_state(tt_status_report: TransferStatusReport, *, session: "S
except UnsupportedOperation as error:
logger(logging.WARNING, "Request %s doesn't exist - Error: %s" % (request_id, str(error).replace('\n', '')))
return False
except Exception:
except Exception as e:
logger(logging.CRITICAL, "Exception", exc_info=True)


Expand Down
18 changes: 18 additions & 0 deletions lib/rucio/db/sqla/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -1420,6 +1420,24 @@ class Distance(BASE, ModelBase):
Index('DISTANCES_DEST_RSEID_IDX', 'dest_rse_id'))


class TransferStats(BASE, ModelBase):
"""Represents counters for transfer link usage"""
__tablename__ = 'transfer_stats'
id: Mapped[uuid.UUID] = mapped_column(GUID(), default=utils.generate_uuid)
timestamp: Mapped[datetime] = mapped_column(DateTime)
resolution: Mapped[int] = mapped_column(Integer)
dest_rse_id: Mapped[uuid.UUID] = mapped_column(GUID())
src_rse_id: Mapped[uuid.UUID] = mapped_column(GUID())
activity: Mapped[Optional[str]] = mapped_column(String(50))
files_done: Mapped[int] = mapped_column(BigInteger)
bytes_done: Mapped[int] = mapped_column(BigInteger)
files_failed: Mapped[int] = mapped_column(BigInteger)
_table_args = (PrimaryKeyConstraint('id', name='TRANSFER_STATS_PK'),
ForeignKeyConstraint(['src_rse_id'], ['rses.id'], name='TRANSFER_STATS_SRC_RSE_FK'),
ForeignKeyConstraint(['dest_rse_id'], ['rses.id'], name='TRANSFER_STATS_DEST_RSE_FK'),
Index('TRANSFER_STATS_KEY_IDX', 'resolution', 'timestamp', 'dest_rse_id', 'src_rse_id', 'activity'))


class Subscription(BASE, ModelBase):
"""Represents a subscription"""
__tablename__ = 'subscriptions'
Expand Down

0 comments on commit 847212f

Please sign in to comment.