feat: Add management command for testing event production; config fixes
- Update manual-testing instructions
- Switch to `SESSION_LOGIN_COMPLETED` for the single signal we support,
  since it's easier to test with (but we may want to choose a different
  one in the future that doesn't involve PII, even fake PII).
- Move test receiver to event_consumer module for simplicity; wrap in
  try-block for safety.
- Tolerate some missing settings in the consumer
- Add unit tests
- Some small control flow changes:
  - Move responsibility for handling None messages from
    `process_single_message` to `consume_indefinitely`
  - Collapse None/empty headers into single case
- Log and docstring tweaks
timmc-edx committed Jul 26, 2022
1 parent 4e81fd7 commit 3e05743
Showing 8 changed files with 251 additions and 102 deletions.
6 changes: 4 additions & 2 deletions README.rst
@@ -6,8 +6,8 @@ Kafka implementation for Open edX event bus.
|pypi-badge| |ci-badge| |codecov-badge| |doc-badge| |pyversions-badge|
|license-badge|

Overview (please modify)
------------------------
Overview
--------
This package implements an event bus for Open edX using Kafka.

The event bus acts as a broker between services publishing events and other services that consume these events.
@@ -28,6 +28,8 @@ Outside of testing this app, it is best to leave the KAFKA_CONSUMERS_ENABLED setting off.

The repository works together with the openedx/openedx-events repository to provide a fully functional event bus.

For manual testing, see `<docs/how_tos/manual_testing.rst>`__.

Documentation
-------------

56 changes: 26 additions & 30 deletions docs/how_tos/manual_testing.rst
@@ -3,36 +3,32 @@ Manual testing

The producer can be tested manually against a Kafka running in devstack.

#. Create a "unit test" in one of the test files that will actually call Kafka. For example, this could be added to the end of ``edx_event_bus_kafka/publishing/test_event_producer.py``::

def test_actually_send_to_event_bus():
import random
signal = openedx_events.learning.signals.SESSION_LOGIN_COMPLETED
# Make events distinguishable
id = random.randrange(1000)
event_data = {
'user': UserData(
id=id,
is_active=True,
pii=UserPersonalData(
username=f'foobob_{id:03}',
email='[email protected]',
name="Bob Foo",
)
)
}

print(f"Sending event with random user ID {id}.")
with override_settings(
SCHEMA_REGISTRY_URL='http://edx.devstack.schema-registry:8081',
KAFKA_BOOTSTRAP_SERVERS='edx.devstack.kafka:29092',
):
ep.send_to_event_bus(signal, 'user_stuff', 'user.id', event_data)

#. Make or refresh a copy of this repo where it can be seen from inside devstack: ``rsync -sax --delete ./ ../src/event-bus-kafka/``
#. In devstack, start Kafka and the control webapp: ``make dev.up.kafka-control-center`` and watch ``make dev.logs.kafka-control-center`` until the server is up and happy (may take a few minutes; watch for ``INFO Kafka startTimeMs``)
#. Load the control center UI: http://localhost:9021/clusters and wait for the cluster to become healthy
#. In devstack, run ``lms-up-without-deps-shell`` to bring up an arbitrary shell inside Docker networking (LMS, in this case)
#. In the LMS shell, run ``pip install -e /edx/src/event-bus-kafka`` and then run whatever test you want, e.g. ``pytest /edx/src/event-bus-kafka/edx_event_bus_kafka/publishing/test_event_producer.py::test_actually_send_to_event_bus``
#. Go to the topic that was created and then into the Messages tab; select offset=0 to make sure you can see messages that were sent before you had the UI open.
#. Rerun ``rsync`` after any edits
#. In edx-platform's ``cms/envs/common.py``:

- Add ``'edx_event_bus_kafka'`` to the ``INSTALLED_APPS`` list
- Add the following::

KAFKA_CONSUMERS_ENABLED = True
KAFKA_BOOTSTRAP_SERVERS = "edx.devstack.kafka:29092"
SCHEMA_REGISTRY_URL = "http://edx.devstack.schema-registry:8081"

#. In devstack, run ``make devpi-up studio-up-without-deps-shell`` to bring up Studio with a shell.
#. In the Studio shell, run ``pip install -e /edx/src/event-bus-kafka``
#. Test the producer:

- Run the example command listed in the ``edx_event_bus_kafka.management.commands.produce_event.Command`` docstring
- Expect to see output that ends with a line containing "Event delivered to topic"
- Go to the topic that was created and then into the Messages tab; select offset=0 to make sure you can see messages that were sent before you had the UI open.

#. Test the consumer:

- Run the example command listed in the ``edx_event_bus_kafka.consumer.event_consumer.ConsumeEventsCommand`` docstring
- Expect to see output that ends with a line containing "Received SESSION_LOGIN_COMPLETED signal with user_data"
- Kill the management command (which otherwise runs indefinitely).

#. Rerun rsync after any edits as needed.

(Any IDA should work for testing, but all interactions have to happen inside devstack's networking layer, otherwise Kafka can't talk to itself.)
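
As a convenience, the consumer step can also be driven from a Django shell rather than the CLI. This is a minimal sketch only, assuming the command's ``-t``/``-g`` flags are exposed under the option names ``topic`` and ``group_id`` (confirm against the command's ``add_arguments``)::

    from django.core.management import call_command

    # Runs the consume loop until killed; option names here are assumed from
    # the -t/-g flags shown in the ConsumeEventsCommand docstring.
    call_command('consume_events', topic='user-event-debug', group_id='user-event-consumers')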
2 changes: 1 addition & 1 deletion edx_event_bus_kafka/consumer/README.rst
@@ -5,6 +5,6 @@ Consumer
Purpose
-------

This half of the library implements event bus consumer patterns using the Confluent Kafka API.
This part of the library implements event bus consumer patterns using the Confluent Kafka API.

During development, this app will be subject to frequent and rapid changes. Outside of testing this app, it is best to leave the KAFKA_CONSUMERS_ENABLED setting off.
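
For reference, the KAFKA_CONSUMERS_ENABLED guard mentioned here is implemented in ``event_consumer.py`` as a ``SettingToggle``. A minimal sketch of that pattern follows; the default value and log message are assumptions, not taken from this diff::

    from edx_toggles.toggles import SettingToggle

    # Sketch: gate the consume loop behind a Django setting; default off,
    # as the README advises for anyone not testing this app.
    KAFKA_CONSUMERS_ENABLED = SettingToggle('KAFKA_CONSUMERS_ENABLED', default=False)

    if not KAFKA_CONSUMERS_ENABLED.is_enabled():
        logger.error("Kafka consumers not enabled")  # sketch: log and return without consuming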
102 changes: 66 additions & 36 deletions edx_event_bus_kafka/consumer/event_consumer.py
@@ -10,9 +10,11 @@
from confluent_kafka.serialization import StringDeserializer
from django.conf import settings
from django.core.management.base import BaseCommand
from django.dispatch import receiver
from edx_toggles.toggles import SettingToggle
from openedx_events.enterprise.signals import SUBSCRIPTION_LICENSE_MODIFIED
from openedx_events.event_bus.avro.deserializer import AvroSignalDeserializer
from openedx_events.learning.data import UserData
from openedx_events.learning.signals import SESSION_LOGIN_COMPLETED
from openedx_events.tooling import OpenEdxPublicSignal

logger = logging.getLogger(__name__)
@@ -33,16 +35,21 @@
EVENT_TYPE_HEADER = "ce_type"


def create_consumer(group_id):
def create_consumer(group_id) -> DeserializingConsumer:
"""
Create a consumer for SUBSCRIPTION_LICENSE_MODIFIED events
:param group_id: id of the consumer group this consumer will be part of
:return: DeserializingConsumer
Create a consumer for SESSION_LOGIN_COMPLETED events.
Note: Still needs to be expanded to cover arbitrary events.
Arguments:
group_id: id of the consumer group this consumer will be part of
"""

# TODO (EventBus): Deduplicate settings/client construction against producer code.
KAFKA_SCHEMA_REGISTRY_CONFIG = {
'url': settings.SCHEMA_REGISTRY_URL,
'basic.auth.user.info': f"{settings.SCHEMA_REGISTRY_API_KEY}:{settings.SCHEMA_REGISTRY_API_SECRET}",
'basic.auth.user.info': f"{getattr(settings, 'SCHEMA_REGISTRY_API_KEY', '')}"
f":{getattr(settings, 'SCHEMA_REGISTRY_API_SECRET', '')}",
}

schema_registry_client = SchemaRegistryClient(KAFKA_SCHEMA_REGISTRY_CONFIG)
@@ -51,13 +58,13 @@ def create_consumer(group_id):
# 1. Reevaluate if all consumers should listen for the earliest unprocessed offset (auto.offset.reset)
# 2. Ensure the signal used in the signal_deserializer is the same one sent over in the message header

signal_deserializer = AvroSignalDeserializer(SUBSCRIPTION_LICENSE_MODIFIED)
signal_deserializer = AvroSignalDeserializer(SESSION_LOGIN_COMPLETED)

def inner_from_dict(event_data_dict, ctx=None): # pylint: disable=unused-argument
return signal_deserializer.from_dict(event_data_dict)

consumer_config = {
'bootstrap.servers': settings.KAFKA_BOOTSTRAP_SERVER,
'bootstrap.servers': settings.KAFKA_BOOTSTRAP_SERVERS,
'group.id': group_id,
'key.deserializer': StringDeserializer('utf-8'),
'value.deserializer': AvroDeserializer(schema_str=signal_deserializer.schema_string(),
@@ -66,7 +73,7 @@ def inner_from_dict(event_data_dict, ctx=None):  # pylint: disable=unused-argument
'auto.offset.reset': 'earliest'
}

if settings.KAFKA_API_KEY and settings.KAFKA_API_SECRET:
if getattr(settings, 'KAFKA_API_KEY', None) and getattr(settings, 'KAFKA_API_SECRET', None):
consumer_config.update({
'sasl.mechanism': 'PLAIN',
'security.protocol': 'SASL_SSL',
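
To make the construction above concrete, a consumer built by ``create_consumer`` is used roughly as follows, mirroring what ``consume_indefinitely`` does below; this is a sketch under assumed devstack topic and group names, not code from this commit::

    consumer = create_consumer('user-event-consumers')
    consumer.subscribe(['user-event-debug'])

    msg = consumer.poll(timeout=1.0)  # returns None if nothing arrives in time
    if msg is not None:
        process_single_message(msg)
    consumer.close()  # commit final offsets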
@@ -79,47 +86,48 @@ def inner_from_dict(event_data_dict, ctx=None):  # pylint: disable=unused-argument

def emit_signals_from_message(msg):
"""
Determine the correct signal and send the event from the message
Determine the correct signal and send the event from the message.
Arguments:
msg (Message): Consumed message.
"""
if msg.headers():
# TODO (EventBus): iterate on error handling for missing or multiple event_type headers
# (headers() is actually a list of (key, value) tuples rather than a dictionary)
event_types = [value for key, value in msg.headers() if key == EVENT_TYPE_HEADER]
if len(event_types) == 0:
logger.error(f"Missing {EVENT_TYPE_HEADER} header on message, cannot determine signal")
return
if len(event_types) > 1:
logger.error(f"Multiple {EVENT_TYPE_HEADER}s found on message, cannot determine signal")
return
headers = msg.headers() or [] # treat None as []

event_type = event_types[0]
# TODO (EventBus): iterate on error handling for missing or multiple event_type headers
# (headers() is actually a list of (key, value) tuples rather than a dictionary)
event_types = [value for key, value in headers if key == EVENT_TYPE_HEADER]
if len(event_types) == 0:
logger.error(f"Missing {EVENT_TYPE_HEADER} header on message, cannot determine signal")
return
if len(event_types) > 1:
logger.error(f"Multiple {EVENT_TYPE_HEADER} headers found on message, cannot determine signal")
return

# TODO (EventBus): Figure out who is doing the encoding and get the
# right one instead of just guessing utf-8
event_type_str = event_type.decode("utf-8")
try:
signal = OpenEdxPublicSignal.get_signal_by_type(event_type_str)
if signal:
signal.send_event(**msg.value())
except KeyError:
logger.exception(f"Signal not found: {event_type_str}")
event_type = event_types[0]

# TODO (EventBus): Figure out who is doing the encoding and get the
# right one instead of just guessing utf-8
event_type_str = event_type.decode("utf-8")
try:
signal = OpenEdxPublicSignal.get_signal_by_type(event_type_str)
signal.send_event(**msg.value())
except KeyError:
logger.exception(f"Signal not found: {event_type_str}")


def process_single_message(msg):
"""
Emit signal with message data
"""
if msg is None:
return
if msg.error():
# TODO (EventBus): iterate on error handling with retry and dead-letter queue topics
if msg.error().code() == KafkaError._PARTITION_EOF: # pylint: disable=protected-access
# End of partition event
logger.info(f"{msg.topic()} [{msg.partition()}] reached end at offset {msg.offset()}")
else:
logger.exception(msg.error())
return
emit_signals_from_message(msg)
else:
emit_signals_from_message(msg)


def consume_indefinitely(topic, group_id):
@@ -136,7 +144,8 @@
# 2. Determine if there are other errors that shouldn't kill the entire loop
while True:
msg = consumer.poll(timeout=CONSUMER_POLL_TIMEOUT)
process_single_message(msg)
if msg is not None:
process_single_message(msg)
finally:
# Close down consumer to commit final offsets.
consumer.close()
@@ -152,7 +161,7 @@ class ConsumeEventsCommand(BaseCommand):
is required.
example:
manage.py ... consume_events -t license-event-prod -g license-event-consumers
python3 manage.py cms consume_events -t user-event-debug -g user-event-consumers
# TODO (EventBus): Add pointer to relevant future docs around topics and consumer groups, and potentially
update example topic and group names to follow any future naming conventions.
@@ -186,3 +195,24 @@ def handle(self, *args, **options):
)
except Exception: # pylint: disable=broad-except
logger.exception("Error consuming Kafka events")


@receiver(SESSION_LOGIN_COMPLETED)
def log_event_from_event_bus(**kwargs): # pragma: no cover
"""
Log event received and transmitted from event bus consumer.
This is test code that should be removed.
Arguments:
kwargs: event data sent to the signal
"""
try:
user_data = kwargs.get('user', None)
if not user_data or not isinstance(user_data, UserData):
logger.error("Received null or incorrect data from SESSION_LOGIN_COMPLETED")
return
logger.info(f"Received SESSION_LOGIN_COMPLETED signal with user_data"
f" with UserData {user_data}")
except Exception: # pylint: disable=broad-except
logger.exception("Error while testing receiving signals from Kafka events")
Empty file.
24 changes: 0 additions & 24 deletions edx_event_bus_kafka/consumer/signals/receivers.py

This file was deleted.
