diff --git a/datahub-actions/src/datahub_actions/api/action_graph.py b/datahub-actions/src/datahub_actions/api/action_graph.py
index aa44b935..24ddc270 100644
--- a/datahub-actions/src/datahub_actions/api/action_graph.py
+++ b/datahub-actions/src/datahub_actions/api/action_graph.py
@@ -346,7 +346,6 @@ def _get_entity_by_name(
         )
         num_entities = results.get("value", {}).get("numEntities", 0)
         if num_entities > 1:
-            breakpoint()
             logger.warning(
                 f"Got {num_entities} results for {entity_type} {name}. Will return the first match."
             )
diff --git a/datahub-actions/src/datahub_actions/pipeline/pipeline.py b/datahub-actions/src/datahub_actions/pipeline/pipeline.py
index 8c81e72b..d4f78097 100644
--- a/datahub-actions/src/datahub_actions/pipeline/pipeline.py
+++ b/datahub-actions/src/datahub_actions/pipeline/pipeline.py
@@ -169,9 +169,15 @@ def run(self) -> None:
         enveloped_events = self.source.events()
         for enveloped_event in enveloped_events:
             # Then, process the event.
-            self._process_event(enveloped_event)
+            retval = self._process_event(enveloped_event)
+
+            # For legacy users w/o selective ack support, convert
+            # None to True, i.e. always commit.
+            if retval is None:
+                retval = True
+
             # Finally, ack the event.
-            self._ack_event(enveloped_event)
+            self._ack_event(enveloped_event, retval)
 
     def stop(self) -> None:
         """
@@ -189,10 +195,11 @@ def stats(self) -> PipelineStats:
         """
         return self._stats
 
-    def _process_event(self, enveloped_event: EventEnvelope) -> None:
+    def _process_event(self, enveloped_event: EventEnvelope) -> Optional[bool]:
         # Attempt to process the incoming event, with retry.
         curr_attempt = 1
         max_attempts = self._retry_count + 1
+        retval = None
         while curr_attempt <= max_attempts:
             try:
                 # First, transform the event.
@@ -200,10 +207,10 @@ def _process_event(self, enveloped_event: EventEnvelope) -> None:
 
                 # Then, invoke the action if the event is non-null.
                 if transformed_event is not None:
-                    self._execute_action(transformed_event)
+                    retval = self._execute_action(transformed_event)
 
                 # Short circuit - processing has succeeded.
-                return
+                return retval
             except Exception:
                 logger.exception(
                     f"Caught exception while attempting to process event. Attempt {curr_attempt}/{max_attempts} event type: {enveloped_event.event_type}, pipeline name: {self.name}"
@@ -220,6 +227,8 @@ def _process_event(self, enveloped_event: EventEnvelope) -> None:
         # Finally, handle the failure
         self._handle_failure(enveloped_event)
 
+        return retval
+
     def _execute_transformers(
         self, enveloped_event: EventEnvelope
     ) -> Optional[EventEnvelope]:
@@ -254,19 +263,20 @@ def _execute_transformer(
             f"Caught exception while executing Transformer with name {type(transformer).__name__}"
         ) from e
 
-    def _execute_action(self, enveloped_event: EventEnvelope) -> None:
+    def _execute_action(self, enveloped_event: EventEnvelope) -> Optional[bool]:
         try:
-            self.action.act(enveloped_event)
+            retval = self.action.act(enveloped_event)
            self._stats.increment_action_success_count()
+            return retval
         except Exception as e:
             self._stats.increment_action_exception_count()
             raise PipelineException(
                 f"Caught exception while executing Action with type {type(self.action).__name__}"
             ) from e
 
-    def _ack_event(self, enveloped_event: EventEnvelope) -> None:
+    def _ack_event(self, enveloped_event: EventEnvelope, processed: bool) -> None:
         try:
-            self.source.ack(enveloped_event)
+            self.source.ack(enveloped_event, processed)
             self._stats.increment_success_count()
         except Exception:
             self._stats.increment_failed_ack_count()
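The pipeline change above introduces a selective-ack contract: an action's `act()` may now return a boolean saying whether the event's offset should be committed right away, and `None` (the legacy behaviour) is coerced to `True`. A minimal sketch of how a custom action might use this — the `BatchingAction` class and `handle` helper below are illustrative assumptions, not part of this PR:

```python
from typing import Any, Callable, List, Optional


class BatchingAction:
    """Hypothetical action: buffers events and only wants offsets committed
    once a whole batch has been flushed somewhere durable."""

    def __init__(self, batch_size: int = 100) -> None:
        self.batch_size = batch_size
        self.buffer: List[Any] = []

    def act(self, event: Any) -> Optional[bool]:
        self.buffer.append(event)
        if len(self.buffer) >= self.batch_size:
            self.flush()
            return True  # batch is durable -> safe to commit the offset now
        return False  # defer: offset is only stored, committed asynchronously

    def flush(self) -> None:
        # Persist self.buffer somewhere durable, then clear it.
        self.buffer.clear()


def handle(action: BatchingAction, event: Any, source_ack: Callable[..., None]) -> None:
    """Mirrors how Pipeline.run() / _process_event() treat the return value."""
    retval = action.act(event)
    if retval is None:  # legacy actions that return nothing
        retval = True  # always commit, preserving the old behaviour
    source_ack(event, processed=retval)
```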
diff --git a/datahub-actions/src/datahub_actions/plugin/source/kafka/kafka_event_source.py b/datahub-actions/src/datahub_actions/plugin/source/kafka/kafka_event_source.py
index 6e1d6624..e82309f1 100644
--- a/datahub-actions/src/datahub_actions/plugin/source/kafka/kafka_event_source.py
+++ b/datahub-actions/src/datahub_actions/plugin/source/kafka/kafka_event_source.py
@@ -39,6 +39,7 @@
 
 # May or may not need these.
 from datahub_actions.pipeline.pipeline_context import PipelineContext
+from datahub_actions.plugin.source.kafka.utils import with_retry
 from datahub_actions.source.event_source import EventSource
 
 logger = logging.getLogger(__name__)
@@ -47,6 +48,7 @@
 ENTITY_CHANGE_EVENT_NAME = "entityChangeEvent"
 DEFAULT_TOPIC_ROUTES = {
     "mcl": "MetadataChangeLog_Versioned_v1",
+    "mcl_timeseries": "MetadataChangeLog_Timeseries_v1",
     "pe": "PlatformEvent_v1",
 }
 
@@ -90,6 +92,10 @@ def build_entity_change_event(payload: GenericPayloadClass) -> EntityChangeEvent
 class KafkaEventSourceConfig(ConfigModel):
     connection: KafkaConsumerConnectionConfig = KafkaConsumerConnectionConfig()
     topic_routes: Optional[Dict[str, str]]
+    async_commit_enabled: bool = False
+    async_commit_interval: int = 10000
+    commit_retry_count: int = 5
+    commit_retry_backoff: float = 10.0
 
 
 def kafka_messages_observer(pipeline_name: str) -> Callable:
@@ -120,6 +126,16 @@ def __init__(self, config: KafkaEventSourceConfig, ctx: PipelineContext):
         schema_client_config = config.connection.schema_registry_config.copy()
         schema_client_config["url"] = self.source_config.connection.schema_registry_url
         self.schema_registry_client = SchemaRegistryClient(schema_client_config)
+
+        async_commit_config: Dict[str, Any] = {}
+        if self.source_config.async_commit_enabled:
+            # See for details: https://github.com/confluentinc/librdkafka/blob/master/INTRODUCTION.md#auto-offset-commit
+            async_commit_config["enable.auto.offset.store"] = False
+            async_commit_config["enable.auto.commit"] = True
+            async_commit_config["auto.commit.interval.ms"] = (
+                self.source_config.async_commit_interval
+            )
+
         self.consumer: confluent_kafka.Consumer = confluent_kafka.DeserializingConsumer(
             {
                 # Provide a custom group id to subcribe to multiple partitions via separate actions pods.
@@ -134,6 +150,7 @@ def __init__(self, config: KafkaEventSourceConfig, ctx: PipelineContext):
                 "session.timeout.ms": "10000",  # 10s timeout.
                 "max.poll.interval.ms": "10000",  # 10s poll max.
                 **self.source_config.connection.consumer_config,
+                **async_commit_config,
             }
         )
         self._observe_message: Callable = kafka_messages_observer(ctx.pipeline_name)
@@ -172,6 +189,13 @@ def events(self) -> Iterable[EventEnvelope]:
             else:
                 if "mcl" in topic_routes and msg.topic() == topic_routes["mcl"]:
                     yield from self.handle_mcl(msg)
+                if (
+                    "mcl_timeseries" in topic_routes
+                    and msg.topic() == topic_routes["mcl_timeseries"]
+                ):
+                    yield from self.handle_mcl(
+                        msg
+                    )  # Handle timeseries in the same way as usual MCL.
                 elif "pe" in topic_routes and msg.topic() == topic_routes["pe"]:
                     yield from self.handle_pe(msg)
 
@@ -201,16 +225,54 @@ def close(self) -> None:
         self.running = False
         self.consumer.close()
 
-    def ack(self, event: EventEnvelope) -> None:
-        self.consumer.commit(
+    def _commit_offsets(self, event: EventEnvelope) -> None:
+        retval = self.consumer.commit(
+            asynchronous=False,
             offsets=[
                 TopicPartition(
                     event.meta["kafka"]["topic"],
                     event.meta["kafka"]["partition"],
                     event.meta["kafka"]["offset"] + 1,
                 )
-            ]
+            ],
         )
+        if retval is None:
+            logger.exception(
+                f"Unexpected response when committing offset to kafka: topic: {event.meta['kafka']['topic']}, partition: {event.meta['kafka']['partition']}, offset: {event.meta['kafka']['offset']}"
+            )
+            return
+        for partition in retval:
+            if partition.error is not None:
+                raise KafkaException(
+                    f"Failed to commit offset for topic: {partition.topic}, partition: {partition.partition}, offset: {partition.offset}: {partition.error.str()}"
+                )
         logger.debug(
             f"Successfully committed offsets at message: topic: {event.meta['kafka']['topic']}, partition: {event.meta['kafka']['partition']}, offset: {event.meta['kafka']['offset']}"
         )
+
+    def _store_offsets(self, event: EventEnvelope) -> None:
+        self.consumer.store_offsets(
+            offsets=[
+                TopicPartition(
+                    event.meta["kafka"]["topic"],
+                    event.meta["kafka"]["partition"],
+                    event.meta["kafka"]["offset"] + 1,
+                )
+            ],
+        )
+
+    def ack(self, event: EventEnvelope, processed: bool = True) -> None:
+        # See for details: https://github.com/confluentinc/librdkafka/blob/master/INTRODUCTION.md#auto-offset-commit
+
+        if processed or not self.source_config.async_commit_enabled:
+            # Immediately commit if the message was processed by the upstream,
+            # or delayed commit is disabled
+            with_retry(
+                self.source_config.commit_retry_count,
+                self.source_config.commit_retry_backoff,
+                self._commit_offsets,
+                event,
+            )
+        else:
+            # Otherwise store offset for periodic autocommit
+            self._store_offsets(event)
diff --git a/datahub-actions/src/datahub_actions/plugin/source/kafka/utils.py b/datahub-actions/src/datahub_actions/plugin/source/kafka/utils.py
new file mode 100644
index 00000000..a416d519
--- /dev/null
+++ b/datahub-actions/src/datahub_actions/plugin/source/kafka/utils.py
@@ -0,0 +1,26 @@
+import logging
+import time
+from typing import Any, Callable
+
+logger = logging.getLogger(__name__)
+
+
+def with_retry(max_attempts: int, max_backoff: float, func: Callable, *args, **kwargs) -> Any:  # type: ignore
+    curr_attempt = 0
+    backoff = 0.3
+
+    while curr_attempt < max_attempts:
+        try:
+            return func(*args, **kwargs)
+        except Exception as e:
+            logger.error(str(e))
+
+            curr_attempt = curr_attempt + 1
+            if curr_attempt >= max_attempts:
+                logger.warning("kafka event source: exhausted all attempts.")
+                return
+
+            backoff = backoff * 2
+            if backoff > max_backoff:
+                backoff = max_backoff
+            time.sleep(backoff)
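The `with_retry` helper above swallows exceptions, doubles a small backoff between attempts (capped at `max_backoff`), and returns `None` once the attempts are exhausted. A minimal usage sketch — `flaky_commit` is a made-up stand-in for `_commit_offsets`:

```python
from datahub_actions.plugin.source.kafka.utils import with_retry


def flaky_commit(offset: int) -> str:
    # Stand-in for a call that may raise transiently, e.g. on a broker hiccup.
    return f"committed {offset}"


# Try up to 5 times; sleep 0.6s, 1.2s, 2.4s, ... between failures, but never
# more than 10 seconds. Returns flaky_commit's result, or None if every
# attempt failed.
result = with_retry(5, 10.0, flaky_commit, 42)
```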
diff --git a/datahub-actions/src/datahub_actions/source/event_source.py b/datahub-actions/src/datahub_actions/source/event_source.py
index bcbbdb43..91c3163b 100644
--- a/datahub-actions/src/datahub_actions/source/event_source.py
+++ b/datahub-actions/src/datahub_actions/source/event_source.py
@@ -48,7 +48,7 @@ def events(self) -> Iterable[EventEnvelope]:
         """
 
     @abstractmethod
-    def ack(self, event: EventEnvelope) -> None:
+    def ack(self, event: EventEnvelope, processed: bool = True) -> None:
         """
         Acknowledges the processing of an individual event by the Actions Framework
         """
diff --git a/datahub-actions/tests/unit/test_helpers.py b/datahub-actions/tests/unit/test_helpers.py
index e06106de..eeb1c68c 100644
--- a/datahub-actions/tests/unit/test_helpers.py
+++ b/datahub-actions/tests/unit/test_helpers.py
@@ -138,7 +138,7 @@ def events(self) -> Iterable[EventEnvelope]:
             EventEnvelope("TestEvent", TestEvent("value"), {}),
         ]
 
-    def ack(self, event: EventEnvelope) -> None:
+    def ack(self, event: EventEnvelope, processed: bool = True) -> None:
         self.ack_count = self.ack_count + 1
 
     def close(self) -> None:
@@ -221,7 +221,7 @@ def events(self) -> Iterable[EventEnvelope]:
             "MetadataChangeLogEvent_v1", metadata_change_log_event, {}
         )
 
-    def ack(self, event: EventEnvelope) -> None:
+    def ack(self, event: EventEnvelope, processed: bool = True) -> None:
         pass
 
     def close(self) -> None:
diff --git a/docker/config/executor.yaml b/docker/config/executor.yaml
index 722def3b..2ad6d56e 100644
--- a/docker/config/executor.yaml
+++ b/docker/config/executor.yaml
@@ -11,7 +11,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-name: "ingestion_executor"
+name: ${DATAHUB_ACTIONS_INGESTION_EXECUTOR_CONSUMER_GROUP_ID:-ingestion_executor}
 source:
   type: "kafka"
   config:
diff --git a/docker/config/slack_action.yaml b/docker/config/slack_action.yaml
index 7dd1a0d0..48491c00 100644
--- a/docker/config/slack_action.yaml
+++ b/docker/config/slack_action.yaml
@@ -11,7 +11,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-name: datahub_slack_action
+name: ${DATAHUB_ACTIONS_SLACK_CONSUMER_GROUP_ID:-datahub_slack_action}
 enabled: ${DATAHUB_ACTIONS_SLACK_ENABLED:-false}
 source:
   type: "kafka"
diff --git a/docker/config/teams_action.yaml b/docker/config/teams_action.yaml
index 70f0f679..6ab3befb 100644
--- a/docker/config/teams_action.yaml
+++ b/docker/config/teams_action.yaml
@@ -11,7 +11,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-name: datahub_teams_action
+name: ${DATAHUB_ACTIONS_TEAMS_CONSUMER_GROUP_ID:-datahub_teams_action}
 enabled: ${DATAHUB_ACTIONS_TEAMS_ENABLED:-false}
 source:
   type: "kafka"
diff --git a/docker/datahub-actions/Dockerfile b/docker/datahub-actions/Dockerfile
index 6ae7f80e..9e03253e 100644
--- a/docker/datahub-actions/Dockerfile
+++ b/docker/datahub-actions/Dockerfile
@@ -17,6 +17,9 @@ ARG APP_ENV=prod
 
 FROM acryldata/datahub-ingestion-base:head-full as prod-install
 COPY datahub-actions /actions-src
+
+USER root
+
 RUN mkdir -p /etc/datahub/actions && mkdir -p /tmp/datahub/logs/actions/system
 RUN cd /actions-src && \
     pip install "." && \
diff --git a/docker/datahub-actions/start.sh b/docker/datahub-actions/start.sh
index b007cf56..3c50ca16 100755
--- a/docker/datahub-actions/start.sh
+++ b/docker/datahub-actions/start.sh
@@ -16,6 +16,8 @@
 
 SYS_CONFIGS_PATH="${DATAHUB_ACTIONS_SYSTEM_CONFIGS_PATH:-/etc/datahub/actions/system/conf}"
 USER_CONFIGS_PATH="${DATAHUB_ACTIONS_USER_CONFIGS_PATH:-/etc/datahub/actions/conf}"
+MONITORING_ENABLED="${DATAHUB_ACTIONS_MONITORING_ENABLED:-false}"
+MONITORING_PORT="${DATAHUB_ACTIONS_MONITORING_PORT:-8000}"
 
 touch /tmp/datahub/logs/actions/actions.out
 
@@ -60,4 +62,8 @@ else
     echo "No user action configurations found. Not starting user actions."
 fi
 
-datahub-actions actions $config_files
+if [ "$MONITORING_ENABLED" = true ]; then
+    datahub-actions --enable-monitoring --monitoring-port "$MONITORING_PORT" actions $config_files
+else
+    datahub-actions actions $config_files
+fi
diff --git a/docs/sources/kafka-event-source.md b/docs/sources/kafka-event-source.md
index f82c12cb..06c2ef81 100644
--- a/docs/sources/kafka-event-source.md
+++ b/docs/sources/kafka-event-source.md
@@ -62,6 +62,7 @@ source:
     # Topic Routing - which topics to read from.
     topic_routes:
       mcl: ${METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME:-MetadataChangeLog_Versioned_v1} # Topic name for MetadataChangeLogEvent_v1 events.
+      mcl_timeseries: ${METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME:-MetadataChangeLog_Timeseries_v1} # Topic name for MetadataChangeLogEvent_v1 timeseries events.
       pe: ${PLATFORM_EVENT_TOPIC_NAME:-PlatformEvent_v1} # Topic name for PlatformEvent_v1 events.
 action:
   # action configs
@@ -75,7 +76,8 @@ action:
 | `connection.bootstrap` | ✅ | N/A | The Kafka bootstrap URI, e.g. `localhost:9092`. |
 | `connection.schema_registry_url` | ✅ | N/A | The URL for the Kafka schema registry, e.g. `http://localhost:8081` |
 | `connection.consumer_config` | ❌ | {} | A set of key-value pairs that represents arbitrary Kafka Consumer configs |
-| `topic_routes.mcl` | ❌ | `MetadataChangeLogEvent_v1` | The name of the topic containing MetadataChangeLog events |
+| `topic_routes.mcl` | ❌ | `MetadataChangeLog_Versioned_v1` | The name of the topic containing versioned MetadataChangeLog events |
+| `topic_routes.mcl_timeseries` | ❌ | `MetadataChangeLog_Timeseries_v1` | The name of the topic containing timeseries MetadataChangeLog events |
 | `topic_routes.pe` | ❌ | `PlatformEvent_v1` | The name of the topic containing PlatformEvent events |
 
 
diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties
index 5028f28f..e1bef7e8 100644
--- a/gradle/wrapper/gradle-wrapper.properties
+++ b/gradle/wrapper/gradle-wrapper.properties
@@ -1,5 +1,5 @@
 distributionBase=GRADLE_USER_HOME
 distributionPath=wrapper/dists
-distributionUrl=https\://services.gradle.org/distributions/gradle-5.6.4-bin.zip
+distributionUrl=https\://services.gradle.org/distributions/gradle-8.0.2-bin.zip
 zipStoreBase=GRADLE_USER_HOME
 zipStorePath=wrapper/dists
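Not covered by the docs change above: the Kafka source also gains four config knobs for the new commit behaviour (`async_commit_enabled`, `async_commit_interval`, `commit_retry_count`, `commit_retry_backoff`). A rough sketch of how they map onto `KafkaEventSourceConfig`, assuming the pydantic-style `parse_obj` constructor that `ConfigModel` exposes; the connection values are the example endpoints from the docs, and all numeric values are the defaults from the diff:

```python
from datahub_actions.plugin.source.kafka.kafka_event_source import (
    KafkaEventSourceConfig,
)

config = KafkaEventSourceConfig.parse_obj(
    {
        "connection": {
            "bootstrap": "localhost:9092",
            "schema_registry_url": "http://localhost:8081",
        },
        "topic_routes": {
            "mcl": "MetadataChangeLog_Versioned_v1",
            "mcl_timeseries": "MetadataChangeLog_Timeseries_v1",
            "pe": "PlatformEvent_v1",
        },
        # Store offsets on ack(processed=False) and let librdkafka's auto-commit
        # flush them every async_commit_interval milliseconds.
        "async_commit_enabled": True,
        "async_commit_interval": 10000,
        # Synchronous commits (processed=True) are attempted up to
        # commit_retry_count times, sleeping at most commit_retry_backoff
        # seconds between attempts.
        "commit_retry_count": 5,
        "commit_retry_backoff": 10.0,
    }
)
```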