From 826a717a0c67619ec250cec7be31db5da721b164 Mon Sep 17 00:00:00 2001 From: EmilioCarrion <54892804+EmilioCarrion@users.noreply.github.com> Date: Tue, 2 May 2023 17:11:15 +0200 Subject: [PATCH] Implemented setting the gc region to new topics (#249) * Add test to add region to topic creation * Implemented setting the gc region to new topics * Fix linting * Fix test * Fix linting * Fix test --------- Co-authored-by: jponzvan --- docs/settings.rst | 8 ++++++++ rele/__main__.py | 2 +- rele/client.py | 13 ++++++++++++- rele/config.py | 2 +- rele/worker.py | 8 +++++++- tests/commands/test_runrele.py | 8 ++++++-- tests/conftest.py | 6 +++++- tests/test_client.py | 9 ++++++++- tests/test_worker.py | 5 ++++- 9 files changed, 52 insertions(+), 9 deletions(-) diff --git a/docs/settings.rst b/docs/settings.rst index 68acda1..27c2830 100644 --- a/docs/settings.rst +++ b/docs/settings.rst @@ -31,6 +31,7 @@ Example:: 'PUBLISHER_TIMEOUT': 3.0, 'FILTER_SUBS_BY': boolean_function, 'DEFAULT_RETRY_POLICY': RetryPolicy(10, 50), + 'GC_STORAGE_REGION': 'europe-west1', } ``GC_PROJECT_ID`` @@ -187,3 +188,10 @@ A RetryPolicy object which must be instantiated with `minimum_backoff` and `maxi If not set, the default retry policy is applied, meaning a minimum backoff of 10 seconds and a maximum backoff of 60 seconds. This generally implies that messages will be retried as soon as possible for healthy subscribers. RetryPolicy will be triggered on NACKs or acknowledgement deadline exceeded events for a given message. + +``GC_STORAGE_REGION`` +---------------------------- + +**Optional** + +Set the Google Cloud's region for storing the messages. By default is `europe-west1` diff --git a/rele/__main__.py b/rele/__main__.py index 743879f..282469a 100644 --- a/rele/__main__.py +++ b/rele/__main__.py @@ -3,7 +3,7 @@ import os import sys -from rele import config, discover, subscription +from rele import config, discover from rele.worker import create_and_run logger = logging.getLogger(__name__) diff --git a/rele/client.py b/rele/client.py index 47d381b..7217609 100644 --- a/rele/client.py +++ b/rele/client.py @@ -9,6 +9,7 @@ from google.api_core import exceptions from google.cloud import pubsub_v1 from google.protobuf import duration_pb2 +from google.pubsub_v1 import MessageStoragePolicy from google.pubsub_v1 import RetryPolicy as GCloudRetryPolicy from rele.middleware import run_middleware_hook @@ -37,6 +38,7 @@ class Subscriber: :param gc_project_id: str :ref:`settings_project_id` . :param credentials: obj :meth:`~rele.config.Config.credentials`. + :param message_storage_policy: str Region to store the messages :param default_ack_deadline: int Ack Deadline defined in settings :param default_retry_policy: RetryPolicy Rele's RetryPolicy defined in settings """ @@ -45,12 +47,14 @@ def __init__( self, gc_project_id, credentials, + message_storage_policy, default_ack_deadline=None, default_retry_policy=None, ): self._gc_project_id = gc_project_id self._ack_deadline = default_ack_deadline or DEFAULT_ACK_DEADLINE self.credentials = credentials if not USE_EMULATOR else None + self._message_storage_policy = message_storage_policy self._client = pubsub_v1.SubscriberClient(credentials=credentials) self._retry_policy = default_retry_policy @@ -83,7 +87,14 @@ def create_subscription(self, subscription): def _create_topic(self, topic_path): publisher_client = pubsub_v1.PublisherClient(credentials=self.credentials) - return publisher_client.create_topic(request={"name": topic_path}) + return publisher_client.create_topic( + request={ + "name": topic_path, + "message_storage_policy": MessageStoragePolicy( + {"allowed_persistence_regions": [self._message_storage_policy]} + ), + } + ) def _create_subscription(self, subscription_path, topic_path, subscription): request = { diff --git a/rele/config.py b/rele/config.py index 319acb1..0dd4ca4 100644 --- a/rele/config.py +++ b/rele/config.py @@ -1,6 +1,5 @@ import importlib import os -import warnings from google.oauth2 import service_account @@ -29,6 +28,7 @@ class Config: def __init__(self, setting): self._gc_project_id = setting.get("GC_PROJECT_ID") self.gc_credentials_path = setting.get("GC_CREDENTIALS_PATH") + self.gc_storage_region = setting.get("GC_STORAGE_REGION", "europe-west1") self.app_name = setting.get("APP_NAME") self.sub_prefix = setting.get("SUB_PREFIX") self.middleware = setting.get("MIDDLEWARE", default_middleware) diff --git a/rele/worker.py b/rele/worker.py index 82cd1c6..d5cd3a0 100644 --- a/rele/worker.py +++ b/rele/worker.py @@ -27,12 +27,17 @@ def __init__( subscriptions, gc_project_id=None, credentials=None, + gc_storage_region=None, default_ack_deadline=None, threads_per_subscription=None, default_retry_policy=None, ): self._subscriber = Subscriber( - gc_project_id, credentials, default_ack_deadline, default_retry_policy + gc_project_id, + credentials, + gc_storage_region, + default_ack_deadline, + default_retry_policy, ) self._futures = {} self._subscriptions = subscriptions @@ -153,6 +158,7 @@ def create_and_run(subs, config): subs, config.gc_project_id, config.credentials, + config.gc_storage_region, config.ack_deadline, config.threads_per_subscription, config.retry_policy, diff --git a/tests/commands/test_runrele.py b/tests/commands/test_runrele.py index ab03e00..3ae6124 100644 --- a/tests/commands/test_runrele.py +++ b/tests/commands/test_runrele.py @@ -22,7 +22,9 @@ def mock_worker(self): def test_calls_worker_start_and_setup_when_runrele(self, mock_worker): call_command("runrele") - mock_worker.assert_called_with([], "rele-test", ANY, 60, 2, None) + mock_worker.assert_called_with( + [], "rele-test", ANY, "europe-west1", 60, 2, None + ) mock_worker.return_value.run_forever.assert_called_once_with() def test_prints_warning_when_conn_max_age_not_set_to_zero( @@ -37,5 +39,7 @@ def test_prints_warning_when_conn_max_age_not_set_to_zero( "This may result in slots for database connections to " "be exhausted." in err ) - mock_worker.assert_called_with([], "rele-test", ANY, 60, 2, None) + mock_worker.assert_called_with( + [], "rele-test", ANY, "europe-west1", 60, 2, None + ) mock_worker.return_value.run_forever.assert_called_once_with() diff --git a/tests/conftest.py b/tests/conftest.py index 2e7464b..2668ee0 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -27,6 +27,7 @@ def config(project_id): "APP_NAME": "rele", "SUB_PREFIX": "rele", "GC_CREDENTIALS_PATH": "tests/dummy-pub-sub-credentials.json", + "GC_STORAGE_REGION": "some-region", "MIDDLEWARE": ["rele.contrib.LoggingMiddleware"], } ) @@ -39,6 +40,7 @@ def config_with_retry_policy(project_id): "APP_NAME": "rele", "SUB_PREFIX": "rele", "GC_CREDENTIALS_PATH": "tests/dummy-pub-sub-credentials.json", + "GC_STORAGE_REGION": "some-region", "MIDDLEWARE": ["rele.contrib.LoggingMiddleware"], "DEFAULT_RETRY_POLICY": RetryPolicy(5, 30), } @@ -47,7 +49,9 @@ def config_with_retry_policy(project_id): @pytest.fixture def subscriber(project_id, config): - return Subscriber(config.gc_project_id, config.credentials, 60) + return Subscriber( + config.gc_project_id, config.credentials, config.gc_storage_region, 60 + ) @pytest.fixture diff --git a/tests/test_client.py b/tests/test_client.py index 53581bd..c0c4b9f 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -9,6 +9,7 @@ from google.cloud import pubsub_v1 from google.cloud.pubsub_v1 import PublisherClient, SubscriberClient from google.protobuf import duration_pb2 +from google.pubsub_v1 import MessageStoragePolicy from rele import Subscriber from rele.retry_policy import RetryPolicy @@ -279,7 +280,12 @@ def test_creates_topic_when_subscription_topic_does_not_exist( assert _mocked_client.call_count == 2 mock_create_topic.assert_called_with( - request={"name": f"projects/rele-test/topics/{project_id}-test-topic"} + request={ + "name": f"projects/rele-test/topics/{project_id}-test-topic", + "message_storage_policy": MessageStoragePolicy( + {"allowed_persistence_regions": ["some-region"]} + ), + } ) _mocked_client.assert_called_with( @@ -328,6 +334,7 @@ def test_default_retry_policy_is_applied_when_not_explicitly_provided( subscriber = Subscriber( config_with_retry_policy.gc_project_id, config_with_retry_policy.credentials, + config_with_retry_policy.gc_storage_region, 60, config_with_retry_policy.retry_policy, ) diff --git a/tests/test_worker.py b/tests/test_worker.py index bd3b1d5..149c9d0 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -15,7 +15,7 @@ @sub(topic="some-cool-topic", prefix="rele") def sub_stub(data, **kwargs): - print(f"I am a task doing stuff.") + print("I am a task doing stuff.") @pytest.fixture @@ -25,6 +25,7 @@ def worker(config): subscriptions, config.gc_project_id, config.credentials, + config.gc_storage_region, default_ack_deadline=60, threads_per_subscription=10, default_retry_policy=config.retry_policy, @@ -105,6 +106,7 @@ def test_creates_subscription_with_custom_ack_deadline_from_environment( worker = Worker( subscriptions, config.gc_project_id, + config.gc_storage_region, config.credentials, custom_ack_deadline, threads_per_subscription=10, @@ -168,6 +170,7 @@ def test_waits_forever_when_called_with_config_and_subs( subscriptions, "rele-test", ANY, + "some-region", 60, 2, RetryPolicy(5, 30),