diff --git a/CHANGELOG.rst b/CHANGELOG.rst
index 31434e0c..949afc79 100644
--- a/CHANGELOG.rst
+++ b/CHANGELOG.rst
@@ -16,8 +16,25 @@ Unreleased
 *
 
+[0.3.0] - 2022-08-10
+~~~~~~~~~~~~~~~~~~~~
+
+Updated
+_______
+
+* Moved configuration into a separate file.
+* Updated configuration settings to use the EVENT_BUS_KAFKA prefix.
+
+[0.2.0] - 2022-08-09
+~~~~~~~~~~~~~~~~~~~~
+
+Fixed
+_____
+
+* Cache producers so that they don't lose data.
+
 [0.1.0] - 2022-06-16
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+~~~~~~~~~~~~~~~~~~~~
 
 Added
 _____
 
diff --git a/README.rst b/README.rst
index eda9670b..0a8740f3 100644
--- a/README.rst
+++ b/README.rst
@@ -44,7 +44,7 @@ One Time Setup
 .. code-block::
 
   # Clone the repository
-  git clone git@github.com:edx/event-bus-kafka.git
+  git clone git@github.com:openedx/event-bus-kafka.git
   cd event-bus-kafka
 
   # Set up a virtualenv using virtualenvwrapper with the same name as the repo and activate it
@@ -131,26 +131,26 @@ For more information about these options, see the `Getting Help`_ page.
 .. _community Slack workspace: https://openedx.slack.com/
 .. _Getting Help: https://openedx.org/getting-help
 
-.. |pypi-badge| image:: https://img.shields.io/pypi/v/event-bus-kafka.svg
-    :target: https://pypi.python.org/pypi/event-bus-kafka/
+.. |pypi-badge| image:: https://img.shields.io/pypi/v/edx-event-bus-kafka.svg
+    :target: https://pypi.python.org/pypi/edx-event-bus-kafka/
     :alt: PyPI
 
-.. |ci-badge| image:: https://github.com/edx/event-bus-kafka/workflows/Python%20CI/badge.svg?branch=main
-    :target: https://github.com/edx/event-bus-kafka/actions
+.. |ci-badge| image:: https://github.com/openedx/event-bus-kafka/workflows/Python%20CI/badge.svg?branch=main
+    :target: https://github.com/openedx/event-bus-kafka/actions
     :alt: CI
 
-.. |codecov-badge| image:: https://codecov.io/github/edx/event-bus-kafka/coverage.svg?branch=main
-    :target: https://codecov.io/github/edx/event-bus-kafka?branch=main
+.. |codecov-badge| image:: https://codecov.io/github/openedx/event-bus-kafka/coverage.svg?branch=main
+    :target: https://codecov.io/github/openedx/event-bus-kafka?branch=main
     :alt: Codecov
 
-.. |doc-badge| image:: https://readthedocs.org/projects/event-bus-kafka/badge/?version=latest
-    :target: https://event-bus-kafka.readthedocs.io/en/latest/
+.. |doc-badge| image:: https://readthedocs.org/projects/edx-event-bus-kafka/badge/?version=latest
+    :target: https://edx-event-bus-kafka.readthedocs.io/en/latest/
     :alt: Documentation
 
-.. |pyversions-badge| image:: https://img.shields.io/pypi/pyversions/event-bus-kafka.svg
-    :target: https://pypi.python.org/pypi/event-bus-kafka/
+.. |pyversions-badge| image:: https://img.shields.io/pypi/pyversions/edx-event-bus-kafka.svg
+    :target: https://pypi.python.org/pypi/edx-event-bus-kafka/
     :alt: Supported Python versions
 
-.. |license-badge| image:: https://img.shields.io/github/license/edx/event-bus-kafka.svg
-    :target: https://github.com/edx/event-bus-kafka/blob/main/LICENSE.txt
+.. |license-badge| image:: https://img.shields.io/github/license/openedx/event-bus-kafka.svg
+    :target: https://github.com/openedx/event-bus-kafka/blob/main/LICENSE.txt
     :alt: License
diff --git a/edx_event_bus_kafka/__init__.py b/edx_event_bus_kafka/__init__.py
index 6ea8237e..69e91584 100644
--- a/edx_event_bus_kafka/__init__.py
+++ b/edx_event_bus_kafka/__init__.py
@@ -2,4 +2,4 @@
 Kafka implementation for Open edX event bus.
""" -__version__ = '0.1.1' +__version__ = '0.3.0' diff --git a/edx_event_bus_kafka/config.py b/edx_event_bus_kafka/config.py new file mode 100644 index 00000000..c2fed42b --- /dev/null +++ b/edx_event_bus_kafka/config.py @@ -0,0 +1,54 @@ +""" +Configuration loading and validation. +""" + +import warnings +from typing import Optional + +from confluent_kafka.schema_registry import SchemaRegistryClient +from django.conf import settings + + +def create_schema_registry_client() -> Optional[SchemaRegistryClient]: + """ + Create a schema registry client from common settings. + """ + url = getattr(settings, 'EVENT_BUS_KAFKA_SCHEMA_REGISTRY_URL', None) + if url is None: + warnings.warn("Cannot configure event-bus-kafka: Missing setting EVENT_BUS_KAFKA_SCHEMA_REGISTRY_URL") + return None + + key = getattr(settings, 'EVENT_BUS_KAFKA_SCHEMA_REGISTRY_API_KEY', '') + secret = getattr(settings, 'EVENT_BUS_KAFKA_SCHEMA_REGISTRY_API_SECRET', '') + + return SchemaRegistryClient({ + 'url': url, + 'basic.auth.user.info': f"{key}:{secret}", + }) + + +def load_common_settings() -> Optional[dict]: + """ + Load common settings, a base for either producer or consumer configuration. + """ + bootstrap_servers = getattr(settings, 'EVENT_BUS_KAFKA_BOOTSTRAP_SERVERS', None) + if bootstrap_servers is None: + warnings.warn("Cannot configure event-bus-kafka: Missing setting EVENT_BUS_KAFKA_BOOTSTRAP_SERVERS") + return None + + base_settings = { + 'bootstrap.servers': bootstrap_servers, + } + + key = getattr(base_settings, 'EVENT_BUS_KAFKA_API_KEY', None) + secret = getattr(base_settings, 'EVENT_BUS_KAFKA_API_SECRET', None) + + if key and secret: + base_settings.update({ + 'sasl.mechanism': 'PLAIN', + 'security.protocol': 'SASL_SSL', + 'sasl.username': key, + 'sasl.password': secret, + }) + + return base_settings diff --git a/edx_event_bus_kafka/publishing/event_producer.py b/edx_event_bus_kafka/publishing/event_producer.py index 52c4c6db..d9cfdafd 100644 --- a/edx_event_bus_kafka/publishing/event_producer.py +++ b/edx_event_bus_kafka/publishing/event_producer.py @@ -6,17 +6,16 @@ import json import logging -import warnings from functools import lru_cache from typing import Any, List, Optional from confluent_kafka import SerializingProducer -from confluent_kafka.schema_registry import SchemaRegistryClient from confluent_kafka.schema_registry.avro import AvroSerializer -from django.conf import settings from openedx_events.event_bus.avro.serializer import AvroSignalSerializer from openedx_events.tooling import OpenEdxPublicSignal +from edx_event_bus_kafka.config import create_schema_registry_client, load_common_settings + logger = logging.getLogger(__name__) # CloudEvent standard name for the event type header, see @@ -115,6 +114,11 @@ def get_serializer(signal: OpenEdxPublicSignal) -> AvroSignalSerializer: return AvroSignalSerializer(signal) +# Note: This caching is required, since otherwise the Producer will +# fall out of scope and be garbage-collected, destroying the +# outbound-message queue and threads. The use of this cache allows the +# producers to be long-lived. +@lru_cache def get_producer_for_signal(signal: OpenEdxPublicSignal, event_key_field: str) -> Optional[SerializingProducer]: """ Create the producer for a signal and a key field path. @@ -131,33 +135,14 @@ def get_producer_for_signal(signal: OpenEdxPublicSignal, event_key_field: str) - remote-config (and in particular does not result in mixed cache/uncached configuration). This complexity is being deferred until this becomes a performance issue. 
""" - if schema_registry_url := getattr(settings, 'SCHEMA_REGISTRY_URL', None): - schema_registry_config = { - 'url': schema_registry_url, - 'basic.auth.user.info': f"{getattr(settings, 'SCHEMA_REGISTRY_API_KEY', '')}" - f":{getattr(settings, 'SCHEMA_REGISTRY_API_SECRET', '')}", - } - else: - warnings.warn("Cannot configure event-bus-kafka: Missing setting SCHEMA_REGISTRY_URL") + schema_registry_client = create_schema_registry_client() + if schema_registry_client is None: return None - if bootstrap_servers := getattr(settings, 'KAFKA_BOOTSTRAP_SERVERS', None): - producer_settings = { - 'bootstrap.servers': bootstrap_servers, - } - else: - warnings.warn("Cannot configure event-bus-kafka: Missing setting KAFKA_BOOTSTRAP_SERVERS") + producer_settings = load_common_settings() + if producer_settings is None: return None - if getattr(settings, 'KAFKA_API_KEY', None) and getattr(settings, 'KAFKA_API_SECRET', None): - producer_settings.update({ - 'sasl.mechanism': 'PLAIN', - 'security.protocol': 'SASL_SSL', - 'sasl.username': settings.KAFKA_API_KEY, - 'sasl.password': settings.KAFKA_API_SECRET, - }) - - schema_registry_client = SchemaRegistryClient(schema_registry_config) signal_serializer = get_serializer(signal) def inner_to_dict(event_data, ctx=None): # pylint: disable=unused-argument diff --git a/edx_event_bus_kafka/publishing/test_event_producer.py b/edx_event_bus_kafka/publishing/test_event_producer.py index 1f5e4b1e..f8cc5c10 100644 --- a/edx_event_bus_kafka/publishing/test_event_producer.py +++ b/edx_event_bus_kafka/publishing/test_event_producer.py @@ -19,6 +19,11 @@ class TestEventProducer(TestCase): """Test producer.""" + def setUp(self): + super().setUp() + ep.get_producer_for_signal.cache_clear() + ep.get_serializer.cache_clear() + def test_extract_event_key(self): event_data = { 'user': UserData( @@ -56,22 +61,23 @@ def test_extract_key_schema(self): schema = ep.extract_key_schema(AvroSignalSerializer(signal), 'user.pii.username') assert schema == '{"name": "username", "type": "string"}' - def test_get_producer_for_signal(self): + def test_get_producer_for_signal_unconfigured(self): + """With missing essential settings, just warn and return None.""" signal = openedx_events.learning.signals.SESSION_LOGIN_COMPLETED - - # With missing essential settings, just warn and return None with warnings.catch_warnings(record=True) as caught_warnings: warnings.simplefilter('always') assert ep.get_producer_for_signal(signal, 'user.id') is None assert len(caught_warnings) == 1 assert str(caught_warnings[0].message).startswith("Cannot configure event-bus-kafka: Missing setting ") - # Creation succeeds when all settings are present + def test_get_producer_for_signal_configured(self): + """Creation succeeds when all settings are present.""" + signal = openedx_events.learning.signals.SESSION_LOGIN_COMPLETED with override_settings( - SCHEMA_REGISTRY_URL='http://localhost:12345', - SCHEMA_REGISTRY_API_KEY='some_key', - SCHEMA_REGISTRY_API_SECRET='some_secret', - KAFKA_BOOTSTRAP_SERVERS='http://localhost:54321', + EVENT_BUS_KAFKA_SCHEMA_REGISTRY_URL='http://localhost:12345', + EVENT_BUS_KAFKA_SCHEMA_REGISTRY_API_KEY='some_key', + EVENT_BUS_KAFKA_SCHEMA_REGISTRY_API_SECRET='some_secret', + EVENT_BUS_KAFKA_BOOTSTRAP_SERVERS='http://localhost:54321', # include these just to maximize code coverage KAFKA_API_KEY='some_other_key', KAFKA_API_SECRET='some_other_secret',