From 62b68f9f6b485eb7716609b981994ca71d825207 Mon Sep 17 00:00:00 2001 From: tonynguyen-ccl <115619462+tonynguyen-ccl@users.noreply.github.com> Date: Mon, 26 Aug 2024 17:35:09 +0800 Subject: [PATCH] Create new aptos collector (#62) * Refactor the HTTP interface to include get request * Fix test interfaces, fix pylint * Add first version of Aptos collector * Add Aptos to the registries * Working version of Aptos * Fix styling * Add test cases for Aptos * Fix name for cached_rest_api_get * Fix name for cached_json_rest_api_get --- src/collectors.py | 42 +++++++++++ src/configuration.py | 2 +- src/helpers.py | 34 ++++++--- src/interfaces.py | 81 +++++++++++++------- src/registries.py | 2 + src/test_collectors.py | 84 +++++++++++++++++++++ src/test_interfaces.py | 43 ++++++++++- src/test_registries.py | 10 +++ src/tests/fixtures/configuration_aptos.yaml | 12 +++ 9 files changed, 271 insertions(+), 39 deletions(-) create mode 100644 src/tests/fixtures/configuration_aptos.yaml diff --git a/src/collectors.py b/src/collectors.py index 7562a83..5afaa26 100644 --- a/src/collectors.py +++ b/src/collectors.py @@ -351,3 +351,45 @@ def block_height(self): def latency(self): """Returns connection latency.""" return self.interface.latest_query_latency + + +class AptosCollector(): + """A collector to fetch information about Aptos endpoints.""" + + def __init__(self, url, labels, chain_id, **client_parameters): + + self.labels = labels + self.chain_id = chain_id + self.interface = HttpsInterface(url, client_parameters.get('open_timeout'), + client_parameters.get('ping_timeout')) + + self._logger_metadata = { + 'component': 'AptosCollector', + 'url': strip_url(url) + } + + def alive(self): + """Returns true if endpoint is alive, false if not.""" + # Run cached query because we can also fetch client version from this + # later on. This will save us an RPC call per run. + return self.interface.cached_json_rest_api_get() is not None + + def block_height(self): + """Runs a cached query to return block height""" + blockchain_info = self.interface.cached_json_rest_api_get() + return validate_dict_and_return_key_value( + blockchain_info, 'block_height', self._logger_metadata, to_number=True) + + def client_version(self): + """Runs a cached query to return client version.""" + blockchain_info = self.interface.cached_json_rest_api_get() + version = validate_dict_and_return_key_value( + blockchain_info, 'git_hash', self._logger_metadata, stringify=True) + if version is None: + return None + client_version = {"client_version": version} + return client_version + + def latency(self): + """Returns connection latency.""" + return self.interface.latest_query_latency diff --git a/src/configuration.py b/src/configuration.py index 111f8d7..93788df 100644 --- a/src/configuration.py +++ b/src/configuration.py @@ -47,7 +47,7 @@ def endpoints(self): def _load_configuration(self): allowed_providers = self._load_validation_file() supported_collectors = ('evm', 'cardano', 'conflux', 'solana', - 'bitcoin', 'doge', 'filecoin', 'starknet') + 'bitcoin', 'doge', 'filecoin', 'starknet', 'aptos') configuration_schema = Schema({ 'blockchain': diff --git a/src/helpers.py b/src/helpers.py index bc61508..d2fcd9e 100644 --- a/src/helpers.py +++ b/src/helpers.py @@ -1,9 +1,9 @@ """Module for providing useful functions accessible globally.""" import urllib.parse +import json from json.decoder import JSONDecodeError from jsonrpcclient import Ok, parse_json - from log import logger @@ -13,18 +13,22 @@ def strip_url(url) -> str: return urllib.parse.urlparse(url).hostname -def return_and_validate_rpc_json_result(message: str, logger_metadata) -> dict: +def return_and_validate_json_result(message: str, json_type: str,logger_metadata) -> dict: """Loads json rpc response text and validates the response - as per JSON-RPC 2.0 Specification. In case the message is + as per JSON-RPC 2.0 Specification or JSON parsable if type is REST. In case the message is not valid it returns None. This method is used by both HTTPS and Websocket Interface.""" try: - parsed = parse_json(message) - if isinstance(parsed, Ok): # pylint: disable=no-else-return - return parsed.result + if json_type=='RPC': + parsed = parse_json(message) + if isinstance(parsed, Ok): # pylint: disable=no-else-return + return parsed.result + else: + logger.error('Error in RPC message.', + message=message, **logger_metadata) else: - logger.error('Error in RPC message.', - message=message, **logger_metadata) + parsed = json.loads(message) + return parsed except (JSONDecodeError, KeyError) as error: logger.error('Invalid JSON RPC object in RPC message.', message=message, @@ -32,8 +36,15 @@ def return_and_validate_rpc_json_result(message: str, logger_metadata) -> dict: **logger_metadata) return None +def return_and_validate_rpc_json_result(message: str, logger_metadata) -> dict: + """Validate that message is JSON parsable and per JSON-RPC specs""" + return return_and_validate_json_result(message,json_type='RPC',logger_metadata=logger_metadata) + +def return_and_validate_rest_api_json_result(message: str, logger_metadata) -> dict: + """Validate that message is JSON parsable""" + return return_and_validate_json_result(message,json_type='REST',logger_metadata=logger_metadata) -def validate_dict_and_return_key_value(data, key, logger_metadata, stringify=False): +def validate_dict_and_return_key_value(data, key, logger_metadata, stringify=False, to_number=False): # pylint: disable=line-too-long """Validates that a dict is provided and returns the key value either in original form or as a string""" if isinstance(data, dict): @@ -41,6 +52,11 @@ def validate_dict_and_return_key_value(data, key, logger_metadata, stringify=Fal if value is not None: if stringify: return str(value) + if to_number: + try: + return float(value) + except ValueError: + return None return value logger.error("Provided data is not a dict or has no value for key", key=key, diff --git a/src/interfaces.py b/src/interfaces.py index 741ee4e..71ef8dc 100644 --- a/src/interfaces.py +++ b/src/interfaces.py @@ -9,7 +9,7 @@ import requests from urllib3 import Timeout -from helpers import strip_url, return_and_validate_rpc_json_result +from helpers import strip_url, return_and_validate_rpc_json_result, return_and_validate_rest_api_json_result # pylint: disable=line-too-long from cache import Cache from log import logger @@ -37,37 +37,43 @@ def latest_query_latency(self): self._latest_query_latency = None return latency - def _return_and_validate_post_request(self, payload: dict) -> str: - """Sends a POST request and validates the http response code. If - response code is OK, it returns the response.text, otherwise - it returns None. - """ + def _return_and_validate_request(self, method='GET', payload=None, params=None): + """Sends a GET or POST request and validates the http response code.""" with self.session as ses: try: - self._logger.debug("Querying endpoint.", - payload=payload, - **self._logger_metadata) + self._logger.debug(f"Querying endpoint with {method}.", + payload=payload, + params=params, + **self._logger_metadata) start_time = perf_counter() - req = ses.post(self.url, - json=payload, - timeout=Timeout(connect=self.connect_timeout, - read=self.response_timeout)) - if req.status_code == requests.codes.ok: # pylint: disable=no-member + if method.upper() == 'GET': + req = ses.get(self.url, + params=params, + timeout=Timeout(connect=self.connect_timeout, + read=self.response_timeout)) + elif method.upper() == 'POST': + req = ses.post(self.url, + json=payload, + timeout=Timeout(connect=self.connect_timeout, + read=self.response_timeout)) + else: + raise ValueError(f"Unsupported HTTP method: {method}") + + if req.status_code == requests.codes.ok: # pylint: disable=no-member self._latest_query_latency = perf_counter() - start_time return req.text - except (IOError, requests.HTTPError, - json.decoder.JSONDecodeError) as error: - self._logger.error("Problem while sending a post request.", - payload=payload, - error=error, - **self._logger_metadata) - return None + except (IOError, requests.HTTPError, json.decoder.JSONDecodeError, ValueError) as error: + self._logger.error(f"Problem while sending a {method} request.", + payload=payload, + params=params, + error=error, + **self._logger_metadata) return None def json_rpc_post(self, payload): """Checks the validity of a successful json-rpc response. If any of the validations fail, the method returns type None. """ - response = self._return_and_validate_post_request(payload) + response = self._return_and_validate_request(method='POST', payload=payload) if response is not None: result = return_and_validate_rpc_json_result( response, self._logger_metadata) @@ -76,20 +82,41 @@ def json_rpc_post(self, payload): return None def cached_json_rpc_post(self, payload: dict): - """Calls json_rpc_post and stores the result in in-memory - cache, by using payload as key.Method will always return - cached value after the first call. Cache never expires.""" - cache_key = str(payload) + """Calls json_rpc_post and stores the result in in-memory cache.""" + cache_key = f"rpc:{str(payload)}" if self.cache.is_cached(cache_key): return_value = self.cache.retrieve_key_value(cache_key) return return_value - value = self.json_rpc_post(payload) + value = self.json_rpc_post(payload=payload) if value is not None: self.cache.store_key_value(cache_key, value) return value + def json_rest_api_get(self, params: dict = None): + """Checks the validity of a successful json-rpc response. If any of the + validations fail, the method returns type None. """ + response = self._return_and_validate_request(method='GET', params=params) + if response is not None: + result = return_and_validate_rest_api_json_result( + response, self._logger_metadata) + if result is not None: + return result + return None + + def cached_json_rest_api_get(self, params: dict = None): + """Calls json_rest_api_get and stores the result in in-memory cache.""" + cache_key = f"rest:{str(params)}" + + if self.cache.is_cached(cache_key): + return_value = self.cache.retrieve_key_value(cache_key) + return return_value + + value = self.json_rest_api_get(params) + if value is not None: + self.cache.store_key_value(cache_key, value) + return value class WebsocketSubscription(threading.Thread): # pylint: disable=too-many-instance-attributes """A thread class used to subscribe and track diff --git a/src/registries.py b/src/registries.py index 726ba38..7ad7e80 100644 --- a/src/registries.py +++ b/src/registries.py @@ -81,6 +81,8 @@ def get_collector_registry(self) -> list: collector = collectors.SolanaCollector case "starknet", "starknet": collector = collectors.StarknetCollector + case "aptos", "aptos": + collector = collectors.AptosCollector case "evm", other: # pylint: disable=unused-variable collector = collectors.EvmCollector if collector is None: diff --git a/src/test_collectors.py b/src/test_collectors.py index 53815df..e296289 100644 --- a/src/test_collectors.py +++ b/src/test_collectors.py @@ -650,3 +650,87 @@ def test_latency(self): """Tests that the latency is obtained from the interface based on latest_query_latency""" self.mocked_connection.return_value.latest_query_latency = 0.123 self.assertEqual(0.123, self.starknet_collector.latency()) + +class TestAptosCollector(TestCase): + """Tests the Aptos collector class""" + + def setUp(self): + self.url = "https://test.com" + self.labels = ["dummy", "labels"] + self.chain_id = 123 + self.open_timeout = 8 + self.ping_timeout = 9 + self.client_params = { + "open_timeout": self.open_timeout, "ping_timeout": self.ping_timeout} + with mock.patch('collectors.HttpsInterface') as mocked_connection: + self.aptos_collector = collectors.AptosCollector( + self.url, self.labels, self.chain_id, **self.client_params) + self.mocked_connection = mocked_connection + + def test_logger_metadata(self): + """Validate logger metadata. Makes sure url is stripped by helpers.strip_url function.""" + expected_metadata = { + 'component': 'AptosCollector', 'url': 'test.com'} + self.assertEqual(expected_metadata, + self.aptos_collector._logger_metadata) + + def test_https_interface_created(self): + """Tests that the Aptos collector calls the https interface with the correct args""" + self.mocked_connection.assert_called_once_with( + self.url, self.open_timeout, self.ping_timeout) + + def test_interface_attribute_exists(self): + """Tests that the interface attribute exists.""" + self.assertTrue(hasattr(self.aptos_collector, 'interface')) + + def test_alive_call(self): + """Tests the alive function uses the correct call""" + self.aptos_collector.alive() + self.mocked_connection.return_value.cached_json_rest_api_get.assert_called_once() + + def test_alive_false(self): + """Tests the alive function returns false when get returns None""" + self.mocked_connection.return_value.cached_json_rest_api_get.return_value = None + result = self.aptos_collector.alive() + self.assertFalse(result) + + def test_block_height(self): + """Tests the block_height function uses the correct call to get block height""" + self.aptos_collector.block_height() + self.mocked_connection.return_value.cached_json_rest_api_get.assert_called_once() + + def test_block_height_returns_none(self): + """Tests that the block height returns None if cached_json_rest_api_get returns None""" + self.mocked_connection.return_value.cached_json_rest_api_get.return_value = None + result = self.aptos_collector.block_height() + self.assertIsNone(result) + + def test_client_version(self): + """Tests the client_version function uses the correct call to get client version""" + self.aptos_collector.client_version() + self.mocked_connection.return_value.cached_json_rest_api_get.assert_called_once() + + def test_client_version_get_git_hash(self): + """Tests that the client version is returned as a string with the git_hash key""" + self.mocked_connection.return_value.cached_json_rest_api_get.return_value = { + "git_hash": "abcdef123"} + result = self.aptos_collector.client_version() + self.assertEqual({"client_version": "abcdef123"}, result) + + def test_client_version_key_error_returns_none(self): + """Tests that the client_version returns None on KeyError""" + self.mocked_connection.return_value.cached_json_rest_api_get.return_value = { + "dummy_key": "value"} + result = self.aptos_collector.client_version() + self.assertIsNone(result) + + def test_client_version_returns_none(self): + """Tests that the client_version returns None if cached_json_rest_api_get returns None""" + self.mocked_connection.return_value.cached_json_rest_api_get.return_value = None + result = self.aptos_collector.client_version() + self.assertIsNone(result) + + def test_latency(self): + """Tests that the latency is obtained from the interface based on latest_query_latency""" + self.mocked_connection.return_value.latest_query_latency = 0.123 + self.assertEqual(0.123, self.aptos_collector.latency()) diff --git a/src/test_interfaces.py b/src/test_interfaces.py index b869f38..e3b87d8 100644 --- a/src/test_interfaces.py +++ b/src/test_interfaces.py @@ -66,7 +66,7 @@ def test_return_and_validate_post_request_method_200(self): "id": "exporter", "method": "getnetworkinfo" } - result = self.interface._return_and_validate_post_request(payload) + result = self.interface._return_and_validate_request(method='POST', payload=payload) self.assertEqual(result, "Ok") self.assertEqual(m.called, True) self.assertEqual(m.call_count, 1) @@ -80,11 +80,50 @@ def test_return_and_validate_post_request_method_non_200(self): "id": "exporter", "method": "getnetworkinfo" } - result = self.interface._return_and_validate_post_request(payload) + result = self.interface._return_and_validate_request(method='POST', payload=payload) self.assertEqual(result, None) self.assertEqual(m.called, True) self.assertEqual(m.call_count, 1) + def test_return_and_validate_get_request_method_200(self): + """Tests the GET request method is called once and returns Ok for 200 status code""" + with requests_mock.Mocker(session=self.interface.session) as m: + m.get(self.url, text="Ok", status_code=200) + params = { + "param1": "value1", + "param2": "value2" + } + result = self.interface._return_and_validate_request(method='GET', params=params) + self.assertEqual(result, "Ok") + self.assertEqual(m.called, True) + self.assertEqual(m.call_count, 1) + # Check if the GET request was made with the correct parameters + expected_params = { + 'apikey': ['123456'], + 'param1': ['value1'], + 'param2': ['value2'] + } + self.assertEqual(m.last_request.qs, expected_params) + + def test_return_and_validate_get_request_method_non_200(self): + """Tests the GET request method is called once and returns None for 500 status code""" + with requests_mock.Mocker(session=self.interface.session) as m: + m.get(self.url, text="Error", status_code=500) + params = { + "param1": "value1", + "param2": "value2" + } + result = self.interface._return_and_validate_request(method='GET', params=params) + self.assertEqual(result, None) + self.assertEqual(m.called, True) + self.assertEqual(m.call_count, 1) + # Check if the GET request was made with the correct parameters + expected_params = { + 'apikey': ['123456'], + 'param1': ['value1'], + 'param2': ['value2'] + } + self.assertEqual(m.last_request.qs, expected_params) class TestWebSocketSubscription(TestCase): """Tests the web socket subscription class""" diff --git a/src/test_registries.py b/src/test_registries.py index 11b5e84..b202efc 100644 --- a/src/test_registries.py +++ b/src/test_registries.py @@ -129,6 +129,16 @@ def test_get_collector_registry_for_starknet(self): with mock.patch('collectors.StarknetCollector', new=mock.Mock()) as collector: helper_test_collector_registry(self, collector) + @mock.patch.dict(os.environ, { + "CONFIG_FILE_PATH": "tests/fixtures/configuration_aptos.yaml", + "VALIDATION_FILE_PATH": "tests/fixtures/validation.yaml" + }) + def test_get_collector_registry_for_aptos(self): + """Tests that the aptos collector is called with the correct args""" + self.collector_registry = CollectorRegistry() + with mock.patch('collectors.AptosCollector', new=mock.Mock()) as collector: + helper_test_collector_registry(self, collector) + @mock.patch.dict(os.environ, { "CONFIG_FILE_PATH": "tests/fixtures/configuration_evm.yaml", "VALIDATION_FILE_PATH": "tests/fixtures/validation.yaml" diff --git a/src/tests/fixtures/configuration_aptos.yaml b/src/tests/fixtures/configuration_aptos.yaml new file mode 100644 index 0000000..6daf57b --- /dev/null +++ b/src/tests/fixtures/configuration_aptos.yaml @@ -0,0 +1,12 @@ +blockchain: "Aptos" +chain_id: 1234 +network_name: "Testnet" +network_type: "Testnet" +collector: "aptos" +endpoints: + - url: https://test1.com + provider: TestProvider1 + - url: https://test2.com + provider: TestProvider2 + - url: https://test3.com + provider: TestProvider3