From 09a0a4e249524fb83bf4782a0ae106b2a4c48a8e Mon Sep 17 00:00:00 2001 From: Tim McCormack Date: Wed, 27 Jul 2022 14:16:51 -0400 Subject: [PATCH] fix: Use non-blocking poll after production; allow flush for testing (#12) I believe a nullary call to poll uses a default timeout of -1 (wait indefinitely), but we really just want to make sure that pending callbacks are triggered for the acks that have been buffered in the background, from previous events. poll(0) will not block if the buffer is empty. For testing we need to call flush(-1), so add sync=True as an option. Documentation for `rd_kafka_poll`: https://github.com/edenhill/librdkafka/blob/4faeb8132521da70b6bcde14423a14eb7ed5c55e/src/rdkafka.h#L3079 This addresses part of https://github.com/openedx/event-bus-kafka/issues/10 --- .../management/commands/produce_event.py | 1 + .../publishing/event_producer.py | 25 ++++++++++++++++--- 2 files changed, 22 insertions(+), 4 deletions(-) diff --git a/edx_event_bus_kafka/management/commands/produce_event.py b/edx_event_bus_kafka/management/commands/produce_event.py index 5d0cd16..4afd3b2 100644 --- a/edx_event_bus_kafka/management/commands/produce_event.py +++ b/edx_event_bus_kafka/management/commands/produce_event.py @@ -59,6 +59,7 @@ def handle(self, *args, **options): topic=options['topic'][0], event_key_field=options['key_field'][0], event_data=json.loads(options['data'][0]), + sync=True, # otherwise command may exit before delivery is complete ) except Exception: # pylint: disable=broad-except logger.exception("Error producing Kafka event") diff --git a/edx_event_bus_kafka/publishing/event_producer.py b/edx_event_bus_kafka/publishing/event_producer.py index 285cab4..52c4c6d 100644 --- a/edx_event_bus_kafka/publishing/event_producer.py +++ b/edx_event_bus_kafka/publishing/event_producer.py @@ -203,7 +203,10 @@ def on_event_deliver(err, evt): f"partition={evt.partition()}") -def send_to_event_bus(signal: OpenEdxPublicSignal, topic: str, event_key_field: str, event_data: dict) -> None: +def send_to_event_bus( + signal: OpenEdxPublicSignal, topic: str, event_key_field: str, event_data: dict, + sync: bool = False, +) -> None: """ Send a signal event to the event bus under the specified topic. @@ -215,6 +218,8 @@ def send_to_event_bus(signal: OpenEdxPublicSignal, topic: str, event_key_field: event_key_field: Path to the event data field to use as the event key (period-delimited string naming the dictionary keys to descend) event_data: The event data (kwargs) sent to the signal + sync: Whether to wait indefinitely for event to be received by the message bus (probably + only want to use this for testing) """ producer = get_producer_for_signal(signal, event_key_field) if producer is None: # Note: SerializingProducer has False truthiness when len() == 0 @@ -224,6 +229,18 @@ def send_to_event_bus(signal: OpenEdxPublicSignal, topic: str, event_key_field: producer.produce(topic, key=event_key, value=event_data, on_delivery=on_event_deliver, headers={EVENT_TYPE_HEADER_KEY: signal.event_type}) - # TODO (EventBus): Investigate poll() vs. flush(), and other related settings: - # See https://github.com/openedx/event-bus-kafka/issues/10 - producer.poll() # wait indefinitely for the above event to either be delivered or fail + + if sync: + # Wait for all buffered events to send, then wait for all of + # them to be acknowledged, and trigger all callbacks. + producer.flush(-1) + else: + # Opportunistically ensure any pending callbacks from recent events are triggered. + # + # This assumes events come regularly, or that we're not concerned about + # high latency between delivery and callback. If those assumptions are + # false, we should switch to calling poll(1.0) or similar in a loop on + # a separate thread. + # + # Docs: https://github.com/edenhill/librdkafka/blob/4faeb8132521da70b6bcde14423a14eb7ed5c55e/src/rdkafka.h#L3079 + producer.poll(0)