Skip to content

Commit

Permalink
Make init_kafka_producer handle empty broker URL
Browse files Browse the repository at this point in the history
This is for the case of letting app initialization run even if a
producer hasn't been configured. In this scenario, the app would need to
explicitly test that safkir/kafka_producer is not None whenever it is
being tested.
  • Loading branch information
jonathansick committed Mar 12, 2020
1 parent f84f178 commit 12708e7
Showing 1 changed file with 25 additions and 13 deletions.
38 changes: 25 additions & 13 deletions src/safir/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,9 @@ async def init_kafka_producer(app: Application) -> AsyncGenerator:
This initializer adds an `aiokafka.AIOKafkaProducer` instance to the
``app`` under the ``safir/kafka_producer`` key.
If the ``kafka_broker_url`` configuration key has a value of `None`, then
the value of ``safir/kafka_producer`` is `None`.
Examples
--------
Use this function as a `cleanup context
Expand All @@ -124,21 +127,30 @@ async def init_kafka_producer(app: Application) -> AsyncGenerator:
producer = app["safir/kafka_producer"]
"""
# Startup phase
logger = structlog.get_logger(app["safir/config"].logger_name)
logger.info("Starting Kafka producer")
producer = AIOKafkaProducer(
loop=asyncio.get_running_loop(),
bootstrap_servers=app["safir/config"].kafka_broker_url,
ssl_context=app["safir/kafka_ssl_context"],
security_protocol=app["safir/kafka_protocol"],
)
await producer.start()
app["safir/kafka_producer"] = producer
logger.info("Finished starting Kafka producer")

# Startup phase
kafka_broker_url = app["safir/config"].kafka_broker_url

if kafka_broker_url is None:
logger.info("Skipping Kafka producer initialization")
producer = None

else:
logger.info("Starting Kafka producer")
producer = AIOKafkaProducer(
loop=asyncio.get_running_loop(),
bootstrap_servers=kafka_broker_url,
ssl_context=app["safir/kafka_ssl_context"],
security_protocol=app["safir/config"].kafka_protocol,
)
await producer.start()
app["safir/kafka_producer"] = producer
logger.info("Finished starting Kafka producer")

yield

# Cleanup phase
logger.info("Shutting down Kafka producer")
await producer.stop()
if producer is not None:
logger.info("Shutting down Kafka producer")
await producer.stop()

0 comments on commit 12708e7

Please sign in to comment.