POC DO NOT MERGE feat: multiple events per topic #155

Draft · wants to merge 3 commits into base: main
5 changes: 5 additions & 0 deletions CHANGELOG.rst
@@ -14,6 +14,11 @@ Change Log
Unreleased
**********

[4.0.0] - 2023-05-04
********************
Changed
=======
* **BREAKING CHANGE**: consume_events no longer takes the signal as an argument. Consumers determine how to deserialize based on the signal type in the message header.
* Switch from ``edx-sphinx-theme`` to ``sphinx-book-theme`` since the former is
deprecated
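
A minimal sketch of what this breaking change means for callers, assuming only the ``signal`` argument is removed from the constructor; the topic and group names are just the placeholders used in the command help further down:

```python
from edx_event_bus_kafka import KafkaEventConsumer

# Before (3.x), the consumer was bound to a single signal at construction time:
# consumer = KafkaEventConsumer(topic="user-login", group_id="user-activity-service", signal=signal)

# After (4.0.0), the signal is resolved per message from the ce_type header, so the
# signal modules only need to be importable (the management command handles this
# via load_all_signals()):
consumer = KafkaEventConsumer(topic="user-login", group_id="user-activity-service")
consumer.consume_indefinitely()
```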

2 changes: 1 addition & 1 deletion edx_event_bus_kafka/__init__.py
@@ -9,4 +9,4 @@
from edx_event_bus_kafka.internal.consumer import KafkaEventConsumer
from edx_event_bus_kafka.internal.producer import KafkaEventProducer, create_producer

__version__ = '3.9.6'
__version__ = '4.0.0'
141 changes: 81 additions & 60 deletions edx_event_bus_kafka/internal/consumer.py
@@ -29,9 +29,10 @@
# See https://github.com/openedx/event-bus-kafka/blob/main/docs/decisions/0005-optional-import-of-confluent-kafka.rst
try:
import confluent_kafka
from confluent_kafka import TIMESTAMP_NOT_AVAILABLE, DeserializingConsumer
from confluent_kafka import TIMESTAMP_NOT_AVAILABLE, Consumer
from confluent_kafka.error import KafkaError
from confluent_kafka.schema_registry.avro import AvroDeserializer
from confluent_kafka.serialization import MessageField, SerializationContext
except ImportError: # pragma: no cover
confluent_kafka = None

@@ -107,49 +108,35 @@ class KafkaEventConsumer:
Can also consume messages indefinitely off the queue.
"""

def __init__(self, topic, group_id, signal):
def __init__(self, topic, group_id):
if confluent_kafka is None: # pragma: no cover
raise Exception('Library confluent-kafka not available. Cannot create event consumer.')

self.topic = topic
self.group_id = group_id
self.signal = signal
self.consumer = self._create_consumer()
self._shut_down_loop = False

# return type (Optional[DeserializingConsumer]) removed from signature to avoid error on import
# return type (Optional[Consumer]) removed from signature to avoid error on import
def _create_consumer(self):
"""
Create a DeserializingConsumer for events of the given signal instance.
Create a new Consumer in the consumer group.

Returns
None if confluent_kafka is not available.
DeserializingConsumer if it is.
Consumer if it is.
"""

schema_registry_client = get_schema_registry_client()

signal_deserializer = AvroSignalDeserializer(self.signal)

def inner_from_dict(event_data_dict, ctx=None): # pylint: disable=unused-argument
return signal_deserializer.from_dict(event_data_dict)

consumer_config = load_common_settings()

# We do not deserialize the key because we don't need it for anything yet.
# Also see https://github.com/openedx/openedx-events/issues/86 for some challenges on determining key schema.
consumer_config.update({
'group.id': self.group_id,
'value.deserializer': AvroDeserializer(schema_str=signal_deserializer.schema_string(),
schema_registry_client=schema_registry_client,
from_dict=inner_from_dict),
# Turn off auto commit. Auto commit will commit offsets for the entire batch of messages received,
# potentially resulting in data loss if some of those messages are not fully processed. See
# https://newrelic.com/blog/best-practices/kafka-consumer-config-auto-commit-data-loss
'enable.auto.commit': False,
})

return DeserializingConsumer(consumer_config)
return Consumer(consumer_config)

def _shut_down(self):
"""
@@ -248,7 +235,6 @@ def _consume_indefinitely(self):
run_context = {
'full_topic': full_topic,
'consumer_group': self.group_id,
'expected_signal': self.signal,
}
self.consumer.subscribe([full_topic])
logger.info(f"Running consumer for {run_context!r}")
@@ -276,11 +262,10 @@ def _consume_indefinitely(self):
with function_trace('_consume_indefinitely_consume_single_message'):
# Before processing, make sure our db connection is still active
_reconnect_to_db_if_needed()

self.emit_signals_from_message(msg)
self.consume_single_message(msg, run_context)
consecutive_errors = 0
self._add_message_monitoring(run_context=run_context, message=msg)

self._add_message_monitoring(run_context=run_context, message=msg)
except Exception as e: # pylint: disable=broad-except
consecutive_errors += 1
self.record_event_consuming_error(run_context, e, msg)
@@ -303,6 +288,46 @@ def _consume_indefinitely(self):
finally:
self.consumer.close()

def consume_single_message(self, msg, run_context):
"""
Deserialize a single message and emit the associated signal with the message data.

Parameters:
msg: A raw message from Kafka
run_context: A dictionary of information about the consumer, for logging
"""

# determine the event type of the message and use it to create a deserializer
all_msg_headers = msg.headers()
event_type = self._determine_event_type(all_msg_headers)
signal = None
try:
signal = OpenEdxPublicSignal.get_signal_by_type(event_type)
except KeyError as ke:
raise UnusableMessageError(f"Unrecognized event_type {event_type}, cannot determine signal") from ke
run_context['expected_signal'] = signal

ctx = SerializationContext(msg.topic(), MessageField.VALUE, all_msg_headers)
value = msg.value()
signal_deserializer = AvroSignalDeserializer(signal)
schema_registry_client = get_schema_registry_client()

def inner_from_dict(event_data_dict, ctx=None): # pylint: disable=unused-argument
return signal_deserializer.from_dict(event_data_dict)

# We do not deserialize the key because we don't need it for anything yet.
# Also see https://github.com/openedx/openedx-events/issues/86 for some challenges on
# determining key schema.
value_deserializer = AvroDeserializer(schema_str=signal_deserializer.schema_string(),
schema_registry_client=schema_registry_client,
from_dict=inner_from_dict)
deserialized_value = value_deserializer(value, ctx)

# set the value of the message to the new deserialized version
msg.set_value(deserialized_value)

self.emit_signals_from_deserialized_message(msg, signal)

def consume_indefinitely(self, offset_timestamp=None):
"""
Consume events from a topic in an infinite loop.
@@ -324,13 +349,16 @@ def consume_indefinitely(self, offset_timestamp=None):
)
self.reset_offsets_and_sleep_indefinitely(offset_timestamp)

@function_trace('emit_signals_from_message')
def emit_signals_from_message(self, msg):
@function_trace('emit_signals_from_deserialized_message')
def emit_signals_from_deserialized_message(self, msg, signal):
"""
Determine the correct signal and send the event from the message.
Send the event from the message.

This method expects that the caller has already determined the correct signal from the message headers.

Arguments:
msg (Message): Consumed message.
msg (Message): Deserialized message.
signal (OpenEdxPublicSignal): The signal determined by the message headers.
"""
self._log_message_received(msg)

@@ -345,34 +373,18 @@ def emit_signals_from_message(self, msg):

headers = msg.headers() or [] # treat None as []

event_types = get_message_header_values(headers, HEADER_EVENT_TYPE)
if len(event_types) == 0:
raise UnusableMessageError(
"Missing ce_type header on message, cannot determine signal"
)
if len(event_types) > 1:
raise UnusableMessageError(
"Multiple ce_type headers found on message, cannot determine signal"
)
event_type = event_types[0]

if event_type != self.signal.event_type:
raise UnusableMessageError(
f"Signal types do not match. Expected {self.signal.event_type}. "
f"Received message of type {event_type}."
)
try:
event_metadata = _get_metadata_from_headers(headers)
except Exception as e:
raise UnusableMessageError(f"Error determining metadata from message headers: {e}") from e

with function_trace('emit_signals_from_message_send_event_with_custom_metadata'):
send_results = self.signal.send_event_with_custom_metadata(event_metadata, **msg.value())
with function_trace('emit_signals_from_deserialized_message_send_event_with_custom_metadata'):
send_results = signal.send_event_with_custom_metadata(event_metadata, **msg.value())

# Raise an exception if any receivers errored out. This allows logging of the receivers
# along with partition, offset, etc. in record_event_consuming_error. Hopefully the
# receiver code is idempotent and we can just replay any messages that were involved.
self._check_receiver_results(send_results)
self._check_receiver_results(send_results, signal)

# At the very end, log that a message was processed successfully.
# Since we're single-threaded, no other information is needed;
@@ -381,7 +393,7 @@ def emit_signals_from_message(self, msg):
if AUDIT_LOGGING_ENABLED.is_enabled():
logger.info('Message from Kafka processed successfully')

def _check_receiver_results(self, send_results: list):
def _check_receiver_results(self, send_results: list, signal: OpenEdxPublicSignal):
"""
Raises exception if any of the receivers produced an exception.

@@ -409,7 +421,7 @@ def _check_receiver_results(self, send_results: list):
raise ReceiverError(
f"{len(error_descriptions)} receiver(s) out of {len(send_results)} "
"produced errors (stack trace elsewhere in logs) "
f"when handling signal {self.signal}: {', '.join(error_descriptions)}",
f"when handling signal {signal}: {', '.join(error_descriptions)}",
errors
)

@@ -534,6 +546,24 @@ def _add_message_monitoring(self, run_context, message, error=None):
# Use this to fix any bugs in what should be benign monitoring code
set_custom_attribute('kafka_monitoring_error', repr(e))

def _determine_event_type(self, headers):
"""
Get event type from message headers.

Arguments:
headers: List of key/value tuples. Keys are strings, values are bytestrings.
"""
event_types = get_message_header_values(headers, HEADER_EVENT_TYPE)
if len(event_types) == 0:
raise UnusableMessageError(
"Missing ce_type header on message, cannot determine signal"
)
if len(event_types) > 1:
raise UnusableMessageError(
"Multiple ce_type headers found on message, cannot determine signal"
)
return event_types[0]
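
A hedged illustration of the header shape ``_determine_event_type`` works with: a list of (str, bytes) tuples as confluent-kafka exposes them. The event type string is the example from the old command help; the second header key and its value are hypothetical:

```python
headers = [
    ("ce_type", b"org.openedx.learning.auth.session.login.completed.v1"),
    ("ce_id", b"1234"),  # hypothetical extra header; only ce_type matters for this lookup
]

# Given a KafkaEventConsumer instance `consumer`:
# consumer._determine_event_type(headers)
# returns the single ce_type value, and raises UnusableMessageError if ce_type
# is missing or appears more than once.
```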

def _get_kafka_message_and_error(self, message, error):
"""
Returns tuple of (kafka_message, kafka_error), if they can be found.
@@ -576,12 +606,11 @@ class ConsumeEventsCommand(BaseCommand):
Management command for Kafka consumer workers in the event bus.
"""
help = """
Consume messages of specified signal type from a Kafka topic and send their data to that signal.
Consume messages from a Kafka topic and emit the relevant signals.

Example::

python3 manage.py cms consume_events -t user-login -g user-activity-service \
-s org.openedx.learning.auth.session.login.completed.v1
python3 manage.py cms consume_events -t user-login -g user-activity-service
"""

def add_arguments(self, parser):
@@ -599,12 +628,6 @@ def add_arguments(self, parser):
required=True,
help='Consumer group id'
)
parser.add_argument(
'-s', '--signal',
nargs=1,
required=True,
help='Type of signal to emit from consumed messages.'
)
parser.add_argument(
'-o', '--offset_time',
nargs=1,
@@ -628,7 +651,6 @@ def handle(self, *args, **options):

try:
load_all_signals()
signal = OpenEdxPublicSignal.get_signal_by_type(options['signal'][0])
if options['offset_time'] and options['offset_time'][0] is not None:
try:
offset_timestamp = datetime.fromisoformat(options['offset_time'][0])
@@ -641,7 +663,6 @@
event_consumer = KafkaEventConsumer(
topic=options['topic'][0],
group_id=options['group_id'][0],
signal=signal,
)
if offset_timestamp is None:
event_consumer.consume_indefinitely()
4 changes: 4 additions & 0 deletions edx_event_bus_kafka/internal/producer.py
@@ -31,6 +31,7 @@
try:
import confluent_kafka
from confluent_kafka import Producer
from confluent_kafka.schema_registry import topic_record_subject_name_strategy
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import MessageField, SerializationContext
except ImportError: # pragma: no cover
@@ -165,11 +166,13 @@ def inner_to_dict(event_data, ctx=None):  # pylint: disable=unused-argument
schema_str=extract_key_schema(signal_serializer, event_key_field),
schema_registry_client=client,
to_dict=inner_to_dict,
conf={'subject.name.strategy': topic_record_subject_name_strategy}
)
value_serializer = AvroSerializer(
schema_str=signal_serializer.schema_string(),
schema_registry_client=client,
to_dict=inner_to_dict,
conf={'subject.name.strategy': topic_record_subject_name_strategy}
)

return key_serializer, value_serializer
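
The ``subject.name.strategy`` override above is what lets multiple event types share one topic: with the default topic name strategy, every value schema on a topic is registered under the same subject (roughly ``<topic>-value``), so a second event type would typically fail schema-compatibility checks. A hedged sketch of how the topic-record strategy derives subjects; the record name here is made up for illustration:

```python
from confluent_kafka.schema_registry import topic_record_subject_name_strategy
from confluent_kafka.serialization import MessageField, SerializationContext

ctx = SerializationContext("user-login", MessageField.VALUE)
# Combines the topic with the Avro record name, giving each event type on the
# topic its own schema-registry subject, e.g. "user-login-SessionLoginCompleted".
print(topic_record_subject_name_strategy(ctx, "SessionLoginCompleted"))
```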
@@ -272,6 +275,7 @@ def send(
key_serializer, value_serializer = get_serializers(signal, event_key_field)
key_bytes = key_serializer(event_key, SerializationContext(full_topic, MessageField.KEY, headers))
value_bytes = value_serializer(event_data, SerializationContext(full_topic, MessageField.VALUE, headers))
logger.info(f"{value_bytes=}")
self.producer.produce(
full_topic, key=key_bytes, value=value_bytes, headers=headers,
on_delivery=context.on_event_deliver,