diff --git a/.gitignore b/.gitignore index 5c9dbc1ae56..546d9936302 100644 --- a/.gitignore +++ b/.gitignore @@ -2,6 +2,7 @@ log/ .vscode allure-results/ +postgresql # Byte-compiled / optimized / DLL files __pycache__/ diff --git a/requirements.txt b/requirements.txt index a470d1e05ca..d2ea5cb5ed5 100644 --- a/requirements.txt +++ b/requirements.txt @@ -29,6 +29,7 @@ pytest-rerunfailures==13.0 pytest-timeout==2.2.0 pytest-xdist==3.5.0 python-dotenv==1.0.1 +pytest-dependency==0.6.0 PyYAML==6.0.1 requests==2.31.0 setuptools==69.0.3 diff --git a/src/env_vars.py b/src/env_vars.py index a5881856162..9217af526f4 100644 --- a/src/env_vars.py +++ b/src/env_vars.py @@ -29,6 +29,8 @@ def get_env_var(var_name, default=None): NODEKEY = get_env_var("NODEKEY", "30348dd51465150e04a5d9d932c72864c8967f806cce60b5d26afeca1e77eb68") API_REQUEST_TIMEOUT = get_env_var("API_REQUEST_TIMEOUT", 20) RLN_CREDENTIALS = get_env_var("RLN_CREDENTIALS") +PG_USER = get_env_var("POSTGRES_USER", "postgres") +PG_PASS = get_env_var("POSTGRES_PASSWORD", "test123") # example for .env file # RLN_CREDENTIALS = {"rln-relay-cred-password": "password", "rln-relay-eth-client-address": "wss://sepolia.infura.io/ws/v3/api_key", "rln-relay-eth-contract-address": "0xF471d71E9b1455bBF4b85d475afb9BB0954A29c4", "rln-relay-eth-private-key-1": "1111111111111111111111111111111111111111111111111111111111111111", "rln-relay-eth-private-key-2": "1111111111111111111111111111111111111111111111111111111111111111"} diff --git a/src/node/api_clients/rest.py b/src/node/api_clients/rest.py index c42566ec1cb..72963ae1032 100644 --- a/src/node/api_clients/rest.py +++ b/src/node/api_clients/rest.py @@ -93,29 +93,29 @@ def get_filter_messages(self, content_topic, pubsub_topic=None): return get_messages_response.json() def get_store_messages( - self, peerAddr, includeData, pubsubTopic, contentTopics, startTime, endTime, hashes, cursor, pageSize, ascending, store_v, **kwargs + self, peer_addr, include_data, pubsub_topic, content_topics, start_time, end_time, hashes, cursor, page_size, ascending, store_v, **kwargs ): base_url = f"store/{store_v}/messages" params = [] - if peerAddr is not None: - params.append(f"peerAddr={quote(peerAddr, safe='')}") - if includeData is not None: - params.append(f"includeData={includeData}") - if pubsubTopic is not None: - params.append(f"pubsubTopic={quote(pubsubTopic, safe='')}") - if contentTopics is not None: - params.append(f"contentTopics={quote(contentTopics, safe='')}") - if startTime is not None: - params.append(f"startTime={startTime}") - if endTime is not None: - params.append(f"endTime={endTime}") + if peer_addr is not None: + params.append(f"peerAddr={quote(peer_addr, safe='')}") + if include_data is not None: + params.append(f"includeData={include_data}") + if pubsub_topic is not None: + params.append(f"pubsubTopic={quote(pubsub_topic, safe='')}") + if content_topics is not None: + params.append(f"contentTopics={quote(content_topics, safe='')}") + if start_time is not None: + params.append(f"startTime={start_time}") + if end_time is not None: + params.append(f"endTime={end_time}") if hashes is not None: params.append(f"hashes={quote(hashes, safe='')}") if cursor is not None: params.append(f"cursor={quote(cursor, safe='')}") - if pageSize is not None: - params.append(f"pageSize={pageSize}") + if page_size is not None: + params.append(f"pageSize={page_size}") if ascending is not None: params.append(f"ascending={ascending}") diff --git a/src/node/docker_mananger.py b/src/node/docker_mananger.py index d638a189fb5..7085bac42ce 100644 --- a/src/node/docker_mananger.py +++ b/src/node/docker_mananger.py @@ -31,7 +31,7 @@ def create_network(self, network_name=NETWORK_NAME): logger.debug(f"Network {network_name} created") return network - def start_container(self, image_name, ports, args, log_path, container_ip, volumes): + def start_container(self, image_name, ports, args, log_path, container_ip, volumes, remove_container=True): cli_args = [] for key, value in args.items(): if isinstance(value, list): # Check if value is a list @@ -46,7 +46,7 @@ def start_container(self, image_name, ports, args, log_path, container_ip, volum cli_args_str_for_log = " ".join(cli_args) logger.debug(f"docker run -i -t {port_bindings_for_log} {image_name} {cli_args_str_for_log}") container = self._client.containers.run( - image_name, command=cli_args, ports=port_bindings, detach=True, remove=True, auto_remove=True, volumes=volumes + image_name, command=cli_args, ports=port_bindings, detach=True, remove=remove_container, auto_remove=remove_container, volumes=volumes ) network = self._client.networks.get(NETWORK_NAME) diff --git a/src/node/store_response.py b/src/node/store_response.py new file mode 100644 index 00000000000..602ad007b24 --- /dev/null +++ b/src/node/store_response.py @@ -0,0 +1,93 @@ +class StoreResponse: + def __init__(self, store_response, node): + self.response = store_response + self.node = node + + @property + def request_id(self): + try: + if self.node.is_nwaku(): + return self.response.get("requestId") + else: + return self.response.get("request_id") + except: + return None + + @property + def status_code(self): + try: + if self.node.is_nwaku(): + return self.response.get("statusCode") + else: + return self.response.get("status_code") + except: + return None + + @property + def status_desc(self): + try: + if self.node.is_nwaku(): + return self.response.get("statusDesc") + else: + return self.response.get("status_desc") + except: + return None + + @property + def messages(self): + try: + return self.response.get("messages") + except: + return None + + @property + def pagination_cursor(self): + try: + if self.node.is_nwaku(): + return self.response.get("paginationCursor") + else: + return self.response.get("pagination_cursor") + except: + return None + + def message_hash(self, index): + if self.messages is not None: + if self.node.is_nwaku(): + return self.messages[index]["messageHash"] + else: + return self.messages[index]["message_hash"] + else: + return None + + def message_payload(self, index): + try: + if self.messages is not None: + payload = self.messages[index]["message"]["payload"] + return payload + else: + return None + except IndexError: + return None + + def message_at(self, index): + try: + if self.messages is not None: + message = self.messages[index]["message"] + return message + else: + return None + except IndexError: + return None + + def message_pubsub_topic(self, index): + if self.messages is not None: + if self.node.is_nwaku(): + return self.messages[index]["pubsubTopic"] + else: + return self.messages[index]["pubsub_topic"] + else: + return None + + @property + def resp_json(self): + return self.response diff --git a/src/node/waku_message.py b/src/node/waku_message.py index 1c63bb5b82b..5b7e2311a8c 100644 --- a/src/node/waku_message.py +++ b/src/node/waku_message.py @@ -13,6 +13,7 @@ class MessageRpcResponse: timestamp: Optional[int] ephemeral: Optional[bool] meta: Optional[str] + rateLimitProof: Optional[str] = field(default_factory=dict) rate_limit_proof: Optional[dict] = field(default_factory=dict) diff --git a/src/node/waku_node.py b/src/node/waku_node.py index 377baae547d..3402a7e8937 100644 --- a/src/node/waku_node.py +++ b/src/node/waku_node.py @@ -100,6 +100,12 @@ def start(self, wait_for_node_sec=20, **kwargs): else: raise NotImplementedError("Not implemented for this node type") + if "remove_container" in kwargs: + remove_container = kwargs["remove_container"] + del kwargs["remove_container"] + else: + remove_container = True + default_args.update(sanitize_docker_flags(kwargs)) rln_args, rln_creds_set, keystore_path = self.parse_rln_credentials(default_args, False) @@ -116,7 +122,13 @@ def start(self, wait_for_node_sec=20, **kwargs): logger.debug(f"Using volumes {self._volumes}") self._container = self._docker_manager.start_container( - self._docker_manager.image, self._ports, default_args, self._log_path, self._ext_ip, self._volumes + self._docker_manager.image, + ports=self._ports, + args=default_args, + log_path=self._log_path, + container_ip=self._ext_ip, + volumes=self._volumes, + remove_container=remove_container, ) logger.debug(f"Started container from image {self._image_name}. REST: {self._rest_port}") @@ -168,6 +180,10 @@ def stop(self): if self._container: logger.debug(f"Stopping container with id {self._container.short_id}") self._container.stop() + try: + self._container.remove() + except: + pass self._container = None logger.debug("Container stopped.") @@ -291,18 +307,30 @@ def get_filter_messages(self, content_topic, pubsub_topic=None): return self._api.get_filter_messages(content_topic, pubsub_topic) def get_store_messages( - self, peerAddr, includeData, pubsubTopic, contentTopics, startTime, endTime, hashes, cursor, pageSize, ascending, store_v, **kwargs + self, + peer_addr=None, + include_data=None, + pubsub_topic=None, + content_topics=None, + start_time=None, + end_time=None, + hashes=None, + cursor=None, + page_size=None, + ascending=None, + store_v="v3", + **kwargs, ): return self._api.get_store_messages( - peerAddr=peerAddr, - includeData=includeData, - pubsubTopic=pubsubTopic, - contentTopics=contentTopics, - startTime=startTime, - endTime=endTime, + peer_addr=peer_addr, + include_data=include_data, + pubsub_topic=pubsub_topic, + content_topics=content_topics, + start_time=start_time, + end_time=end_time, hashes=hashes, cursor=cursor, - pageSize=pageSize, + page_size=page_size, ascending=ascending, store_v=store_v, **kwargs, @@ -393,3 +421,7 @@ def parse_rln_credentials(self, default_args, is_registration): raise NotImplementedError("Not implemented for type other than Nim Waku ") return rln_args, True, keystore_path + + @property + def container(self): + return self._container diff --git a/src/postgres_setup.py b/src/postgres_setup.py new file mode 100644 index 00000000000..97d47c95481 --- /dev/null +++ b/src/postgres_setup.py @@ -0,0 +1,44 @@ +import docker +import os +from src.env_vars import NETWORK_NAME, PG_PASS, PG_USER +from src.libs.custom_logger import get_custom_logger + +logger = get_custom_logger(__name__) + + +def start_postgres(): + pg_env = {"POSTGRES_USER": PG_USER, "POSTGRES_PASSWORD": PG_PASS} + + base_path = os.path.abspath(".") + volumes = {os.path.join(base_path, "postgresql"): {"bind": "/var/lib/postgresql/data", "mode": "Z"}} + + client = docker.from_env() + + postgres_container = client.containers.run( + "postgres:15.4-alpine3.18", + name="postgres", + environment=pg_env, + volumes=volumes, + command="postgres", + ports={"5432/tcp": ("127.0.0.1", 5432)}, + restart_policy={"Name": "on-failure", "MaximumRetryCount": 5}, + healthcheck={ + "Test": ["CMD-SHELL", "pg_isready -U postgres -d postgres"], + "Interval": 30000000000, # 30 seconds in nanoseconds + "Timeout": 60000000000, # 60 seconds in nanoseconds + "Retries": 5, + "StartPeriod": 80000000000, # 80 seconds in nanoseconds + }, + detach=True, + network_mode=NETWORK_NAME, + ) + + logger.debug("Postgres container started") + + return postgres_container + + +def stop_postgres(postgres_container): + postgres_container.stop() + postgres_container.remove() + logger.debug("Postgres container stopped and removed.") diff --git a/src/steps/common.py b/src/steps/common.py index 8d520949e2c..1ce4db9b59f 100644 --- a/src/steps/common.py +++ b/src/steps/common.py @@ -1,3 +1,5 @@ +import base64 +import hashlib import inspect from time import time import allure @@ -34,3 +36,15 @@ def create_message(self, **kwargs): message = {"payload": to_base64(self.test_payload), "contentTopic": self.test_content_topic, "timestamp": int(time() * 1e9)} message.update(kwargs) return message + + @allure.step + def compute_message_hash(self, pubsub_topic, msg): + ctx = hashlib.sha256() + ctx.update(pubsub_topic.encode("utf-8")) + ctx.update(base64.b64decode(msg["payload"])) + ctx.update(msg["contentTopic"].encode("utf-8")) + if "meta" in msg: + ctx.update(base64.b64decode(msg["meta"])) + ctx.update(int(msg["timestamp"]).to_bytes(8, byteorder="big")) + hash_bytes = ctx.digest() + return base64.b64encode(hash_bytes).decode("utf-8") diff --git a/src/steps/store.py b/src/steps/store.py index e615a1e348c..685a5094c30 100644 --- a/src/steps/store.py +++ b/src/steps/store.py @@ -1,14 +1,15 @@ import inspect from src.libs.custom_logger import get_custom_logger -from time import time import pytest import allure from src.libs.common import delay +from src.node.store_response import StoreResponse from src.node.waku_message import WakuMessage from src.env_vars import ( ADDITIONAL_NODES, NODE_1, NODE_2, + NODEKEY, ) from src.node.waku_node import WakuNode from src.steps.common import StepsCommon @@ -29,6 +30,12 @@ def store_setup(self): self.optional_nodes = [] self.multiaddr_list = [] + @pytest.fixture(scope="function", autouse=False) + def node_setup(self, store_setup): + self.setup_first_publishing_node(store="true", relay="true") + self.setup_first_store_node(store="true", relay="true") + self.subscribe_to_pubsub_topics_via_relay() + @allure.step def start_publishing_node(self, image, node_index, **kwargs): node = WakuNode(image, f"publishing_node{node_index}_{self.test_id}") @@ -53,7 +60,7 @@ def setup_store_node(self, image, node_index, **kwargs): @allure.step def setup_first_publishing_node(self, store="true", relay="true", **kwargs): - self.publishing_node1 = self.start_publishing_node(NODE_1, node_index=1, store=store, relay=relay, **kwargs) + self.publishing_node1 = self.start_publishing_node(NODE_1, node_index=1, store=store, relay=relay, nodekey=NODEKEY, **kwargs) self.enr_uri = self.publishing_node1.get_enr_uri() @allure.step @@ -110,37 +117,88 @@ def subscribe_to_pubsub_topics_via_filter(self, node, pubsub_topic=None, content node.set_filter_subscriptions(subscription) @allure.step - def publish_message_via(self, type, pubsub_topic=None, message=None, message_propagation_delay=0.1, sender=None): + def publish_message(self, via="relay", pubsub_topic=None, message=None, message_propagation_delay=0.01, sender=None): self.message = self.create_message() if message is None else message if pubsub_topic is None: pubsub_topic = self.test_pubsub_topic if not sender: sender = self.publishing_node1 - if type == "relay": + if via == "relay": logger.debug("Relaying message") sender.send_relay_message(self.message, pubsub_topic) - elif type == "lightpush": + elif via == "lightpush": payload = self.create_payload(pubsub_topic, self.message) sender.send_light_push_message(payload) delay(message_propagation_delay) + return self.message + + def get_messages_from_store( + self, + node=None, + peer_addr=None, + include_data=None, + pubsub_topic=None, + content_topics=None, + start_time=None, + end_time=None, + hashes=None, + cursor=None, + page_size=None, + ascending="true", + store_v="v3", + **kwargs, + ): + if pubsub_topic is None: + pubsub_topic = self.test_pubsub_topic + if node.is_gowaku(): + if content_topics is None: + content_topics = self.test_content_topic + if hashes is not None: + content_topics = None + pubsub_topic = None + peer_addr = self.multiaddr_list[0] + store_response = node.get_store_messages( + peer_addr=peer_addr, + include_data=include_data, + pubsub_topic=pubsub_topic, + content_topics=content_topics, + start_time=start_time, + end_time=end_time, + hashes=hashes, + cursor=cursor, + page_size=page_size, + ascending=ascending, + store_v=store_v, + **kwargs, + ) + store_response = StoreResponse(store_response, node) + assert store_response.request_id is not None, "Request id is missing" + assert store_response.status_code, "Status code is missing" + assert store_response.status_desc, "Status desc is missing" + return store_response @allure.step def check_published_message_is_stored( self, store_node=None, - peerAddr=None, - includeData=None, - pubsubTopic=None, - contentTopics=None, - startTime=None, - endTime=None, + peer_addr=None, + include_data=None, + pubsub_topic=None, + content_topics=None, + start_time=None, + end_time=None, hashes=None, cursor=None, - pageSize=None, + page_size=None, ascending=None, - store_v="v1", + store_v="v3", + message_to_check=None, **kwargs, ): + if pubsub_topic is None: + pubsub_topic = self.test_pubsub_topic + if message_to_check is None: + message_to_check = self.message if store_node is None: store_node = self.store_nodes elif not isinstance(store_node, list): @@ -149,33 +207,40 @@ def check_published_message_is_stored( store_node = store_node for node in store_node: logger.debug(f"Checking that peer {node.image} can find the stored message") - self.store_response = node.get_store_messages( - peerAddr=peerAddr, - includeData=includeData, - pubsubTopic=pubsubTopic, - contentTopics=contentTopics, - startTime=startTime, - endTime=endTime, + self.store_response = self.get_messages_from_store( + node=node, + peer_addr=peer_addr, + include_data=include_data, + pubsub_topic=pubsub_topic, + content_topics=content_topics, + start_time=start_time, + end_time=end_time, hashes=hashes, cursor=cursor, - pageSize=pageSize, + page_size=page_size, ascending=ascending, store_v=store_v, **kwargs, ) - assert "messages" in self.store_response, f"Peer {node.image} has no messages key in the reponse" - assert self.store_response["messages"], f"Peer {node.image} couldn't find any messages" - assert len(self.store_response["messages"]) >= 1, "Expected at least 1 message but got none" - waku_message = WakuMessage(self.store_response["messages"][-1:]) - waku_message.assert_received_message(self.message) + assert self.store_response.messages, f"Peer {node.image} couldn't find any messages. Actual response: {self.store_response.resp_json}" + assert len(self.store_response.messages) >= 1, "Expected at least 1 message but got none" + store_message_index = -1 # we are looking for the last and most recent message in the store + waku_message = WakuMessage([self.store_response.messages[store_message_index:]]) + if store_v == "v1": + waku_message.assert_received_message(message_to_check) + else: + expected_hash = self.compute_message_hash(pubsub_topic, message_to_check) + assert expected_hash == self.store_response.message_hash( + store_message_index + ), f"Message hash returned by store doesn't match the computed message hash {expected_hash}" @allure.step def check_store_returns_empty_response(self, pubsub_topic=None): if not pubsub_topic: pubsub_topic = self.test_pubsub_topic try: - self.check_published_message_is_stored(pubsubTopic=pubsub_topic, pageSize=5, ascending="true") + self.check_published_message_is_stored(pubsubTopic=pubsub_topic, page_size=5, ascending="true") except Exception as ex: assert "couldn't find any messages" in str(ex) diff --git a/src/test_data.py b/src/test_data.py index bf52edd2697..13d39fa8f44 100644 --- a/src/test_data.py +++ b/src/test_data.py @@ -98,6 +98,18 @@ VALID_PUBSUB_TOPICS = ["/waku/2/rs/0/0", "/waku/2/rs/0/1", "/waku/2/rs/0/9", "/waku/2/rs/0/25", "/waku/2/rs/0/1000"] +PUBSUB_TOPICS_STORE = [ + "/waku/2/rs/0/0", + "/waku/2/rs/0/1", + "/waku/2/rs/0/2", + "/waku/2/rs/0/3", + "/waku/2/rs/0/4", + "/waku/2/rs/0/5", + "/waku/2/rs/0/6", + "/waku/2/rs/0/7", + "/waku/2/rs/0/8", +] + INVALID_PUBSUB_TOPICS = ["/test/2/rs/0/1", "/waku/3/rs/0/1", "/waku/2/test/0/1", "/waku/2/rs/0/b", "/waku/2/rs/0"] PUBSUB_TOPICS_DIFFERENT_CLUSTERS = [ diff --git a/tests/conftest.py b/tests/conftest.py index 2e26b97cd4f..4e9ede25ecb 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -10,6 +10,7 @@ from src.libs.common import attach_allure_file import src.env_vars as env_vars from src.data_storage import DS +from src.postgres_setup import start_postgres, stop_postgres logger = get_custom_logger(__name__) @@ -37,6 +38,13 @@ def set_allure_env_variables(): outfile.write(f"{attribute_name}={attribute_value}\n") +@pytest.fixture(scope="class", autouse=False) +def start_postgres_container(): + pg_container = start_postgres() + yield + stop_postgres(pg_container) + + @pytest.fixture(scope="function", autouse=True) def test_id(request): # setting up an unique test id to be used where needed diff --git a/tests/store/test_api_flags.py b/tests/store/test_api_flags.py new file mode 100644 index 00000000000..8c0cba1b615 --- /dev/null +++ b/tests/store/test_api_flags.py @@ -0,0 +1,27 @@ +import pytest +from src.libs.common import to_base64 +from src.node.waku_message import WakuMessage +from src.steps.store import StepsStore +from src.test_data import SAMPLE_INPUTS + + +@pytest.mark.usefixtures("node_setup") +class TestApiFlags(StepsStore): + def test_store_with_peerAddr(self): + self.publish_message() + self.check_published_message_is_stored(store_node=self.store_node1, peer_addr=self.multiaddr_list[0]) + + def test_store_include_data(self): + message_list = [] + for payload in SAMPLE_INPUTS: + message = self.create_message(payload=to_base64(payload["value"])) + self.publish_message(message=message) + message_list.append(message) + for node in self.store_nodes: + store_response = self.get_messages_from_store(node, include_data="true", page_size=50) + assert len(store_response.messages) == len(SAMPLE_INPUTS) + for index in range(len(store_response.messages)): + assert store_response.message_payload(index) == message_list[index]["payload"] + assert store_response.message_pubsub_topic(index) == self.test_pubsub_topic + waku_message = WakuMessage([store_response.message_at(index)]) + waku_message.assert_received_message(message_list[index]) diff --git a/tests/store/test_cursor.py b/tests/store/test_cursor.py new file mode 100644 index 00000000000..695fca06dd0 --- /dev/null +++ b/tests/store/test_cursor.py @@ -0,0 +1,104 @@ +import pytest +from src.env_vars import NODE_1, NODE_2 +from src.libs.common import to_base64 +from src.node.store_response import StoreResponse +from src.steps.store import StepsStore + + +@pytest.mark.xfail("go-waku" in NODE_2, reason="Bug reported: https://github.com/waku-org/go-waku/issues/1109") +@pytest.mark.usefixtures("node_setup") +class TestCursor(StepsStore): + # we implicitly test the reusabilty of the cursor for multiple nodes + + def test_get_multiple_2000_store_messages(self): + expected_message_hash_list = [] + for i in range(2000): + message = self.create_message(payload=to_base64(f"Message_{i}")) + self.publish_message(message=message) + expected_message_hash_list.append(self.compute_message_hash(self.test_pubsub_topic, message)) + store_response = StoreResponse({"paginationCursor": "", "pagination_cursor": ""}, self.store_node1) + response_message_hash_list = [] + while store_response.pagination_cursor is not None: + cursor = store_response.pagination_cursor + store_response = self.get_messages_from_store(self.store_node1, page_size=100, cursor=cursor) + for index in range(len(store_response.messages)): + response_message_hash_list.append(store_response.message_hash(index)) + assert len(expected_message_hash_list) == len(response_message_hash_list), "Message count mismatch" + assert expected_message_hash_list == response_message_hash_list, "Message hash mismatch" + + @pytest.mark.parametrize("cursor_index, message_count", [[2, 4], [3, 20], [10, 40], [19, 20], [19, 50], [110, 120]]) + def test_different_cursor_and_indexes(self, cursor_index, message_count): + message_hash_list = [] + cursor = "" + cursor_index = cursor_index if cursor_index < 100 else 100 + for i in range(message_count): + message = self.create_message(payload=to_base64(f"Message_{i}")) + self.publish_message(message=message) + message_hash_list.append(self.compute_message_hash(self.test_pubsub_topic, message)) + for node in self.store_nodes: + store_response = self.get_messages_from_store(node, page_size=cursor_index) + assert len(store_response.messages) == cursor_index + cursor = store_response.pagination_cursor + for node in self.store_nodes: + store_response = self.get_messages_from_store(node, page_size=100, ascending="true", cursor=cursor) + assert len(store_response.messages) == message_count - cursor_index + for index in range(len(store_response.messages)): + assert store_response.message_hash(index) == message_hash_list[cursor_index + index], f"Message hash at index {index} doesn't match" + + def test_passing_cursor_not_returned_in_paginationCursor(self): + cursor = "" + for i in range(10): + self.publish_message(message=self.create_message(payload=to_base64(f"Message_{i}"))) + for node in self.store_nodes: + store_response = self.get_messages_from_store(node, page_size=5) + # retrieving the cursor with the message hash of the 3rd message stored + cursor = store_response.message_hash(2) + for node in self.store_nodes: + store_response = self.get_messages_from_store(node, page_size=100, cursor=cursor) + assert len(store_response.messages) == 7, "Message count mismatch" + + def test_passing_cursor_of_the_last_message_from_the_store(self): + cursor = "" + for i in range(10): + self.publish_message(message=self.create_message(payload=to_base64(f"Message_{i}"))) + for node in self.store_nodes: + store_response = self.get_messages_from_store(node, page_size=10) + # retrieving the cursor with the message hash of the last message stored + cursor = store_response.message_hash(9) + for node in self.store_nodes: + store_response = self.get_messages_from_store(node, page_size=100, cursor=cursor) + assert not store_response.messages, "Messages found" + + @pytest.mark.xfail("go-waku" in NODE_2, reason="Bug reported: https://github.com/waku-org/go-waku/issues/1110") + @pytest.mark.xfail("nwaku" in (NODE_1 + NODE_2), reason="Bug reported: https://github.com/waku-org/nwaku/issues/2716") + def test_passing_cursor_of_non_existing_message_from_the_store(self): + for i in range(4): + self.publish_message(message=self.create_message(payload=to_base64(f"Message_{i}"))) + # creating a cursor to a message that doesn't exist + wrong_message = self.create_message(payload=to_base64("test")) + cursor = self.compute_message_hash(self.test_pubsub_topic, wrong_message) + for node in self.store_nodes: + store_response = self.get_messages_from_store(node, page_size=100, cursor=cursor) + assert not store_response.messages, "Messages found" + + @pytest.mark.xfail("go-waku" in NODE_2, reason="Bug reported: https://github.com/waku-org/go-waku/issues/1110") + @pytest.mark.xfail("nwaku" in (NODE_1 + NODE_2), reason="Bug reported: https://github.com/waku-org/nwaku/issues/2716") + def test_passing_invalid_cursor(self): + for i in range(4): + self.publish_message(message=self.create_message(payload=to_base64(f"Message_{i}"))) + # creating a invalid base64 cursor + cursor = to_base64("test") + for node in self.store_nodes: + store_response = self.get_messages_from_store(node, page_size=100, cursor=cursor) + assert not store_response.messages, "Messages found" + + @pytest.mark.xfail("go-waku" in NODE_2, reason="Bug reported: https://github.com/waku-org/go-waku/issues/1110") + @pytest.mark.xfail("nwaku" in (NODE_1 + NODE_2), reason="Bug reported: https://github.com/waku-org/nwaku/issues/2716") + def test_passing_non_base64_cursor(self): + for i in range(4): + self.publish_message(message=self.create_message(payload=to_base64(f"Message_{i}"))) + # creating a non base64 cursor + cursor = "test" + for node in self.store_nodes: + store_response = self.get_messages_from_store(node, page_size=100, cursor=cursor) + assert not store_response.messages, "Messages found" diff --git a/tests/store/test_ephemeral.py b/tests/store/test_ephemeral.py new file mode 100644 index 00000000000..acd1824aa17 --- /dev/null +++ b/tests/store/test_ephemeral.py @@ -0,0 +1,26 @@ +import pytest +from src.libs.custom_logger import get_custom_logger +from src.steps.store import StepsStore + +logger = get_custom_logger(__name__) + + +@pytest.mark.usefixtures("node_setup") +class TestEphemeral(StepsStore): + def test_message_with_ephemeral_true(self): + self.publish_message(message=self.create_message(ephemeral=True)) + self.check_store_returns_empty_response() + + def test_message_with_ephemeral_false(self): + self.publish_message(message=self.create_message(ephemeral=False)) + self.check_published_message_is_stored(page_size=5, ascending="true") + + def test_message_with_both_ephemeral_true_and_false(self): + self.publish_message(message=self.create_message(ephemeral=True)) + stored = self.publish_message(message=self.create_message(ephemeral=False)) + self.check_published_message_is_stored(page_size=5, ascending="true", message_to_check=stored) + assert len(self.store_response.messages) == 1 + stored = self.publish_message(message=self.create_message(ephemeral=False)) + self.publish_message(message=self.create_message(ephemeral=True)) + self.check_published_message_is_stored(page_size=5, ascending="true", message_to_check=stored) + assert len(self.store_response.messages) == 2 diff --git a/tests/store/test_external_db.py b/tests/store/test_external_db.py new file mode 100644 index 00000000000..73872b01acb --- /dev/null +++ b/tests/store/test_external_db.py @@ -0,0 +1,30 @@ +import pytest +from src.libs.custom_logger import get_custom_logger +from src.steps.store import StepsStore +from src.env_vars import PG_PASS, PG_USER + +logger = get_custom_logger(__name__) + + +class TestExternalDb(StepsStore): + postgress_url = f"postgres://{PG_USER}:{PG_PASS}@postgres:5432/postgres" + + @pytest.fixture(scope="function", autouse=True) + def node_postgres_setup(self, store_setup, start_postgres_container): + self.setup_first_publishing_node(store="true", relay="true", store_message_db_url=self.postgress_url) + self.setup_first_store_node(store="false", relay="true") + self.subscribe_to_pubsub_topics_via_relay() + + @pytest.mark.dependency(name="test_on_empty_postgress_db") + def test_on_empty_postgress_db(self): + message = self.create_message() + self.publish_message(message=message) + self.check_published_message_is_stored(page_size=5, ascending="true") + assert len(self.store_response.messages) == 1 + + @pytest.mark.dependency(depends=["test_on_empty_postgress_db"]) + def test_on_postgress_db_with_one_message(self): + message = self.create_message() + self.publish_message(message=message) + self.check_published_message_is_stored(page_size=5, ascending="true") + assert len(self.store_response.messages) == 2 diff --git a/tests/store/test_get_messages.py b/tests/store/test_get_messages.py index 53e3bcc9ea2..be8167a225e 100644 --- a/tests/store/test_get_messages.py +++ b/tests/store/test_get_messages.py @@ -2,29 +2,91 @@ from src.libs.custom_logger import get_custom_logger from src.libs.common import to_base64 from src.steps.store import StepsStore -from src.test_data import SAMPLE_INPUTS +from src.test_data import CONTENT_TOPICS_DIFFERENT_SHARDS, SAMPLE_INPUTS, PUBSUB_TOPICS_STORE logger = get_custom_logger(__name__) -# TO DO test without pubsubtopic freezes - +@pytest.mark.usefixtures("node_setup") class TestGetMessages(StepsStore): - @pytest.fixture(scope="function", autouse=True) - def store_functional_setup(self, store_setup): - self.setup_first_publishing_node(store="true", relay="true") - self.setup_first_store_node(store="true", relay="true") - self.subscribe_to_pubsub_topics_via_relay() + # only one test for store v1, all other tests are using the new store v3 + def test_legacy_store_v1(self): + self.publish_message() + for node in self.store_nodes: + store_response = node.get_store_messages(pubsub_topic=self.test_pubsub_topic, page_size=5, ascending="true", store_v="v1") + assert len(store_response["messages"]) == 1 - def test_store_messages_with_valid_payloads(self): + def test_get_store_messages_with_different_payloads(self): failed_payloads = [] for payload in SAMPLE_INPUTS: logger.debug(f'Running test with payload {payload["description"]}') message = self.create_message(payload=to_base64(payload["value"])) try: - self.publish_message_via("relay", message=message) - self.check_published_message_is_stored(pubsubTopic=self.test_pubsub_topic, pageSize=50, ascending="true") + self.publish_message(message=message) + self.check_published_message_is_stored(page_size=50, ascending="true") except Exception as e: logger.error(f'Payload {payload["description"]} failed: {str(e)}') failed_payloads.append(payload["description"]) assert not failed_payloads, f"Payloads failed: {failed_payloads}" + assert len(self.store_response.messages) == len(SAMPLE_INPUTS) + + def test_get_store_messages_with_different_content_topics(self): + failed_content_topics = [] + for content_topic in CONTENT_TOPICS_DIFFERENT_SHARDS: + logger.debug(f"Running test with content topic {content_topic}") + message = self.create_message(contentTopic=content_topic) + try: + self.publish_message(message=message) + self.check_published_message_is_stored(page_size=50, content_topics=content_topic, ascending="true") + except Exception as e: + logger.error(f"ContentTopic {content_topic} failed: {str(e)}") + failed_content_topics.append(content_topic) + assert not failed_content_topics, f"ContentTopics failed: {failed_content_topics}" + + def test_get_store_messages_with_different_pubsub_topics(self): + self.subscribe_to_pubsub_topics_via_relay(pubsub_topics=PUBSUB_TOPICS_STORE) + failed_pubsub_topics = [] + for pubsub_topic in PUBSUB_TOPICS_STORE: + logger.debug(f"Running test with pubsub topic {pubsub_topic}") + try: + self.publish_message(pubsub_topic=pubsub_topic) + self.check_published_message_is_stored(pubsub_topic=pubsub_topic, page_size=50, ascending="true") + except Exception as e: + logger.error(f"PubsubTopic pubsub_topic failed: {str(e)}") + failed_pubsub_topics.append(pubsub_topic) + assert not failed_pubsub_topics, f"PubsubTopics failed: {failed_pubsub_topics}" + + def test_get_store_message_with_meta(self): + message = self.create_message(meta=to_base64(self.test_payload)) + self.publish_message(message=message) + self.check_published_message_is_stored(page_size=5, ascending="true") + + def test_get_store_message_with_version(self): + message = self.create_message(version=10) + self.publish_message(message=message) + self.check_published_message_is_stored(page_size=5, ascending="true") + + def test_get_store_duplicate_messages(self): + message = self.create_message() + self.publish_message(message=message) + self.publish_message(message=message) + self.check_published_message_is_stored(page_size=5, ascending="true") + # only one message is stored + assert len(self.store_response.messages) == 1 + + def test_get_multiple_store_messages(self): + message_hash_list = [] + for payload in SAMPLE_INPUTS: + message = self.create_message(payload=to_base64(payload["value"])) + self.publish_message(message=message) + message_hash_list.append(self.compute_message_hash(self.test_pubsub_topic, message)) + for node in self.store_nodes: + store_response = self.get_messages_from_store(node, page_size=50) + assert len(store_response.messages) == len(SAMPLE_INPUTS) + for index in range(len(store_response.messages)): + assert store_response.message_hash(index) == message_hash_list[index], f"Message hash at index {index} doesn't match" + + def test_store_is_empty(self): + for node in self.store_nodes: + store_response = self.get_messages_from_store(node, page_size=50) + assert not store_response.messages diff --git a/tests/store/test_hashes.py b/tests/store/test_hashes.py new file mode 100644 index 00000000000..51528b4b80a --- /dev/null +++ b/tests/store/test_hashes.py @@ -0,0 +1,66 @@ +import pytest +from src.env_vars import NODE_2 +from src.libs.common import to_base64 +from src.libs.custom_logger import get_custom_logger +from src.steps.store import StepsStore +from src.test_data import SAMPLE_INPUTS + +logger = get_custom_logger(__name__) + + +@pytest.mark.xfail("go-waku" in NODE_2, reason="Bug reported: https://github.com/waku-org/go-waku/issues/1109") +@pytest.mark.usefixtures("node_setup") +class TestHashes(StepsStore): + def test_store_with_hashes(self): + message_hash_list = [] + for payload in SAMPLE_INPUTS: + message = self.create_message(payload=to_base64(payload["value"])) + self.publish_message(message=message) + message_hash_list.append(self.compute_message_hash(self.test_pubsub_topic, message)) + for node in self.store_nodes: + for message_hash in message_hash_list: + store_response = self.get_messages_from_store(node, hashes=message_hash, page_size=50) + assert len(store_response.messages) == 1 + assert store_response.message_hash(0) == message_hash + + def test_store_with_multiple_hashes(self): + message_hash_list = [] + for payload in SAMPLE_INPUTS: + message = self.create_message(payload=to_base64(payload["value"])) + self.publish_message(message=message) + message_hash_list.append(self.compute_message_hash(self.test_pubsub_topic, message)) + for node in self.store_nodes: + store_response = self.get_messages_from_store(node, hashes=f"{message_hash_list[0]},{message_hash_list[4]}", page_size=50) + assert len(store_response.messages) == 2 + assert store_response.message_hash(0) == message_hash_list[0], "Incorrect messaged filtered based on multiple hashes" + assert store_response.message_hash(1) == message_hash_list[4], "Incorrect messaged filtered based on multiple hashes" + + def test_store_with_wrong_hash(self): + for i in range(4): + self.publish_message(message=self.create_message(payload=to_base64(f"Message_{i}"))) + wrong_hash = self.compute_message_hash(self.test_pubsub_topic, self.create_message(payload=to_base64("test"))) + for node in self.store_nodes: + store_response = self.get_messages_from_store(node, hashes=wrong_hash, page_size=50) + assert not store_response.messages, "Messages found" + + def test_store_with_invalid_hash(self): + for i in range(4): + self.publish_message(message=self.create_message(payload=to_base64(f"Message_{i}"))) + invalid_hash = to_base64("test") + for node in self.store_nodes: + try: + store_response = self.get_messages_from_store(node, hashes=invalid_hash, page_size=50) + assert not store_response.messages + except Exception as ex: + assert "waku message hash parsing error: invalid hash length" in str(ex) + + def test_store_with_non_base64_hash(self): + for i in range(4): + self.publish_message(message=self.create_message(payload=to_base64(f"Message_{i}"))) + non_base64_hash = "test" + for node in self.store_nodes: + try: + store_response = self.get_messages_from_store(node, hashes=non_base64_hash, page_size=50) + assert not store_response.messages + except Exception as ex: + assert "waku message hash parsing error: invalid hash length" in str(ex) diff --git a/tests/store/test_page_size.py b/tests/store/test_page_size.py new file mode 100644 index 00000000000..ee65453e4b9 --- /dev/null +++ b/tests/store/test_page_size.py @@ -0,0 +1,35 @@ +import pytest +from src.libs.common import to_base64 +from src.steps.store import StepsStore + + +@pytest.mark.usefixtures("node_setup") +class TestPageSize(StepsStore): + def test_default_page_size(self): + for i in range(30): + self.publish_message(message=self.create_message(payload=to_base64(f"Message_{i}"))) + for node in self.store_nodes: + store_response = self.get_messages_from_store(node) + assert len(store_response.messages) == 20, "Message count mismatch" + + def test_page_size_0_defaults_to_20(self): + for i in range(30): + self.publish_message(message=self.create_message(payload=to_base64(f"Message_{i}"))) + for node in self.store_nodes: + store_response = self.get_messages_from_store(node, page_size=0) + assert len(store_response.messages) == 20, "Message count mismatch" + + def test_max_page_size(self): + for i in range(200): + self.publish_message(message=self.create_message(payload=to_base64(f"Message_{i}"))) + for node in self.store_nodes: + store_response = self.get_messages_from_store(node, page_size=200) + assert len(store_response.messages) == 100, "Message count mismatch" + + @pytest.mark.parametrize("page_size", [1, 11, 39, 81, 99]) + def test_different_page_size(self, page_size): + for i in range(page_size + 1): + self.publish_message(message=self.create_message(payload=to_base64(f"Message_{i}"))) + for node in self.store_nodes: + store_response = self.get_messages_from_store(node, page_size=page_size) + assert len(store_response.messages) == page_size, "Message count mismatch" diff --git a/tests/store/test_reliability.py b/tests/store/test_reliability.py new file mode 100644 index 00000000000..6e5fc712036 --- /dev/null +++ b/tests/store/test_reliability.py @@ -0,0 +1,123 @@ +import pytest +from src.libs.custom_logger import get_custom_logger +from src.libs.common import delay +from src.steps.store import StepsStore + +logger = get_custom_logger(__name__) + + +class TestReliability(StepsStore): + def test_publishing_node_is_stopped(self, node_setup): + self.publish_message() + self.check_published_message_is_stored(page_size=5) + self.publishing_node1.stop() + try: + store_response = self.get_messages_from_store(self.store_node1, page_size=5) + assert len(store_response.messages) == 1 + except Exception as ex: + if self.store_node1.is_gowaku(): + assert "failed to dial: context deadline exceeded" in str(ex) + else: + raise AssertionError(f"Nwaku failed with {ex}") + + def test_publishing_node_restarts(self, node_setup): + self.publish_message() + self.check_published_message_is_stored(page_size=5) + self.publishing_node1.restart() + self.publishing_node1.ensure_ready() + self.add_node_peer(self.store_node1, self.multiaddr_list) + self.subscribe_to_pubsub_topics_via_relay(node=self.publishing_node1) + self.publish_message() + self.check_published_message_is_stored(page_size=5) + for node in self.store_nodes: + store_response = self.get_messages_from_store(node, page_size=5) + assert len(store_response.messages) == 2 + + def test_store_node_restarts(self, node_setup): + self.publish_message() + self.check_published_message_is_stored(page_size=5) + self.store_node1.restart() + self.store_node1.ensure_ready() + self.subscribe_to_pubsub_topics_via_relay(node=self.store_node1) + self.publish_message() + self.check_published_message_is_stored(page_size=5) + for node in self.store_nodes: + store_response = self.get_messages_from_store(node, page_size=5) + assert len(store_response.messages) == 2 + + def test_publishing_node_paused_and_unpaused(self, node_setup): + self.publish_message() + self.check_published_message_is_stored(page_size=5) + self.publishing_node1.pause() + delay(1) + self.publishing_node1.unpause() + self.publishing_node1.ensure_ready() + self.publish_message() + self.check_published_message_is_stored(page_size=5) + for node in self.store_nodes: + store_response = self.get_messages_from_store(node, page_size=5) + assert len(store_response.messages) == 2 + + def test_store_node_paused_and_unpaused(self, node_setup): + self.publish_message() + self.check_published_message_is_stored(page_size=5) + self.store_node1.pause() + delay(1) + self.store_node1.unpause() + self.store_node1.ensure_ready() + self.publish_message() + self.check_published_message_is_stored(page_size=5) + for node in self.store_nodes: + store_response = self.get_messages_from_store(node, page_size=5) + assert len(store_response.messages) == 2 + + def test_message_relayed_while_store_node_is_paused(self, node_setup): + self.publish_message() + self.check_published_message_is_stored(page_size=5) + self.store_node1.pause() + self.publish_message() + self.store_node1.unpause() + self.store_node1.ensure_ready() + self.check_published_message_is_stored(page_size=5) + for node in self.store_nodes: + store_response = self.get_messages_from_store(node, page_size=5) + assert len(store_response.messages) == 2 + + def test_message_relayed_while_store_node_is_stopped_without_removing(self): + self.setup_first_publishing_node(store="true", relay="true") + self.setup_first_store_node(store="false", relay="true", remove_container=False) + self.subscribe_to_pubsub_topics_via_relay() + self.publish_message() + self.check_published_message_is_stored(page_size=5) + self.store_node1.container.stop() + self.publish_message() + self.store_node1.container.start() + self.store_node1.ensure_ready() + for node in self.store_nodes: + store_response = self.get_messages_from_store(node, page_size=5) + assert len(store_response.messages) == 2 + + def test_message_relayed_while_store_node_is_stopped_and_removed(self, node_setup): + self.publish_message() + self.check_published_message_is_stored(page_size=5) + self.store_node1.stop() + self.store_nodes.remove(self.store_node1) + self.publish_message() + self.setup_first_store_node(store="false", relay="true") + self.store_node1.ensure_ready() + self.add_node_peer(self.store_node1, self.multiaddr_list) + self.subscribe_to_pubsub_topics_via_relay(node=self.store_node1) + delay(1) + for node in self.store_nodes: + store_response = self.get_messages_from_store(node, page_size=5) + assert len(store_response.messages) == 2 + + def test_message_relayed_before_store_node_is_started(self, node_setup): + self.publish_message() + self.check_published_message_is_stored(page_size=5) + self.setup_second_store_node(store="false", relay="true") + self.subscribe_to_pubsub_topics_via_relay() + store_response = self.get_messages_from_store(self.store_node2, page_size=5) + assert len(store_response.messages) == 1 + self.publish_message() + self.check_published_message_is_stored(page_size=5) diff --git a/tests/store/test_running_nodes.py b/tests/store/test_running_nodes.py index 61d8d5ee717..b96ff95c363 100644 --- a/tests/store/test_running_nodes.py +++ b/tests/store/test_running_nodes.py @@ -1,3 +1,5 @@ +import pytest +from src.env_vars import NODE_2 from src.steps.store import StepsStore @@ -6,51 +8,54 @@ def test_main_node_relay_and_store__peer_relay_and_store(self): self.setup_first_publishing_node(store="true", relay="true") self.setup_first_store_node(store="true", relay="true") self.subscribe_to_pubsub_topics_via_relay() - self.publish_message_via("relay") - self.check_published_message_is_stored(pubsubTopic=self.test_pubsub_topic, pageSize=5, ascending="true") + self.publish_message() + self.check_published_message_is_stored(page_size=5, ascending="true") def test_main_node_relay_and_store__peer_only_store(self): self.setup_first_publishing_node(store="true", relay="true") self.setup_first_store_node(store="true", relay="false") self.subscribe_to_pubsub_topics_via_relay() - self.publish_message_via("relay") + self.publish_message() self.check_store_returns_empty_response() def test_main_node_relay_and_store__peer_only_relay(self): self.setup_first_publishing_node(store="true", relay="true") self.setup_first_store_node(store="false", relay="true") self.subscribe_to_pubsub_topics_via_relay() - self.publish_message_via("relay") - self.check_published_message_is_stored(pubsubTopic=self.test_pubsub_topic, pageSize=5, ascending="true") + self.publish_message() + self.check_published_message_is_stored(page_size=5, ascending="true") def test_main_node_relay_and_store__peer_neither_relay_nor_store(self): self.setup_first_publishing_node(store="true", relay="true") self.setup_first_store_node(store="false", relay="false") self.subscribe_to_pubsub_topics_via_relay() - self.publish_message_via("relay") - self.check_published_message_is_stored(pubsubTopic=self.test_pubsub_topic, pageSize=5, ascending="true") + self.publish_message() + self.check_published_message_is_stored(page_size=5, ascending="true") + @pytest.mark.xfail("go-waku" in NODE_2, reason="Bug reported: https://github.com/waku-org/go-waku/issues/1106") def test_main_node_only_relay__peer_relay_and_store(self): self.setup_first_publishing_node(store="false", relay="true") self.setup_first_store_node(store="true", relay="true") self.subscribe_to_pubsub_topics_via_relay() - self.publish_message_via("relay") - self.check_published_message_is_stored(pubsubTopic=self.test_pubsub_topic, pageSize=5, ascending="true") + self.publish_message() + self.check_published_message_is_stored(page_size=5, ascending="true") + @pytest.mark.xfail("go-waku" in NODE_2, reason="Bug reported: https://github.com/waku-org/go-waku/issues/1106") def test_main_node_only_relay__peer_only_store(self): self.setup_first_publishing_node(store="false", relay="true") self.setup_first_store_node(store="true", relay="false") self.subscribe_to_pubsub_topics_via_relay() - self.publish_message_via("relay") + self.publish_message() self.check_store_returns_empty_response() + @pytest.mark.xfail("go-waku" in NODE_2, reason="Bug reported: https://github.com/waku-org/go-waku/issues/1106") def test_main_node_only_relay__peer_only_relay(self): self.setup_first_publishing_node(store="false", relay="true") self.setup_first_store_node(store="false", relay="true") self.subscribe_to_pubsub_topics_via_relay() - self.publish_message_via("relay") + self.publish_message() try: - self.check_published_message_is_stored(pubsubTopic=self.test_pubsub_topic, pageSize=5, ascending="true") + self.check_published_message_is_stored(page_size=5, ascending="true") except Exception as ex: assert "failed to negotiate protocol: protocols not supported" in str(ex) or "PEER_DIAL_FAILURE" in str(ex) @@ -58,12 +63,12 @@ def test_store_lightpushed_message(self): self.setup_first_publishing_node(store="true", relay="true", lightpush="true") self.setup_first_store_node(store="false", relay="false", lightpush="true", lightpushnode=self.multiaddr_list[0]) self.subscribe_to_pubsub_topics_via_relay() - self.publish_message_via("lightpush", sender=self.store_node1) - self.check_published_message_is_stored(pubsubTopic=self.test_pubsub_topic, pageSize=5, ascending="true") + self.publish_message(via="lightpush", sender=self.store_node1) + self.check_published_message_is_stored(page_size=5, ascending="true") def test_store_with_filter(self): self.setup_first_publishing_node(store="true", relay="true", filter="true") self.setup_first_store_node(store="false", relay="false", filter="true") self.subscribe_to_pubsub_topics_via_relay() - self.publish_message_via("relay") - self.check_published_message_is_stored(pubsubTopic=self.test_pubsub_topic, pageSize=5, ascending="true") + self.publish_message() + self.check_published_message_is_stored(page_size=5, ascending="true") diff --git a/tests/store/test_sorting.py b/tests/store/test_sorting.py new file mode 100644 index 00000000000..512e44b7256 --- /dev/null +++ b/tests/store/test_sorting.py @@ -0,0 +1,23 @@ +import pytest +from src.libs.common import to_base64 +from src.steps.store import StepsStore + + +@pytest.mark.usefixtures("node_setup") +class TestSorting(StepsStore): + @pytest.mark.parametrize("ascending", ["true", "false"]) + def test_store_sort_ascending(self, ascending): + expected_message_hash_list = [] + for i in range(10): + message = self.create_message(payload=to_base64(f"Message_{i}")) + self.publish_message(message=message) + expected_message_hash_list.append(self.compute_message_hash(self.test_pubsub_topic, message)) + for node in self.store_nodes: + store_response = self.get_messages_from_store(node, page_size=5, ascending=ascending) + response_message_hash_list = [] + for index in range(len(store_response.messages)): + response_message_hash_list.append(store_response.message_hash(index)) + if ascending == "true": + assert response_message_hash_list == expected_message_hash_list[:5], "Message hash mismatch for acending order" + else: + assert response_message_hash_list == expected_message_hash_list[5:], "Message hash mismatch for descending order" diff --git a/tests/store/test_time_filter.py b/tests/store/test_time_filter.py new file mode 100644 index 00000000000..ad0cb0cf155 --- /dev/null +++ b/tests/store/test_time_filter.py @@ -0,0 +1,114 @@ +import pytest +from datetime import timedelta, datetime +from src.libs.custom_logger import get_custom_logger +from src.steps.store import StepsStore + +logger = get_custom_logger(__name__) + + +@pytest.mark.usefixtures("node_setup") +class TestTimeFilter(StepsStore): + @pytest.fixture(scope="function", autouse=True) + def setup_test_data(self): + self.ts_pass = [ + {"description": "3 sec Past", "value": int((datetime.now() - timedelta(seconds=3)).timestamp() * 1e9)}, + {"description": "1 sec Past", "value": int((datetime.now() - timedelta(seconds=1)).timestamp() * 1e9)}, + {"description": "0.1 sec Past", "value": int((datetime.now() - timedelta(seconds=0.1)).timestamp() * 1e9)}, + {"description": "0.1 sec Future", "value": int((datetime.now() + timedelta(seconds=0.1)).timestamp() * 1e9)}, + {"description": "2 sec Future", "value": int((datetime.now() + timedelta(seconds=2)).timestamp() * 1e9)}, + {"description": "10 sec Future", "value": int((datetime.now() + timedelta(seconds=10)).timestamp() * 1e9)}, + ] + self.ts_fail = [ + {"description": "20 sec Past", "value": int((datetime.now() - timedelta(seconds=20)).timestamp() * 1e9)}, + {"description": "40 sec Future", "value": int((datetime.now() + timedelta(seconds=40)).timestamp() * 1e9)}, + ] + + def test_messages_with_timestamps_close_to_now(self): + failed_timestamps = [] + for timestamp in self.ts_pass: + logger.debug(f'Running test with payload {timestamp["description"]}') + message = self.create_message(timestamp=timestamp["value"]) + try: + self.publish_message(message=message) + self.check_published_message_is_stored(page_size=20, ascending="true") + except Exception as ex: + logger.error(f'Payload {timestamp["description"]} failed: {str(ex)}') + failed_timestamps.append(timestamp["description"]) + assert not failed_timestamps, f"Timestamps failed: {failed_timestamps}" + + def test_messages_with_timestamps_far_from_now(self): + success_timestamps = [] + for timestamp in self.ts_fail: + logger.debug(f'Running test with payload {timestamp["description"]}') + message = self.create_message(timestamp=timestamp["value"]) + try: + self.publish_message(message=message) + self.check_store_returns_empty_response() + except Exception as ex: + logger.error(f'Payload {timestamp["description"]} succeeded where it should have failed: {str(ex)}') + success_timestamps.append(timestamp["description"]) + assert not success_timestamps, f"Timestamps succeeded: {success_timestamps}" + + def test_time_filter_matches_one_message(self): + message_hash_list = [] + for timestamp in self.ts_pass: + message = self.create_message(timestamp=timestamp["value"]) + self.publish_message(message=message) + message_hash_list.append(self.compute_message_hash(self.test_pubsub_topic, message)) + for node in self.store_nodes: + store_response = self.get_messages_from_store( + node, + page_size=20, + start_time=self.ts_pass[0]["value"] - 100000, + end_time=self.ts_pass[0]["value"] + 100000, + ) + assert len(store_response.messages) == 1, "Message count mismatch" + assert store_response.message_hash(0) == message_hash_list[0], "Incorrect messaged filtered based on time" + + def test_time_filter_matches_multiple_messages(self): + message_hash_list = [] + for timestamp in self.ts_pass: + message = self.create_message(timestamp=timestamp["value"]) + self.publish_message(message=message) + message_hash_list.append(self.compute_message_hash(self.test_pubsub_topic, message)) + for node in self.store_nodes: + store_response = self.get_messages_from_store( + node, + page_size=20, + start_time=self.ts_pass[0]["value"] - 100000, + end_time=self.ts_pass[4]["value"] + 100000, + ) + assert len(store_response.messages) == 5, "Message count mismatch" + for i in range(5): + assert store_response.message_hash(i) == message_hash_list[i], f"Incorrect messaged filtered based on time at index {i}" + + def test_time_filter_matches_no_message(self): + message_hash_list = [] + for timestamp in self.ts_pass: + message = self.create_message(timestamp=timestamp["value"]) + self.publish_message(message=message) + message_hash_list.append(self.compute_message_hash(self.test_pubsub_topic, message)) + for node in self.store_nodes: + store_response = self.get_messages_from_store( + node, + page_size=20, + start_time=self.ts_pass[0]["value"] - 100000, + end_time=self.ts_pass[0]["value"] - 100, + ) + assert not store_response.messages, "Message count mismatch" + + def test_time_filter_start_time_equals_end_time(self): + message_hash_list = [] + for timestamp in self.ts_pass: + message = self.create_message(timestamp=timestamp["value"]) + self.publish_message(message=message) + message_hash_list.append(self.compute_message_hash(self.test_pubsub_topic, message)) + for node in self.store_nodes: + store_response = self.get_messages_from_store( + node, + page_size=20, + start_time=self.ts_pass[0]["value"], + end_time=self.ts_pass[0]["value"], + ) + assert len(store_response.messages) == 1, "Message count mismatch" + assert store_response.message_hash(0) == message_hash_list[0], "Incorrect messaged filtered based on time" diff --git a/tests/store/test_topics.py b/tests/store/test_topics.py new file mode 100644 index 00000000000..fc315c0106d --- /dev/null +++ b/tests/store/test_topics.py @@ -0,0 +1,81 @@ +import pytest +from src.env_vars import NODE_2 +from src.steps.store import StepsStore +from src.test_data import CONTENT_TOPICS_DIFFERENT_SHARDS + + +@pytest.mark.xfail("go-waku" in NODE_2, reason="Bug reported: https://github.com/waku-org/go-waku/issues/1108") +class TestTopics(StepsStore): + @pytest.fixture(scope="function", autouse=True) + def topics_setup(self, node_setup): + self.message_hash_list = [] + for content_topic in CONTENT_TOPICS_DIFFERENT_SHARDS: + message = self.create_message(contentTopic=content_topic) + self.publish_message(message=message) + self.message_hash_list.append(self.compute_message_hash(self.test_pubsub_topic, message)) + + def test_store_with_one_content_topic(self): + for node in self.store_nodes: + for index, content_topic in enumerate(CONTENT_TOPICS_DIFFERENT_SHARDS): + store_response = node.get_store_messages(content_topics=content_topic, page_size=20, ascending="true") + assert len(store_response["messages"]) == 1, "Message count mismatch" + assert ( + store_response["messages"][0]["messageHash"] == self.message_hash_list[index] + ), "Incorrect messaged filtered based on content topic" + + def test_store_with_multiple_content_topics(self): + for node in self.store_nodes: + store_response = node.get_store_messages( + content_topics=f"{CONTENT_TOPICS_DIFFERENT_SHARDS[0]},{CONTENT_TOPICS_DIFFERENT_SHARDS[4]}", page_size=20, ascending="true" + ) + assert len(store_response["messages"]) == 2, "Message count mismatch" + assert ( + store_response["messages"][0]["messageHash"] == self.message_hash_list[0] + ), "Incorrect messaged filtered based on multiple content topics" + assert ( + store_response["messages"][1]["messageHash"] == self.message_hash_list[4] + ), "Incorrect messaged filtered based on multiple content topics" + + def test_store_with_unknown_content_topic(self): + for node in self.store_nodes: + store_response = node.get_store_messages(content_topics="test", page_size=20, ascending="true") + assert len(store_response["messages"]) == 0, "Message count mismatch" + + def test_store_with_unknown_pubsub_topic(self): + for node in self.store_nodes: + store_response = node.get_store_messages(pubsub_topic="test", page_size=20, ascending="true") + assert len(store_response["messages"]) == 0, "Message count mismatch" + + def test_store_with_both_pubsub_topic_and_content_topic(self): + for node in self.store_nodes: + for index, content_topic in enumerate(CONTENT_TOPICS_DIFFERENT_SHARDS): + store_response = node.get_store_messages( + pubsub_topic=self.test_pubsub_topic, content_topics=content_topic, page_size=20, ascending="true" + ) + assert len(store_response["messages"]) == 1, "Message count mismatch" + assert ( + store_response["messages"][0]["messageHash"] == self.message_hash_list[index] + ), "Incorrect messaged filtered based on content topic" + + def test_store_with_unknown_pubsub_topic_but_known_content_topic(self): + for node in self.store_nodes: + store_response = node.get_store_messages( + pubsub_topic="test", content_topics=CONTENT_TOPICS_DIFFERENT_SHARDS[0], page_size=20, ascending="true" + ) + assert len(store_response["messages"]) == 0, "Message count mismatch" + + def test_store_with_both_pubsub_topic_and_content_topic(self): + for node in self.store_nodes: + for index, content_topic in enumerate(CONTENT_TOPICS_DIFFERENT_SHARDS): + store_response = node.get_store_messages( + pubsub_topic=self.test_pubsub_topic, content_topics=content_topic, page_size=20, ascending="true" + ) + assert len(store_response["messages"]) == 1, "Message count mismatch" + assert ( + store_response["messages"][0]["messageHash"] == self.message_hash_list[index] + ), "Incorrect messaged filtered based on content topic" + + def test_store_without_pubsub_topic_and_content_topic(self): + for node in self.store_nodes: + store_response = node.get_store_messages(page_size=20, ascending="true") + assert len(store_response["messages"]) == len(CONTENT_TOPICS_DIFFERENT_SHARDS), "Message count mismatch"