Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Restructure metrics collector #76

Merged
merged 2 commits into from
Nov 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions metrics/requirements-dev.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
ruff==0.6.4
pytest==8.3.3
Faker==28.4.1
eth-typing==4.1.0
pytest-aiohttp==1.0.5
pytest-aiohttp==1.0.5
eth-typing==4.0.0
eth-utils==4.0.0
110 changes: 66 additions & 44 deletions metrics/src/collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import asyncio
import aiohttp
from datetime import datetime, timedelta
from typing import Tuple, Optional, Dict
from typing import Tuple, Optional, Dict, List
from aiohttp import ClientError, ClientSession

from src.explorer import get_address_counters_url, get_chain_stats
Expand All @@ -36,24 +36,27 @@
GITHUB_RAW_URL,
OFFCHAIN_KEY,
)
from src.metrics_types import AddressCounter, AddressCountersMap, MetricsData, ChainMetrics

from src.metrics_types import (
AddressCounter,
AddressCountersMap,
MetricsData,
ChainMetrics,
AddressType,
)

logger = logging.getLogger(__name__)


def get_metadata_url(network_name: str) -> str:
return f'{GITHUB_RAW_URL}/skalenetwork/skale-network/master/metadata/{network_name}/chains.json'


async def download_metadata(session, network_name: str) -> Dict:
url = get_metadata_url(network_name)
async def download_metadata(session: ClientSession, network_name: str) -> Dict:
"""Download and parse network metadata."""
url = f'{GITHUB_RAW_URL}/skalenetwork/skale-network/master/metadata/{network_name}/chains.json'
async with session.get(url) as response:
metadata_srt = await response.text()
return json.loads(metadata_srt)
metadata_str = await response.text()
return json.loads(metadata_str)


def get_empty_address_counter() -> AddressCounter:
"""Return an empty counter structure with zero values."""
return {
'gas_usage_count': '0',
'token_transfers_count': '0',
Expand All @@ -65,47 +68,59 @@ def get_empty_address_counter() -> AddressCounter:
}


async def fetch_address_data(session: ClientSession, url: str) -> AddressCounter:
async def fetch_address_data(
session: ClientSession, url: str, chain_name: str, app_name: str, address: str
) -> AddressCounter:
"""Fetch and process address data, updating DB immediately."""
async with session.get(url) as response:
if response.status == 404:
data = await response.json()
if data.get('message') == 'Not found':
logger.warning(f'Address not found at {url}. Returning empty counter.')
return get_empty_address_counter()
response.raise_for_status()
return await response.json()

current_data: Dict = await response.json()

await update_transaction_counts(chain_name, app_name, address, current_data)

today = datetime.now().date()
yesterday = today - timedelta(days=1)
week_ago = today - timedelta(days=7)
month_ago = today - timedelta(days=30)

transactions_last_day = await get_address_transaction_counts(
chain_name, app_name, address, yesterday, yesterday
)
transactions_last_7_days = await get_address_transaction_counts(
chain_name, app_name, address, week_ago, yesterday
)
transactions_last_30_days = await get_address_transaction_counts(
chain_name, app_name, address, month_ago, yesterday
)

result: AddressCounter = {
'gas_usage_count': str(current_data.get('gas_usage_count', '0')),
'token_transfers_count': str(current_data.get('token_transfers_count', '0')),
'transactions_count': str(current_data.get('transactions_count', '0')),
'validations_count': str(current_data.get('validations_count', '0')),
'transactions_last_day': transactions_last_day,
'transactions_last_7_days': transactions_last_7_days,
'transactions_last_30_days': transactions_last_30_days,
}
logger.info(f'Fetched data for {address} at {url}: {result}')
return result


async def get_address_counters(
session: ClientSession, network: str, chain_name: str, app_name: str, address: str
) -> AddressCounter:
"""Get address counters with retries."""
url = get_address_counters_url(network, chain_name, address)

for attempt in range(API_ERROR_RETRIES):
try:
data = await fetch_address_data(session, url)

today = datetime.now().date()
yesterday = today - timedelta(days=1)
week_ago = today - timedelta(days=7)
month_ago = today - timedelta(days=30)

transactions_last_day = await get_address_transaction_counts(
chain_name, app_name, address, yesterday, yesterday
)
transactions_last_7_days = await get_address_transaction_counts(
chain_name, app_name, address, week_ago, yesterday
)
transactions_last_30_days = await get_address_transaction_counts(
chain_name, app_name, address, month_ago, yesterday
)

data['transactions_last_day'] = transactions_last_day
data['transactions_last_7_days'] = transactions_last_7_days
data['transactions_last_30_days'] = transactions_last_30_days

await update_transaction_counts(chain_name, app_name, address, data)

return data
return await fetch_address_data(session, url, chain_name, app_name, address)
except ClientError as e:
if attempt < API_ERROR_RETRIES - 1:
logger.warning(f'Attempt {attempt + 1} failed for {url}. Retrying... Error: {e}')
Expand All @@ -117,19 +132,22 @@ async def get_address_counters(


async def get_all_address_counters(
session, network, chain_name, app_name, addresses
session: ClientSession, network: str, chain_name: str, app_name: str, addresses: List[str]
) -> AddressCountersMap:
results = [
await get_address_counters(session, network, chain_name, app_name, address)
"""Get counters for multiple addresses concurrently."""
tasks = [
get_address_counters(session, network, chain_name, app_name, address)
for address in addresses
]
return dict(zip(addresses, results))
results = await asyncio.gather(*tasks)
return {AddressType(addr): counter for addr, counter in zip(addresses, results)}


async def fetch_counters_for_app(
session, network_name, chain_name, app_name, app_info
session: ClientSession, network_name: str, chain_name: str, app_name: str, app_info: Dict
) -> Tuple[str, Optional[AddressCountersMap]]:
logger.info(f'fetching counters for app {app_name}')
"""Fetch counters for a specific app."""
logger.info(f'Fetching counters for app {app_name}')
if 'contracts' in app_info:
counters = await get_all_address_counters(
session, network_name, chain_name, app_name, app_info['contracts']
Expand All @@ -138,7 +156,10 @@ async def fetch_counters_for_app(
return app_name, None


async def fetch_counters_for_apps(session, chain_info, network_name, chain_name):
async def fetch_counters_for_apps(
session: ClientSession, chain_info: Dict, network_name: str, chain_name: str
) -> List[Tuple[str, Optional[AddressCountersMap]]]:
"""Fetch counters for all apps in a chain concurrently."""
tasks = [
fetch_counters_for_app(session, network_name, chain_name, app_name, app_info)
for app_name, app_info in chain_info['apps'].items()
Expand All @@ -147,6 +168,7 @@ async def fetch_counters_for_apps(session, chain_info, network_name, chain_name)


async def collect_metrics(network_name: str) -> MetricsData:
"""Collect all metrics and save to file."""
async with aiohttp.ClientSession() as session:
metadata = await download_metadata(session, network_name)
metrics: Dict[str, ChainMetrics] = {}
Expand Down
20 changes: 20 additions & 0 deletions metrics/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
TEST_NETWORK = 'testnet'
TEST_CHAIN = 'chain2'
TEST_ADDRESS = '0x1234'
TEST_APP = 'test-app'


@pytest.fixture
Expand Down Expand Up @@ -82,6 +83,25 @@ def latest_day_counters():
return get_latest_day_counters()


@pytest.fixture
def mock_db_data():
return {
'transactions_last_day': 50,
'transactions_last_7_days': 300,
'transactions_last_30_days': 1000,
}


@pytest.fixture
def mock_address_data():
return {
'gas_usage_count': '16935',
'token_transfers_count': '174',
'transactions_count': '1734',
'validations_count': '22',
}


@pytest.fixture
def mock_chain_stats_data():
return CHAIN_STATS
Expand Down
41 changes: 33 additions & 8 deletions metrics/tests/test_metrics_collector.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import pytest
from unittest.mock import patch
from typing import Dict
from src.collector import get_chain_stats, fetch_address_data
from conftest import TEST_NETWORK, TEST_CHAIN
from conftest import TEST_NETWORK, TEST_CHAIN, TEST_ADDRESS, TEST_APP

pytestmark = pytest.mark.asyncio
pytest_plugins = ('pytest_asyncio',)
Expand All @@ -13,10 +14,34 @@ async def test_get_chain_stats(mock_chain_stats_data: Dict, client, mock_explore
assert result == mock_chain_stats_data


async def test_fetch_address_data(client, mock_explorer_url) -> None:
result = await fetch_address_data(client, '/api/v2/addresses/0x1234/counters')
assert isinstance(result, dict)
assert result['gas_usage_count'] == '16935'
assert result['token_transfers_count'] == '174'
assert result['transactions_count'] == '1734'
assert result['validations_count'] == '22'
async def test_fetch_address_data_success(
client, mock_explorer_url, mock_address_data: Dict[str, str], mock_db_data: Dict[str, int]
) -> None:
with (
patch('src.collector.update_transaction_counts') as mock_update,
patch('src.collector.get_address_transaction_counts') as mock_get_counts,
):
mock_get_counts.side_effect = [
mock_db_data['transactions_last_day'],
mock_db_data['transactions_last_7_days'],
mock_db_data['transactions_last_30_days'],
]

result = await fetch_address_data(
client, '/api/v2/addresses/0x1234/counters', TEST_CHAIN, TEST_APP, TEST_ADDRESS
)

# Verify the result type and content
assert isinstance(result, dict)
assert result['gas_usage_count'] == mock_address_data['gas_usage_count']
assert result['token_transfers_count'] == mock_address_data['token_transfers_count']
assert result['transactions_count'] == mock_address_data['transactions_count']
assert result['validations_count'] == mock_address_data['validations_count']

# Verify historical data
assert result['transactions_last_day'] == mock_db_data['transactions_last_day']
assert result['transactions_last_7_days'] == mock_db_data['transactions_last_7_days']
assert result['transactions_last_30_days'] == mock_db_data['transactions_last_30_days']

# Verify database was updated
mock_update.assert_called_once()
Loading