Skip to content

Commit

Permalink
Kafka consumer reliability/performance improvements (#117)
Browse files Browse the repository at this point in the history
  • Loading branch information
noggi authored Jun 7, 2024
1 parent 01661f0 commit 54a568b
Show file tree
Hide file tree
Showing 6 changed files with 106 additions and 16 deletions.
28 changes: 19 additions & 9 deletions datahub-actions/src/datahub_actions/pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,9 +169,15 @@ def run(self) -> None:
enveloped_events = self.source.events()
for enveloped_event in enveloped_events:
# Then, process the event.
self._process_event(enveloped_event)
retval = self._process_event(enveloped_event)

# For legacy users w/o selective ack support, convert
# None to True, i.e. always commit.
if retval is None:
retval = True

# Finally, ack the event.
self._ack_event(enveloped_event)
self._ack_event(enveloped_event, retval)

def stop(self) -> None:
"""
Expand All @@ -189,21 +195,22 @@ def stats(self) -> PipelineStats:
"""
return self._stats

def _process_event(self, enveloped_event: EventEnvelope) -> None:
def _process_event(self, enveloped_event: EventEnvelope) -> Optional[bool]:
# Attempt to process the incoming event, with retry.
curr_attempt = 1
max_attempts = self._retry_count + 1
retval = None
while curr_attempt <= max_attempts:
try:
# First, transform the event.
transformed_event = self._execute_transformers(enveloped_event)

# Then, invoke the action if the event is non-null.
if transformed_event is not None:
self._execute_action(transformed_event)
retval = self._execute_action(transformed_event)

# Short circuit - processing has succeeded.
return
return retval
except Exception:
logger.exception(
f"Caught exception while attempting to process event. Attempt {curr_attempt}/{max_attempts} event type: {enveloped_event.event_type}, pipeline name: {self.name}"
Expand All @@ -220,6 +227,8 @@ def _process_event(self, enveloped_event: EventEnvelope) -> None:
# Finally, handle the failure
self._handle_failure(enveloped_event)

return retval

def _execute_transformers(
self, enveloped_event: EventEnvelope
) -> Optional[EventEnvelope]:
Expand Down Expand Up @@ -254,19 +263,20 @@ def _execute_transformer(
f"Caught exception while executing Transformer with name {type(transformer).__name__}"
) from e

def _execute_action(self, enveloped_event: EventEnvelope) -> None:
def _execute_action(self, enveloped_event: EventEnvelope) -> Optional[bool]:
    """
    Run the configured Action against a single event.

    Returns whatever the Action's ``act`` call returns and bumps the action
    success counter. If anything inside the attempt raises, the action
    exception counter is bumped instead and the error is re-raised wrapped
    in a PipelineException (with the original exception as its cause).
    """
    try:
        outcome = self.action.act(enveloped_event)
        self._stats.increment_action_success_count()
    except Exception as e:
        self._stats.increment_action_exception_count()
        action_type_name = type(self.action).__name__
        raise PipelineException(
            f"Caught exception while executing Action with type {action_type_name}"
        ) from e
    return outcome

def _ack_event(self, enveloped_event: EventEnvelope) -> None:
def _ack_event(self, enveloped_event: EventEnvelope, processed: bool) -> None:
try:
self.source.ack(enveloped_event)
self.source.ack(enveloped_event, processed)
self._stats.increment_success_count()
except Exception:
self._stats.increment_failed_ack_count()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@

# May or may not need these.
from datahub_actions.pipeline.pipeline_context import PipelineContext
from datahub_actions.plugin.source.kafka.utils import with_retry
from datahub_actions.source.event_source import EventSource

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -90,6 +91,10 @@ def build_entity_change_event(payload: GenericPayloadClass) -> EntityChangeEvent
class KafkaEventSourceConfig(ConfigModel):
    # Configuration for the Kafka event source.

    # Kafka consumer connection settings (consumer config, schema registry, ...).
    connection: KafkaConsumerConnectionConfig = KafkaConsumerConnectionConfig()
    # Optional string-to-string topic routing map.
    # NOTE(review): how the keys/values are interpreted is not visible here - confirm against the consumer setup.
    topic_routes: Optional[Dict[str, str]]
    # When True, the consumer is created with "enable.auto.commit" on and
    # "enable.auto.offset.store" off, and ack() only stores the offset (for the
    # periodic auto-commit) when an event is reported as not processed.
    async_commit_enabled: bool = False
    # Value passed as "auto.commit.interval.ms" (milliseconds) when async commit is enabled.
    async_commit_interval: int = 10000
    # Passed to with_retry() as the maximum number of attempts for a synchronous offset commit.
    commit_retry_count: int = 5
    # Passed to with_retry() as the cap on the sleep between commit attempts (seconds, via time.sleep).
    commit_retry_backoff: float = 10.0


def kafka_messages_observer(pipeline_name: str) -> Callable:
Expand Down Expand Up @@ -120,6 +125,16 @@ def __init__(self, config: KafkaEventSourceConfig, ctx: PipelineContext):
schema_client_config = config.connection.schema_registry_config.copy()
schema_client_config["url"] = self.source_config.connection.schema_registry_url
self.schema_registry_client = SchemaRegistryClient(schema_client_config)

async_commit_config: Dict[str, Any] = {}
if self.source_config.async_commit_enabled:
# See for details: https://github.com/confluentinc/librdkafka/blob/master/INTRODUCTION.md#auto-offset-commit
async_commit_config["enable.auto.offset.store"] = False
async_commit_config["enable.auto.commit"] = True
async_commit_config["auto.commit.interval.ms"] = (
self.source_config.async_commit_interval
)

self.consumer: confluent_kafka.Consumer = confluent_kafka.DeserializingConsumer(
{
# Provide a custom group id to subscribe to multiple partitions via separate actions pods.
Expand All @@ -134,6 +149,7 @@ def __init__(self, config: KafkaEventSourceConfig, ctx: PipelineContext):
"session.timeout.ms": "10000", # 10s timeout.
"max.poll.interval.ms": "10000", # 10s poll max.
**self.source_config.connection.consumer_config,
**async_commit_config,
}
)
self._observe_message: Callable = kafka_messages_observer(ctx.pipeline_name)
Expand Down Expand Up @@ -201,16 +217,54 @@ def close(self) -> None:
self.running = False
self.consumer.close()

def ack(self, event: EventEnvelope) -> None:
self.consumer.commit(
def _commit_offsets(self, event: EventEnvelope) -> None:
    """
    Synchronously commit the offset that follows this event's Kafka message.

    The topic, partition and offset are read from ``event.meta["kafka"]``;
    the committed position is ``offset + 1``, i.e. the next message to read.

    Raises:
        KafkaException: if any partition in the commit response carries an
            error. (The underlying ``consumer.commit`` call may raise too.)
    """
    kafka_meta = event.meta["kafka"]
    topic = kafka_meta["topic"]
    partition = kafka_meta["partition"]
    offset = kafka_meta["offset"]

    retval = self.consumer.commit(
        asynchronous=False,
        offsets=[TopicPartition(topic, partition, offset + 1)],
    )
    if retval is None:
        # No exception is being handled here, so logger.exception() would
        # attach a meaningless "NoneType: None" traceback; log a plain error.
        logger.error(
            f"Unexpected response when commiting offset to kafka: topic: {topic}, partition: {partition}, offset: {offset}"
        )
        return
    for committed in retval:
        if committed.error is not None:
            raise KafkaException(
                f"Failed to commit offest for topic: {committed.topic}, partition: {committed.partition}, offset: {committed.offset}: {committed.error.str()}"
            )
    logger.debug(
        f"Successfully committed offsets at message: topic: {topic}, partition: {partition}, offset: {offset}"
    )

def _store_offsets(self, event: EventEnvelope) -> None:
    """
    Store (without committing) the offset that follows this event's message.

    The position handed to the consumer's ``store_offsets`` is ``offset + 1``
    for the topic/partition recorded in ``event.meta["kafka"]``.
    """
    kafka_meta = event.meta["kafka"]
    next_position = TopicPartition(
        kafka_meta["topic"],
        kafka_meta["partition"],
        kafka_meta["offset"] + 1,
    )
    self.consumer.store_offsets(offsets=[next_position])

def ack(self, event: EventEnvelope, processed: bool = True) -> None:
    """
    Acknowledge an event by committing or storing its Kafka offset.

    If the event was not processed and async commit is enabled, the offset
    is only stored so the periodic auto-commit picks it up. In every other
    case the offset is committed right away, retried through ``with_retry``.
    """
    # See for details: https://github.com/confluentinc/librdkafka/blob/master/INTRODUCTION.md#auto-offset-commit

    if not processed and self.source_config.async_commit_enabled:
        # Unprocessed event with delayed commit on: store offset for periodic autocommit
        self._store_offsets(event)
        return

    # Processed by the upstream, or delayed commit is disabled: commit immediately
    with_retry(
        self.source_config.commit_retry_count,
        self.source_config.commit_retry_backoff,
        self._commit_offsets,
        event,
    )
26 changes: 26 additions & 0 deletions datahub-actions/src/datahub_actions/plugin/source/kafka/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import logging
import time
from typing import Any, Callable

logger = logging.getLogger(__name__)


def with_retry(max_attempts: int, max_backoff: float, func: Callable, *args, **kwargs) -> Any:  # type: ignore
    """
    Call ``func(*args, **kwargs)``, retrying on failure with capped backoff.

    Args:
        max_attempts: Maximum number of calls to make. Values below 1 are
            treated as 1 so that ``func`` is always invoked at least once.
        max_backoff: Upper bound, in seconds, on the sleep between attempts.
            Negative values are treated as 0.
        func: The callable to invoke.
        *args, **kwargs: Forwarded to ``func`` unchanged.

    Returns:
        The return value of the first successful call, or ``None`` once all
        attempts have failed. Exceptions raised by ``func`` are logged and
        swallowed, so a caller cannot tell exhaustion apart from ``func``
        itself returning ``None``.
    """
    # A non-positive count previously skipped the call entirely, which made
    # e.g. a "retry count" of 0 silently disable the wrapped operation.
    total_attempts = max(1, max_attempts)
    # time.sleep() raises ValueError on negative durations.
    backoff_cap = max(0.0, max_backoff)
    backoff = 0.3

    for attempt in range(1, total_attempts + 1):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            logger.error(str(e))

        if attempt == total_attempts:
            break

        # Double the delay before every retry, never exceeding the cap.
        backoff = min(backoff * 2, backoff_cap)
        time.sleep(backoff)

    logger.warning("kafka event source: exhausted all attempts.")
    return None
2 changes: 1 addition & 1 deletion datahub-actions/src/datahub_actions/source/event_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def events(self) -> Iterable[EventEnvelope]:
"""

@abstractmethod
def ack(self, event: EventEnvelope) -> None:
def ack(self, event: EventEnvelope, processed: bool = True) -> None:
    """
    Acknowledges the processing of an individual event by the Actions Framework

    :param event: the event envelope being acknowledged.
    :param processed: flag describing whether the event was processed.
        Defaults to True, so callers that only pass the event keep working.
        How the flag affects acknowledgement is up to the implementation.
    """
4 changes: 2 additions & 2 deletions datahub-actions/tests/unit/test_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ def events(self) -> Iterable[EventEnvelope]:
EventEnvelope("TestEvent", TestEvent("value"), {}),
]

def ack(self, event: EventEnvelope) -> None:
def ack(self, event: EventEnvelope, processed: bool = True) -> None:
    """Count the acknowledgement; the event and the processed flag are ignored."""
    self.ack_count += 1

def close(self) -> None:
Expand Down Expand Up @@ -221,7 +221,7 @@ def events(self) -> Iterable[EventEnvelope]:
"MetadataChangeLogEvent_v1", metadata_change_log_event, {}
)

def ack(self, event: EventEnvelope) -> None:
def ack(self, event: EventEnvelope, processed: bool = True) -> None:
    """No-op acknowledgement: this test source does nothing on ack."""
    pass

def close(self) -> None:
Expand Down
2 changes: 1 addition & 1 deletion gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-5.6.4-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-8.0.2-bin.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists

0 comments on commit 54a568b

Please sign in to comment.