From 2cf38afe130c334274036c71fc9d8c0a47e96906 Mon Sep 17 00:00:00 2001 From: Dmytro Date: Fri, 1 Nov 2024 18:05:02 +0000 Subject: [PATCH 1/2] Restructure metrics collector --- metrics/src/collector.py | 110 +++++++++++++++++++++++---------------- 1 file changed, 66 insertions(+), 44 deletions(-) diff --git a/metrics/src/collector.py b/metrics/src/collector.py index 187a807..fb5208e 100644 --- a/metrics/src/collector.py +++ b/metrics/src/collector.py @@ -22,7 +22,7 @@ import asyncio import aiohttp from datetime import datetime, timedelta -from typing import Tuple, Optional, Dict +from typing import Tuple, Optional, Dict, List from aiohttp import ClientError, ClientSession from src.explorer import get_address_counters_url, get_chain_stats @@ -36,24 +36,27 @@ GITHUB_RAW_URL, OFFCHAIN_KEY, ) -from src.metrics_types import AddressCounter, AddressCountersMap, MetricsData, ChainMetrics - +from src.metrics_types import ( + AddressCounter, + AddressCountersMap, + MetricsData, + ChainMetrics, + AddressType, +) logger = logging.getLogger(__name__) -def get_metadata_url(network_name: str) -> str: - return f'{GITHUB_RAW_URL}/skalenetwork/skale-network/master/metadata/{network_name}/chains.json' - - -async def download_metadata(session, network_name: str) -> Dict: - url = get_metadata_url(network_name) +async def download_metadata(session: ClientSession, network_name: str) -> Dict: + """Download and parse network metadata.""" + url = f'{GITHUB_RAW_URL}/skalenetwork/skale-network/master/metadata/{network_name}/chains.json' async with session.get(url) as response: - metadata_srt = await response.text() - return json.loads(metadata_srt) + metadata_str = await response.text() + return json.loads(metadata_str) def get_empty_address_counter() -> AddressCounter: + """Return an empty counter structure with zero values.""" return { 'gas_usage_count': '0', 'token_transfers_count': '0', @@ -65,7 +68,10 @@ def get_empty_address_counter() -> AddressCounter: } -async def fetch_address_data(session: ClientSession, url: str) -> AddressCounter: +async def fetch_address_data( + session: ClientSession, url: str, chain_name: str, app_name: str, address: str +) -> AddressCounter: + """Fetch and process address data, updating DB immediately.""" async with session.get(url) as response: if response.status == 404: data = await response.json() @@ -73,39 +79,48 @@ async def fetch_address_data(session: ClientSession, url: str) -> AddressCounter logger.warning(f'Address not found at {url}. Returning empty counter.') return get_empty_address_counter() response.raise_for_status() - return await response.json() + + current_data: Dict = await response.json() + + await update_transaction_counts(chain_name, app_name, address, current_data) + + today = datetime.now().date() + yesterday = today - timedelta(days=1) + week_ago = today - timedelta(days=7) + month_ago = today - timedelta(days=30) + + transactions_last_day = await get_address_transaction_counts( + chain_name, app_name, address, yesterday, yesterday + ) + transactions_last_7_days = await get_address_transaction_counts( + chain_name, app_name, address, week_ago, yesterday + ) + transactions_last_30_days = await get_address_transaction_counts( + chain_name, app_name, address, month_ago, yesterday + ) + + result: AddressCounter = { + 'gas_usage_count': str(current_data.get('gas_usage_count', '0')), + 'token_transfers_count': str(current_data.get('token_transfers_count', '0')), + 'transactions_count': str(current_data.get('transactions_count', '0')), + 'validations_count': str(current_data.get('validations_count', '0')), + 'transactions_last_day': transactions_last_day, + 'transactions_last_7_days': transactions_last_7_days, + 'transactions_last_30_days': transactions_last_30_days, + } + logger.info(f'Fetched data for {address} at {url}: {result}') + return result async def get_address_counters( session: ClientSession, network: str, chain_name: str, app_name: str, address: str ) -> AddressCounter: + """Get address counters with retries.""" url = get_address_counters_url(network, chain_name, address) + for attempt in range(API_ERROR_RETRIES): try: - data = await fetch_address_data(session, url) - - today = datetime.now().date() - yesterday = today - timedelta(days=1) - week_ago = today - timedelta(days=7) - month_ago = today - timedelta(days=30) - - transactions_last_day = await get_address_transaction_counts( - chain_name, app_name, address, yesterday, yesterday - ) - transactions_last_7_days = await get_address_transaction_counts( - chain_name, app_name, address, week_ago, yesterday - ) - transactions_last_30_days = await get_address_transaction_counts( - chain_name, app_name, address, month_ago, yesterday - ) - - data['transactions_last_day'] = transactions_last_day - data['transactions_last_7_days'] = transactions_last_7_days - data['transactions_last_30_days'] = transactions_last_30_days - - await update_transaction_counts(chain_name, app_name, address, data) - - return data + return await fetch_address_data(session, url, chain_name, app_name, address) except ClientError as e: if attempt < API_ERROR_RETRIES - 1: logger.warning(f'Attempt {attempt + 1} failed for {url}. Retrying... Error: {e}') @@ -117,19 +132,22 @@ async def get_address_counters( async def get_all_address_counters( - session, network, chain_name, app_name, addresses + session: ClientSession, network: str, chain_name: str, app_name: str, addresses: List[str] ) -> AddressCountersMap: - results = [ - await get_address_counters(session, network, chain_name, app_name, address) + """Get counters for multiple addresses concurrently.""" + tasks = [ + get_address_counters(session, network, chain_name, app_name, address) for address in addresses ] - return dict(zip(addresses, results)) + results = await asyncio.gather(*tasks) + return {AddressType(addr): counter for addr, counter in zip(addresses, results)} async def fetch_counters_for_app( - session, network_name, chain_name, app_name, app_info + session: ClientSession, network_name: str, chain_name: str, app_name: str, app_info: Dict ) -> Tuple[str, Optional[AddressCountersMap]]: - logger.info(f'fetching counters for app {app_name}') + """Fetch counters for a specific app.""" + logger.info(f'Fetching counters for app {app_name}') if 'contracts' in app_info: counters = await get_all_address_counters( session, network_name, chain_name, app_name, app_info['contracts'] @@ -138,7 +156,10 @@ async def fetch_counters_for_app( return app_name, None -async def fetch_counters_for_apps(session, chain_info, network_name, chain_name): +async def fetch_counters_for_apps( + session: ClientSession, chain_info: Dict, network_name: str, chain_name: str +) -> List[Tuple[str, Optional[AddressCountersMap]]]: + """Fetch counters for all apps in a chain concurrently.""" tasks = [ fetch_counters_for_app(session, network_name, chain_name, app_name, app_info) for app_name, app_info in chain_info['apps'].items() @@ -147,6 +168,7 @@ async def fetch_counters_for_apps(session, chain_info, network_name, chain_name) async def collect_metrics(network_name: str) -> MetricsData: + """Collect all metrics and save to file.""" async with aiohttp.ClientSession() as session: metadata = await download_metadata(session, network_name) metrics: Dict[str, ChainMetrics] = {} From ec820b798a13b8755c0de7090dfec2f66aca8c6f Mon Sep 17 00:00:00 2001 From: Dmytro Date: Fri, 1 Nov 2024 18:33:48 +0000 Subject: [PATCH 2/2] Update tests and dev dependencies --- metrics/requirements-dev.txt | 5 +-- metrics/tests/conftest.py | 20 ++++++++++++ metrics/tests/test_metrics_collector.py | 41 ++++++++++++++++++++----- 3 files changed, 56 insertions(+), 10 deletions(-) diff --git a/metrics/requirements-dev.txt b/metrics/requirements-dev.txt index 67a0b14..f5896f0 100644 --- a/metrics/requirements-dev.txt +++ b/metrics/requirements-dev.txt @@ -1,5 +1,6 @@ ruff==0.6.4 pytest==8.3.3 Faker==28.4.1 -eth-typing==4.1.0 -pytest-aiohttp==1.0.5 \ No newline at end of file +pytest-aiohttp==1.0.5 +eth-typing==4.0.0 +eth-utils==4.0.0 \ No newline at end of file diff --git a/metrics/tests/conftest.py b/metrics/tests/conftest.py index d21cf29..29d2501 100644 --- a/metrics/tests/conftest.py +++ b/metrics/tests/conftest.py @@ -43,6 +43,7 @@ TEST_NETWORK = 'testnet' TEST_CHAIN = 'chain2' TEST_ADDRESS = '0x1234' +TEST_APP = 'test-app' @pytest.fixture @@ -82,6 +83,25 @@ def latest_day_counters(): return get_latest_day_counters() +@pytest.fixture +def mock_db_data(): + return { + 'transactions_last_day': 50, + 'transactions_last_7_days': 300, + 'transactions_last_30_days': 1000, + } + + +@pytest.fixture +def mock_address_data(): + return { + 'gas_usage_count': '16935', + 'token_transfers_count': '174', + 'transactions_count': '1734', + 'validations_count': '22', + } + + @pytest.fixture def mock_chain_stats_data(): return CHAIN_STATS diff --git a/metrics/tests/test_metrics_collector.py b/metrics/tests/test_metrics_collector.py index 3738cf3..1fa579f 100644 --- a/metrics/tests/test_metrics_collector.py +++ b/metrics/tests/test_metrics_collector.py @@ -1,7 +1,8 @@ import pytest +from unittest.mock import patch from typing import Dict from src.collector import get_chain_stats, fetch_address_data -from conftest import TEST_NETWORK, TEST_CHAIN +from conftest import TEST_NETWORK, TEST_CHAIN, TEST_ADDRESS, TEST_APP pytestmark = pytest.mark.asyncio pytest_plugins = ('pytest_asyncio',) @@ -13,10 +14,34 @@ async def test_get_chain_stats(mock_chain_stats_data: Dict, client, mock_explore assert result == mock_chain_stats_data -async def test_fetch_address_data(client, mock_explorer_url) -> None: - result = await fetch_address_data(client, '/api/v2/addresses/0x1234/counters') - assert isinstance(result, dict) - assert result['gas_usage_count'] == '16935' - assert result['token_transfers_count'] == '174' - assert result['transactions_count'] == '1734' - assert result['validations_count'] == '22' +async def test_fetch_address_data_success( + client, mock_explorer_url, mock_address_data: Dict[str, str], mock_db_data: Dict[str, int] +) -> None: + with ( + patch('src.collector.update_transaction_counts') as mock_update, + patch('src.collector.get_address_transaction_counts') as mock_get_counts, + ): + mock_get_counts.side_effect = [ + mock_db_data['transactions_last_day'], + mock_db_data['transactions_last_7_days'], + mock_db_data['transactions_last_30_days'], + ] + + result = await fetch_address_data( + client, '/api/v2/addresses/0x1234/counters', TEST_CHAIN, TEST_APP, TEST_ADDRESS + ) + + # Verify the result type and content + assert isinstance(result, dict) + assert result['gas_usage_count'] == mock_address_data['gas_usage_count'] + assert result['token_transfers_count'] == mock_address_data['token_transfers_count'] + assert result['transactions_count'] == mock_address_data['transactions_count'] + assert result['validations_count'] == mock_address_data['validations_count'] + + # Verify historical data + assert result['transactions_last_day'] == mock_db_data['transactions_last_day'] + assert result['transactions_last_7_days'] == mock_db_data['transactions_last_7_days'] + assert result['transactions_last_30_days'] == mock_db_data['transactions_last_30_days'] + + # Verify database was updated + mock_update.assert_called_once()