From 12708e77d08b9f9794667ba67f944f3d9990fb3d Mon Sep 17 00:00:00 2001 From: Jonathan Sick Date: Wed, 11 Mar 2020 17:52:22 -0400 Subject: [PATCH] Make init_kafka_producer handle empty broker URL This is for the case of letting app initialization run even if a producer hasn't been configured. In this scenario, the app would need to explicitly test that safkir/kafka_producer is not None whenever it is being tested. --- src/safir/events.py | 38 +++++++++++++++++++++++++------------- 1 file changed, 25 insertions(+), 13 deletions(-) diff --git a/src/safir/events.py b/src/safir/events.py index 25d71a80..fc66b2ef 100644 --- a/src/safir/events.py +++ b/src/safir/events.py @@ -113,6 +113,9 @@ async def init_kafka_producer(app: Application) -> AsyncGenerator: This initializer adds an `aiokafka.AIOKafkaProducer` instance to the ``app`` under the ``safir/kafka_producer`` key. + If the ``kafka_broker_url`` configuration key has a value of `None`, then + the value of ``safir/kafka_producer`` is `None`. + Examples -------- Use this function as a `cleanup context @@ -124,21 +127,30 @@ async def init_kafka_producer(app: Application) -> AsyncGenerator: producer = app["safir/kafka_producer"] """ - # Startup phase logger = structlog.get_logger(app["safir/config"].logger_name) - logger.info("Starting Kafka producer") - producer = AIOKafkaProducer( - loop=asyncio.get_running_loop(), - bootstrap_servers=app["safir/config"].kafka_broker_url, - ssl_context=app["safir/kafka_ssl_context"], - security_protocol=app["safir/kafka_protocol"], - ) - await producer.start() - app["safir/kafka_producer"] = producer - logger.info("Finished starting Kafka producer") + + # Startup phase + kafka_broker_url = app["safir/config"].kafka_broker_url + + if kafka_broker_url is None: + logger.info("Skipping Kafka producer initialization") + producer = None + + else: + logger.info("Starting Kafka producer") + producer = AIOKafkaProducer( + loop=asyncio.get_running_loop(), + bootstrap_servers=kafka_broker_url, + ssl_context=app["safir/kafka_ssl_context"], + security_protocol=app["safir/config"].kafka_protocol, + ) + await producer.start() + app["safir/kafka_producer"] = producer + logger.info("Finished starting Kafka producer") yield # Cleanup phase - logger.info("Shutting down Kafka producer") - await producer.stop() + if producer is not None: + logger.info("Shutting down Kafka producer") + await producer.stop()