Restructure metrics collector
dmytrotkk committed Nov 1, 2024
1 parent 95f4e5f commit 2cf38af
Showing 1 changed file with 66 additions and 44 deletions.
110 changes: 66 additions & 44 deletions metrics/src/collector.py
@@ -22,7 +22,7 @@
import asyncio
import aiohttp
from datetime import datetime, timedelta
from typing import Tuple, Optional, Dict
from typing import Tuple, Optional, Dict, List
from aiohttp import ClientError, ClientSession

from src.explorer import get_address_counters_url, get_chain_stats
@@ -36,24 +36,27 @@
GITHUB_RAW_URL,
OFFCHAIN_KEY,
)
from src.metrics_types import AddressCounter, AddressCountersMap, MetricsData, ChainMetrics

from src.metrics_types import (
AddressCounter,
AddressCountersMap,
MetricsData,
ChainMetrics,
AddressType,
)

logger = logging.getLogger(__name__)


def get_metadata_url(network_name: str) -> str:
return f'{GITHUB_RAW_URL}/skalenetwork/skale-network/master/metadata/{network_name}/chains.json'


async def download_metadata(session, network_name: str) -> Dict:
url = get_metadata_url(network_name)
async def download_metadata(session: ClientSession, network_name: str) -> Dict:
"""Download and parse network metadata."""
url = f'{GITHUB_RAW_URL}/skalenetwork/skale-network/master/metadata/{network_name}/chains.json'
async with session.get(url) as response:
metadata_srt = await response.text()
return json.loads(metadata_srt)
metadata_str = await response.text()
return json.loads(metadata_str)


def get_empty_address_counter() -> AddressCounter:
"""Return an empty counter structure with zero values."""
return {
'gas_usage_count': '0',
'token_transfers_count': '0',
Expand All @@ -65,47 +68,59 @@ def get_empty_address_counter() -> AddressCounter:
}


async def fetch_address_data(session: ClientSession, url: str) -> AddressCounter:
async def fetch_address_data(
session: ClientSession, url: str, chain_name: str, app_name: str, address: str
) -> AddressCounter:
"""Fetch and process address data, updating DB immediately."""
async with session.get(url) as response:
if response.status == 404:
data = await response.json()
if data.get('message') == 'Not found':
logger.warning(f'Address not found at {url}. Returning empty counter.')
return get_empty_address_counter()
response.raise_for_status()
return await response.json()

current_data: Dict = await response.json()

await update_transaction_counts(chain_name, app_name, address, current_data)

today = datetime.now().date()
yesterday = today - timedelta(days=1)
week_ago = today - timedelta(days=7)
month_ago = today - timedelta(days=30)

transactions_last_day = await get_address_transaction_counts(
chain_name, app_name, address, yesterday, yesterday
)
transactions_last_7_days = await get_address_transaction_counts(
chain_name, app_name, address, week_ago, yesterday
)
transactions_last_30_days = await get_address_transaction_counts(
chain_name, app_name, address, month_ago, yesterday
)

result: AddressCounter = {
'gas_usage_count': str(current_data.get('gas_usage_count', '0')),
'token_transfers_count': str(current_data.get('token_transfers_count', '0')),
'transactions_count': str(current_data.get('transactions_count', '0')),
'validations_count': str(current_data.get('validations_count', '0')),
'transactions_last_day': transactions_last_day,
'transactions_last_7_days': transactions_last_7_days,
'transactions_last_30_days': transactions_last_30_days,
}
logger.info(f'Fetched data for {address} at {url}: {result}')
return result

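For context, a minimal usage sketch of the new fetch_address_data signature. The explorer URL, chain name, app name, and address below are hypothetical placeholders, and the function is assumed to be importable from this module with its database helpers (update_transaction_counts, get_address_transaction_counts) available:

import asyncio
import aiohttp

async def demo() -> None:
    # Hypothetical values; real ones come from the chain metadata and get_address_counters_url.
    url = 'https://explorer.example.org/api/v2/addresses/0xabc/counters'
    async with aiohttp.ClientSession() as session:
        counter = await fetch_address_data(session, url, 'my-chain', 'my-app', '0xabc')
        # Rolling-window counts are read back from the DB after the current snapshot is stored.
        print(counter['transactions_count'], counter['transactions_last_7_days'])

# asyncio.run(demo())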

async def get_address_counters(
session: ClientSession, network: str, chain_name: str, app_name: str, address: str
) -> AddressCounter:
"""Get address counters with retries."""
url = get_address_counters_url(network, chain_name, address)

for attempt in range(API_ERROR_RETRIES):
try:
data = await fetch_address_data(session, url)

today = datetime.now().date()
yesterday = today - timedelta(days=1)
week_ago = today - timedelta(days=7)
month_ago = today - timedelta(days=30)

transactions_last_day = await get_address_transaction_counts(
chain_name, app_name, address, yesterday, yesterday
)
transactions_last_7_days = await get_address_transaction_counts(
chain_name, app_name, address, week_ago, yesterday
)
transactions_last_30_days = await get_address_transaction_counts(
chain_name, app_name, address, month_ago, yesterday
)

data['transactions_last_day'] = transactions_last_day
data['transactions_last_7_days'] = transactions_last_7_days
data['transactions_last_30_days'] = transactions_last_30_days

await update_transaction_counts(chain_name, app_name, address, data)

return data
return await fetch_address_data(session, url, chain_name, app_name, address)
except ClientError as e:
if attempt < API_ERROR_RETRIES - 1:
logger.warning(f'Attempt {attempt + 1} failed for {url}. Retrying... Error: {e}')
@@ -117,19 +132,22 @@ async def get_address_counters(


async def get_all_address_counters(
session, network, chain_name, app_name, addresses
session: ClientSession, network: str, chain_name: str, app_name: str, addresses: List[str]
) -> AddressCountersMap:
results = [
await get_address_counters(session, network, chain_name, app_name, address)
"""Get counters for multiple addresses concurrently."""
tasks = [
get_address_counters(session, network, chain_name, app_name, address)
for address in addresses
]
return dict(zip(addresses, results))
results = await asyncio.gather(*tasks)
return {AddressType(addr): counter for addr, counter in zip(addresses, results)}


async def fetch_counters_for_app(
session, network_name, chain_name, app_name, app_info
session: ClientSession, network_name: str, chain_name: str, app_name: str, app_info: Dict
) -> Tuple[str, Optional[AddressCountersMap]]:
logger.info(f'fetching counters for app {app_name}')
"""Fetch counters for a specific app."""
logger.info(f'Fetching counters for app {app_name}')
if 'contracts' in app_info:
counters = await get_all_address_counters(
session, network_name, chain_name, app_name, app_info['contracts']
@@ -138,7 +156,10 @@ async def fetch_counters_for_app(
return app_name, None


async def fetch_counters_for_apps(session, chain_info, network_name, chain_name):
async def fetch_counters_for_apps(
session: ClientSession, chain_info: Dict, network_name: str, chain_name: str
) -> List[Tuple[str, Optional[AddressCountersMap]]]:
"""Fetch counters for all apps in a chain concurrently."""
tasks = [
fetch_counters_for_app(session, network_name, chain_name, app_name, app_info)
for app_name, app_info in chain_info['apps'].items()
@@ -147,6 +168,7 @@


async def collect_metrics(network_name: str) -> MetricsData:
"""Collect all metrics and save to file."""
async with aiohttp.ClientSession() as session:
metadata = await download_metadata(session, network_name)
metrics: Dict[str, ChainMetrics] = {}
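The body of collect_metrics is truncated in this view. Based on the signature and download_metadata above, the entry point would be invoked roughly like this; the network name is a hypothetical placeholder and must match a folder under skalenetwork/skale-network/metadata:

import asyncio

async def main() -> None:
    network_name = 'mainnet'  # hypothetical placeholder
    data = await collect_metrics(network_name)
    print(data)  # MetricsData for the chains and apps found in the metadata

if __name__ == '__main__':
    asyncio.run(main())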