diff --git a/CHANGELOG.rst b/CHANGELOG.rst index b12d8d96..f1f74889 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -16,6 +16,20 @@ Unreleased * +[0.5.0] - 2022-08-29 +******************** + +Changed +======= + +* **Breaking changes** in the producer module, refactored to expose a better API: + + * Rather than `send_to_event_bus(...)`, relying code should now call `get_producer().send(...)`. + * The `sync` kwarg is gone; to flush and sync messages before shutdown, call `get_producer().pre_shutdown()` instead. + +* Clarify that config module is for internal use only. +* Implementation changes: Only a single Producer is created, and is used for all signals. + [0.4.3] - 2022-08-24 ******************** diff --git a/edx_event_bus_kafka/__init__.py b/edx_event_bus_kafka/__init__.py index ff0a68ea..4f23f3a2 100644 --- a/edx_event_bus_kafka/__init__.py +++ b/edx_event_bus_kafka/__init__.py @@ -2,4 +2,4 @@ Kafka implementation for Open edX event bus. """ -__version__ = '0.4.3' +__version__ = '0.5.0' diff --git a/edx_event_bus_kafka/config.py b/edx_event_bus_kafka/config.py index e73c4ba7..fc76bc64 100644 --- a/edx_event_bus_kafka/config.py +++ b/edx_event_bus_kafka/config.py @@ -1,5 +1,7 @@ """ Configuration loading and validation. + +This module is for internal use only. """ import warnings diff --git a/edx_event_bus_kafka/management/commands/produce_event.py b/edx_event_bus_kafka/management/commands/produce_event.py index 162cf2c6..c1d8e666 100644 --- a/edx_event_bus_kafka/management/commands/produce_event.py +++ b/edx_event_bus_kafka/management/commands/produce_event.py @@ -9,7 +9,7 @@ from django.utils.module_loading import import_string from openedx_events.tooling import OpenEdxPublicSignal -from edx_event_bus_kafka.publishing.event_producer import send_to_event_bus +from edx_event_bus_kafka.publishing.event_producer import get_producer logger = logging.getLogger(__name__) @@ -53,12 +53,13 @@ def add_arguments(self, parser): def handle(self, *args, **options): try: - send_to_event_bus( + producer = get_producer() + producer.send( signal=import_string(options['signal'][0]), topic=options['topic'][0], event_key_field=options['key_field'][0], event_data=json.loads(options['data'][0]), - sync=True, # otherwise command may exit before delivery is complete ) + producer.pre_shutdown() # otherwise command may exit before delivery is complete except Exception: # pylint: disable=broad-except logger.exception("Error producing Kafka event") diff --git a/edx_event_bus_kafka/publishing/event_producer.py b/edx_event_bus_kafka/publishing/event_producer.py index 1fed3513..22c46843 100644 --- a/edx_event_bus_kafka/publishing/event_producer.py +++ b/edx_event_bus_kafka/publishing/event_producer.py @@ -1,13 +1,13 @@ """ Produce Kafka events from signals. -Main function is ``send_to_event_bus``. +Main function is ``get_producer()``. """ import json import logging from functools import lru_cache -from typing import Any, List +from typing import Any, List, Optional from django.dispatch import receiver from django.test.signals import setting_changed @@ -21,8 +21,9 @@ # See https://github.com/openedx/event-bus-kafka/blob/main/docs/decisions/0005-optional-import-of-confluent-kafka.rst try: import confluent_kafka - from confluent_kafka import SerializingProducer + from confluent_kafka import Producer from confluent_kafka.schema_registry.avro import AvroSerializer + from confluent_kafka.serialization import MessageField, SerializationContext except ImportError: # pragma: no cover confluent_kafka = None @@ -113,63 +114,22 @@ def extract_key_schema(signal_serializer: AvroSignalSerializer, event_key_field: @lru_cache -def get_serializer(signal: OpenEdxPublicSignal) -> AvroSignalSerializer: +def get_serializers(signal: OpenEdxPublicSignal, event_key_field: str): """ - Get the serializer for a signal. + Get the key and value serializers for a signal and a key field path. This is cached in order to save work re-transforming classes into Avro schemas. - """ - return AvroSignalSerializer(signal) - - -# Note: This caching is required, since otherwise the Producer will -# fall out of scope and be garbage-collected, destroying the -# outbound-message queue and threads. The use of this cache allows the -# producers to be long-lived. -# -# We are also likely to need to iterate through this cache at server -# shutdown in order to flush each of the producers, which means the -# cache needs to never evict. See https://github.com/openedx/event-bus-kafka/issues/11 -# for more details. -# -# (Why not change the code to use a single Producer rather than multiple -# SerializerProducer? Because the code actually turns out to be significantly -# uglier that way due to the number of separate values that need to be passed -# around in bundles. There aren't clear "cut-off" points. Additionally, it -# makes unit testing harder/uglier since now the mocks need to either deal with -# serialized bytes or mock out the serializers. Getting this down to a single -# Producer doesn't really seem worth the trouble.) - -# return type (Optional[SerializingProducer]) removed from signature to avoid error on import - -@lru_cache(maxsize=None) # Never evict an entry -- it's a small set and we need to keep all of them. -def get_producer_for_signal(signal: OpenEdxPublicSignal, event_key_field: str): - """ - Create the producer for a signal and a key field path. - - If essential settings are missing or invalid, warn and return None. Arguments: - signal: The OpenEdxPublicSignal to make a producer for - event_key_field: Path to the event data field to use as the event key (period-delimited - string naming the dictionary keys to descend) + signal: The OpenEdxPublicSignal to make a serializer for. + event_key_field: Path to descend in the signal schema to find the subschema for the key + (period-delimited string naming the field names to descend). + Returns: - None if confluent_kafka is not defined or the settings are invalid. - SerializingProducer if it is. + 2-tuple of AvroSignalSerializers, for event key and value """ - if not confluent_kafka: # pragma: no cover - logger.warning('Library confluent-kafka not available. Cannot create event producer.') - return None - - schema_registry_client = get_schema_registry_client() - if schema_registry_client is None: - return None - - producer_settings = load_common_settings() - if producer_settings is None: - return None - - signal_serializer = get_serializer(signal) + client = get_schema_registry_client() + signal_serializer = AvroSignalSerializer(signal) def inner_to_dict(event_data, ctx=None): # pylint: disable=unused-argument """Tells Avro how to turn objects into dictionaries.""" @@ -178,21 +138,95 @@ def inner_to_dict(event_data, ctx=None): # pylint: disable=unused-argument # Serializers for key and value components of Kafka event key_serializer = AvroSerializer( schema_str=extract_key_schema(signal_serializer, event_key_field), - schema_registry_client=schema_registry_client, + schema_registry_client=client, to_dict=inner_to_dict, ) value_serializer = AvroSerializer( schema_str=signal_serializer.schema_string(), - schema_registry_client=schema_registry_client, + schema_registry_client=client, to_dict=inner_to_dict, ) - producer_settings.update({ - 'key.serializer': key_serializer, - 'value.serializer': value_serializer, - }) + return key_serializer, value_serializer + + +class EventProducerKafka(): + """ + API singleton for event production to Kafka. + + This is just a wrapper around a confluent_kafka Producer that knows how to + serialize a signal to event wire format. + + Only one instance (of Producer or this wrapper) should be created, + since it is stateful and needs lifecycle management. + """ + + def __init__(self, producer): + self.producer = producer + + def send( + self, *, signal: OpenEdxPublicSignal, topic: str, event_key_field: str, event_data: dict, + ) -> None: + """ + Send a signal event to the event bus under the specified topic. + + Arguments: + signal: The original OpenEdxPublicSignal the event was sent to + topic: The event bus topic for the event + event_key_field: Path to the event data field to use as the event key (period-delimited + string naming the dictionary keys to descend) + event_data: The event data (kwargs) sent to the signal + """ + event_key = extract_event_key(event_data, event_key_field) + headers = {EVENT_TYPE_HEADER_KEY: signal.event_type} + + key_serializer, value_serializer = get_serializers(signal, event_key_field) + key_bytes = key_serializer(event_key, SerializationContext(topic, MessageField.KEY, headers)) + value_bytes = value_serializer(event_data, SerializationContext(topic, MessageField.VALUE, headers)) + + self.producer.produce( + topic, key=key_bytes, value=value_bytes, headers=headers, on_delivery=on_event_deliver, + ) + + # Opportunistically ensure any pending callbacks from recent event-sends are triggered. + # + # This assumes events come regularly, or that we're not concerned about + # high latency between delivery and callback. If those assumptions are + # false, we should switch to calling poll(1.0) or similar in a loop on + # a separate thread. Or do both. + # + # Issue: https://github.com/openedx/event-bus-kafka/issues/31 + self.producer.poll(0) + + def pre_shutdown(self): + """ + Prepare producer for a clean shutdown. - return SerializingProducer(producer_settings) + Flush pending outbound events, wait for acknowledgement, and process callbacks. + """ + self.producer.flush(-1) + + +# Note: This caching is required, since otherwise the Producer will +# fall out of scope and be garbage-collected, destroying the +# outbound-message queue and threads. The use of this cache allows the +# producer to be long-lived. +@lru_cache +def get_producer() -> Optional[EventProducerKafka]: + """ + Create or retrieve Producer API singleton. + + If confluent-kafka library or essential settings are missing, warn and return None. + """ + if not confluent_kafka: # pragma: no cover + logger.warning('Library confluent-kafka not available. Cannot create event producer.') + return None + + producer_settings = load_common_settings() + if producer_settings is None: + return None + + return EventProducerKafka(Producer(producer_settings)) def on_event_deliver(err, evt): @@ -214,51 +248,8 @@ def on_event_deliver(err, evt): f"partition={evt.partition()}") -def send_to_event_bus( - signal: OpenEdxPublicSignal, topic: str, event_key_field: str, event_data: dict, - sync: bool = False, -) -> None: - """ - Send a signal event to the event bus under the specified topic. - - If the Kafka settings are missing or invalid, return with a warning. - - Arguments: - signal: The original OpenEdxPublicSignal the event was sent to - topic: The event bus topic for the event - event_key_field: Path to the event data field to use as the event key (period-delimited - string naming the dictionary keys to descend) - event_data: The event data (kwargs) sent to the signal - sync: Whether to wait indefinitely for event to be received by the message bus (probably - only want to use this for testing) - """ - producer = get_producer_for_signal(signal, event_key_field) - if producer is None: # Note: SerializingProducer has False truthiness when len() == 0 - return - - event_key = extract_event_key(event_data, event_key_field) - producer.produce(topic, key=event_key, value=event_data, - on_delivery=on_event_deliver, - headers={EVENT_TYPE_HEADER_KEY: signal.event_type}) - - if sync: - # Wait for all buffered events to send, then wait for all of - # them to be acknowledged, and trigger all callbacks. - producer.flush(-1) - else: - # Opportunistically ensure any pending callbacks from recent events are triggered. - # - # This assumes events come regularly, or that we're not concerned about - # high latency between delivery and callback. If those assumptions are - # false, we should switch to calling poll(1.0) or similar in a loop on - # a separate thread. - # - # Docs: https://github.com/edenhill/librdkafka/blob/4faeb8132521da70b6bcde14423a14eb7ed5c55e/src/rdkafka.h#L3079 - producer.poll(0) - - @receiver(setting_changed) def _reset_caches(sender, **kwargs): # pylint: disable=unused-argument """Reset caches during testing when settings change.""" - get_serializer.cache_clear() - get_producer_for_signal.cache_clear() + get_serializers.cache_clear() + get_producer.cache_clear() diff --git a/edx_event_bus_kafka/publishing/test_event_producer.py b/edx_event_bus_kafka/publishing/test_event_producer.py index 81521bc3..5990bb1b 100644 --- a/edx_event_bus_kafka/publishing/test_event_producer.py +++ b/edx_event_bus_kafka/publishing/test_event_producer.py @@ -14,12 +14,6 @@ import edx_event_bus_kafka.publishing.event_producer as ep -# See https://github.com/openedx/event-bus-kafka/blob/main/docs/decisions/0005-optional-import-of-confluent-kafka.rst -try: - from confluent_kafka import SerializingProducer -except ImportError: # pragma: no cover - pass - class TestEventProducer(TestCase): """Test producer.""" @@ -61,18 +55,16 @@ def test_extract_key_schema(self): schema = ep.extract_key_schema(AvroSignalSerializer(signal), 'user.pii.username') assert schema == '{"name": "username", "type": "string"}' - def test_get_producer_for_signal_unconfigured(self): + def test_get_producer_unconfigured(self): """With missing essential settings, just warn and return None.""" - signal = openedx_events.learning.signals.SESSION_LOGIN_COMPLETED with warnings.catch_warnings(record=True) as caught_warnings: warnings.simplefilter('always') - assert ep.get_producer_for_signal(signal, 'user.id') is None + assert ep.get_producer() is None assert len(caught_warnings) == 1 assert str(caught_warnings[0].message).startswith("Cannot configure event-bus-kafka: Missing setting ") - def test_get_producer_for_signal_configured(self): + def test_get_producer_configured(self): """Creation succeeds when all settings are present.""" - signal = openedx_events.learning.signals.SESSION_LOGIN_COMPLETED with override_settings( EVENT_BUS_KAFKA_SCHEMA_REGISTRY_URL='http://localhost:12345', EVENT_BUS_KAFKA_SCHEMA_REGISTRY_API_KEY='some_key', @@ -82,7 +74,7 @@ def test_get_producer_for_signal_configured(self): EVENT_BUS_KAFKA_API_KEY='some_other_key', EVENT_BUS_KAFKA_API_SECRET='some_other_secret', ): - assert isinstance(ep.get_producer_for_signal(signal, 'user.id'), SerializingProducer) + assert isinstance(ep.get_producer(), ep.EventProducerKafka) @patch('edx_event_bus_kafka.publishing.event_producer.logger') def test_on_event_deliver(self, mock_logger): @@ -99,7 +91,15 @@ def test_on_event_deliver(self, mock_logger): 'Event delivered to topic some_topic; key=some_key; partition=some_partition' ) - def test_send_to_event_bus(self): + # Mock out the serializers for this one so we don't have to deal with expected Avro bytes + @patch( + 'edx_event_bus_kafka.publishing.event_producer.get_serializers', autospec=True, + return_value=( + lambda _key, _ctx: b'key-bytes-here', + lambda _value, _ctx: b'value-bytes-here', + ) + ) + def test_send_to_event_bus(self, mock_get_serializers): signal = openedx_events.learning.signals.SESSION_LOGIN_COMPLETED event_data = { 'user': UserData( @@ -113,12 +113,18 @@ def test_send_to_event_bus(self): ) } - mock_producer = MagicMock() - with patch('edx_event_bus_kafka.publishing.event_producer.get_producer_for_signal', return_value=mock_producer): - ep.send_to_event_bus(signal, 'user_stuff', 'user.id', event_data) + with override_settings( + EVENT_BUS_KAFKA_SCHEMA_REGISTRY_URL='http://localhost:12345', + EVENT_BUS_KAFKA_BOOTSTRAP_SERVERS='http://localhost:54321', + ): + producer_api = ep.get_producer() + with patch.object(producer_api, 'producer', autospec=True) as mock_producer: + producer_api.send(signal=signal, topic='user_stuff', event_key_field='user.id', event_data=event_data) + + mock_get_serializers.assert_called_once_with(signal, 'user.id') mock_producer.produce.assert_called_once_with( - 'user_stuff', key=123, value=event_data, + 'user_stuff', key=b'key-bytes-here', value=b'value-bytes-here', on_delivery=ep.on_event_deliver, headers={'ce_type': 'org.openedx.learning.auth.session.login.completed.v1'}, )