Commit 770be2c
Merge branch 'main' into fix-links
Lizard-tlt authored Jul 3, 2024
2 parents a4b4283 + b1f0529 commit 770be2c
Showing 13 changed files with 130 additions and 22 deletions.
1 change: 0 additions & 1 deletion datahub-actions/src/datahub_actions/api/action_graph.py
@@ -346,7 +346,6 @@ def _get_entity_by_name(
)
num_entities = results.get("value", {}).get("numEntities", 0)
if num_entities > 1:
breakpoint()
logger.warning(
f"Got {num_entities} results for {entity_type} {name}. Will return the first match."
)
28 changes: 19 additions & 9 deletions datahub-actions/src/datahub_actions/pipeline/pipeline.py
@@ -169,9 +169,15 @@ def run(self) -> None:
enveloped_events = self.source.events()
for enveloped_event in enveloped_events:
# Then, process the event.
self._process_event(enveloped_event)
retval = self._process_event(enveloped_event)

# For legacy users w/o selective ack support, convert
# None to True, i.e. always commit.
if retval is None:
retval = True

# Finally, ack the event.
self._ack_event(enveloped_event)
self._ack_event(enveloped_event, retval)

def stop(self) -> None:
"""
@@ -189,21 +195,22 @@ def stats(self) -> PipelineStats:
"""
return self._stats

def _process_event(self, enveloped_event: EventEnvelope) -> None:
def _process_event(self, enveloped_event: EventEnvelope) -> Optional[bool]:
# Attempt to process the incoming event, with retry.
curr_attempt = 1
max_attempts = self._retry_count + 1
retval = None
while curr_attempt <= max_attempts:
try:
# First, transform the event.
transformed_event = self._execute_transformers(enveloped_event)

# Then, invoke the action if the event is non-null.
if transformed_event is not None:
self._execute_action(transformed_event)
retval = self._execute_action(transformed_event)

# Short circuit - processing has succeeded.
return
return retval
except Exception:
logger.exception(
f"Caught exception while attempting to process event. Attempt {curr_attempt}/{max_attempts} event type: {enveloped_event.event_type}, pipeline name: {self.name}"
@@ -220,6 +227,8 @@ def _process_event(self, enveloped_event: EventEnvelope) -> None:
# Finally, handle the failure
self._handle_failure(enveloped_event)

return retval

def _execute_transformers(
self, enveloped_event: EventEnvelope
) -> Optional[EventEnvelope]:
@@ -254,19 +263,20 @@ def _execute_transformer(
f"Caught exception while executing Transformer with name {type(transformer).__name__}"
) from e

def _execute_action(self, enveloped_event: EventEnvelope) -> None:
def _execute_action(self, enveloped_event: EventEnvelope) -> Optional[bool]:
try:
self.action.act(enveloped_event)
retval = self.action.act(enveloped_event)
self._stats.increment_action_success_count()
return retval
except Exception as e:
self._stats.increment_action_exception_count()
raise PipelineException(
f"Caught exception while executing Action with type {type(self.action).__name__}"
) from e

def _ack_event(self, enveloped_event: EventEnvelope) -> None:
def _ack_event(self, enveloped_event: EventEnvelope, processed: bool) -> None:
try:
self.source.ack(enveloped_event)
self.source.ack(enveloped_event, processed)
self._stats.increment_success_count()
except Exception:
self._stats.increment_failed_ack_count()
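With these changes, an action's `act()` return value drives acknowledgement: `True` (or `None`, for legacy actions) means the event is committed immediately, while `False` signals that processing is not complete. A minimal sketch of an action using the new contract; the `Action`/`EventEnvelope` module paths and the `create`/`act`/`close` method set are assumptions from the wider repository, not shown in this diff:

```python
import logging
from typing import Optional

from datahub_actions.action.action import Action  # assumed module path
from datahub_actions.event.event_envelope import EventEnvelope  # assumed module path
from datahub_actions.pipeline.pipeline_context import PipelineContext

logger = logging.getLogger(__name__)


class SelectiveAckAction(Action):
    """Illustrative only: returns False when an event should not be committed yet."""

    @classmethod
    def create(cls, config_dict: dict, ctx: PipelineContext) -> "Action":
        return cls()

    def act(self, event: EventEnvelope) -> Optional[bool]:
        delivered = self._forward(event)
        # False -> the pipeline calls source.ack(event, processed=False);
        # True or None -> offsets are committed right away, as before.
        return delivered

    def _forward(self, event: EventEnvelope) -> bool:
        # Stand-in for real delivery logic (e.g. an HTTP call to a downstream system).
        logger.info("forwarding event of type %s", event.event_type)
        return True

    def close(self) -> None:
        pass
```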
68 changes: 65 additions & 3 deletions datahub-actions/src/datahub_actions/plugin/source/kafka/kafka_event_source.py
@@ -39,6 +39,7 @@

# May or may not need these.
from datahub_actions.pipeline.pipeline_context import PipelineContext
from datahub_actions.plugin.source.kafka.utils import with_retry
from datahub_actions.source.event_source import EventSource

logger = logging.getLogger(__name__)
@@ -47,6 +48,7 @@
ENTITY_CHANGE_EVENT_NAME = "entityChangeEvent"
DEFAULT_TOPIC_ROUTES = {
"mcl": "MetadataChangeLog_Versioned_v1",
"mcl_timeseries": "MetadataChangeLog_Timeseries_v1",
"pe": "PlatformEvent_v1",
}

@@ -90,6 +92,10 @@ def build_entity_change_event(payload: GenericPayloadClass) -> EntityChangeEvent
class KafkaEventSourceConfig(ConfigModel):
connection: KafkaConsumerConnectionConfig = KafkaConsumerConnectionConfig()
topic_routes: Optional[Dict[str, str]]
async_commit_enabled: bool = False
async_commit_interval: int = 10000
commit_retry_count: int = 5
commit_retry_backoff: float = 10.0


def kafka_messages_observer(pipeline_name: str) -> Callable:
@@ -120,6 +126,16 @@ def __init__(self, config: KafkaEventSourceConfig, ctx: PipelineContext):
schema_client_config = config.connection.schema_registry_config.copy()
schema_client_config["url"] = self.source_config.connection.schema_registry_url
self.schema_registry_client = SchemaRegistryClient(schema_client_config)

async_commit_config: Dict[str, Any] = {}
if self.source_config.async_commit_enabled:
# See for details: https://github.com/confluentinc/librdkafka/blob/master/INTRODUCTION.md#auto-offset-commit
async_commit_config["enable.auto.offset.store"] = False
async_commit_config["enable.auto.commit"] = True
async_commit_config["auto.commit.interval.ms"] = (
self.source_config.async_commit_interval
)

self.consumer: confluent_kafka.Consumer = confluent_kafka.DeserializingConsumer(
{
# Provide a custom group id to subscribe to multiple partitions via separate actions pods.
@@ -134,6 +150,7 @@ def __init__(self, config: KafkaEventSourceConfig, ctx: PipelineContext):
"session.timeout.ms": "10000", # 10s timeout.
"max.poll.interval.ms": "10000", # 10s poll max.
**self.source_config.connection.consumer_config,
**async_commit_config,
}
)
self._observe_message: Callable = kafka_messages_observer(ctx.pipeline_name)
@@ -172,6 +189,13 @@ def events(self) -> Iterable[EventEnvelope]:
else:
if "mcl" in topic_routes and msg.topic() == topic_routes["mcl"]:
yield from self.handle_mcl(msg)
if (
"mcl_timeseries" in topic_routes
and msg.topic() == topic_routes["mcl_timeseries"]
):
yield from self.handle_mcl(
msg
) # Handle timeseries in the same way as usual MCL.
elif "pe" in topic_routes and msg.topic() == topic_routes["pe"]:
yield from self.handle_pe(msg)

@@ -201,16 +225,54 @@ def close(self) -> None:
self.running = False
self.consumer.close()

def ack(self, event: EventEnvelope) -> None:
self.consumer.commit(
def _commit_offsets(self, event: EventEnvelope) -> None:
retval = self.consumer.commit(
asynchronous=False,
offsets=[
TopicPartition(
event.meta["kafka"]["topic"],
event.meta["kafka"]["partition"],
event.meta["kafka"]["offset"] + 1,
)
]
],
)
if retval is None:
logger.exception(
f"Unexpected response when commiting offset to kafka: topic: {event.meta['kafka']['topic']}, partition: {event.meta['kafka']['partition']}, offset: {event.meta['kafka']['offset']}"
)
return
for partition in retval:
if partition.error is not None:
raise KafkaException(
f"Failed to commit offest for topic: {partition.topic}, partition: {partition.partition}, offset: {partition.offset}: {partition.error.str()}"
)
logger.debug(
f"Successfully committed offsets at message: topic: {event.meta['kafka']['topic']}, partition: {event.meta['kafka']['partition']}, offset: {event.meta['kafka']['offset']}"
)

def _store_offsets(self, event: EventEnvelope) -> None:
self.consumer.store_offsets(
offsets=[
TopicPartition(
event.meta["kafka"]["topic"],
event.meta["kafka"]["partition"],
event.meta["kafka"]["offset"] + 1,
)
],
)

def ack(self, event: EventEnvelope, processed: bool = True) -> None:
# See for details: https://github.com/confluentinc/librdkafka/blob/master/INTRODUCTION.md#auto-offset-commit

if processed or not self.source_config.async_commit_enabled:
# Immediately commit if the message was processed by the upstream,
# or delayed commit is disabled
with_retry(
self.source_config.commit_retry_count,
self.source_config.commit_retry_backoff,
self._commit_offsets,
event,
)
else:
# Otherwise store offset for periodic autocommit
self._store_offsets(event)
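Taken together, `KafkaEventSourceConfig` now carries the async-commit settings and the source routes the extra `mcl_timeseries` topic. A minimal sketch of building the config programmatically; the import path is assumed from this repository's layout, and the broker/registry endpoints are placeholders rather than values from this commit:

```python
# Sketch only: assumed module path, placeholder endpoints.
from datahub_actions.plugin.source.kafka.kafka_event_source import (
    KafkaEventSourceConfig,
)

config = KafkaEventSourceConfig.parse_obj(
    {
        "connection": {
            "bootstrap": "localhost:9092",  # placeholder Kafka bootstrap server
            "schema_registry_url": "http://localhost:8081",  # placeholder registry
        },
        "topic_routes": {
            "mcl": "MetadataChangeLog_Versioned_v1",
            "mcl_timeseries": "MetadataChangeLog_Timeseries_v1",  # new route
            "pe": "PlatformEvent_v1",
        },
        # New fields from this commit: store offsets on ack() and let librdkafka
        # auto-commit them every async_commit_interval milliseconds.
        "async_commit_enabled": True,
        "async_commit_interval": 10000,
        "commit_retry_count": 5,       # attempts for synchronous commits
        "commit_retry_backoff": 10.0,  # max backoff in seconds between attempts
    }
)
print(config.topic_routes)
```

The same keys map onto the YAML recipe shown in docs/sources/kafka-event-source.md further down.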
26 changes: 26 additions & 0 deletions datahub-actions/src/datahub_actions/plugin/source/kafka/utils.py
@@ -0,0 +1,26 @@
import logging
import time
from typing import Any, Callable

logger = logging.getLogger(__name__)


def with_retry(max_attempts: int, max_backoff: float, func: Callable, *args, **kwargs) -> Any: # type: ignore
curr_attempt = 0
backoff = 0.3

while curr_attempt < max_attempts:
try:
return func(*args, **kwargs)
except Exception as e:
logger.error(str(e))

curr_attempt = curr_attempt + 1
if curr_attempt >= max_attempts:
logger.warning("kafka event source: exhausted all attempts.")
return

backoff = backoff * 2
if backoff > max_backoff:
backoff = max_backoff
time.sleep(backoff)
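A small usage sketch of the helper above: `flaky_commit` is invented for illustration (the Kafka source passes `self._commit_offsets`), while the import path comes from this diff.

```python
import random

from datahub_actions.plugin.source.kafka.utils import with_retry


def flaky_commit(event_id: int) -> str:
    # Hypothetical callable that fails transiently about half the time.
    if random.random() < 0.5:
        raise RuntimeError("transient broker error")
    return f"committed {event_id}"


# Up to 5 attempts with exponential backoff (0.6s, 1.2s, ...) capped at 10s.
# Exceptions are logged rather than re-raised; None is returned if all attempts fail.
result = with_retry(5, 10.0, flaky_commit, 42)
print(result)
```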
2 changes: 1 addition & 1 deletion datahub-actions/src/datahub_actions/source/event_source.py
@@ -48,7 +48,7 @@ def events(self) -> Iterable[EventEnvelope]:
"""

@abstractmethod
def ack(self, event: EventEnvelope) -> None:
def ack(self, event: EventEnvelope, processed: bool = True) -> None:
"""
Acknowledges the processing of an individual event by the Actions Framework
"""
4 changes: 2 additions & 2 deletions datahub-actions/tests/unit/test_helpers.py
@@ -138,7 +138,7 @@ def events(self) -> Iterable[EventEnvelope]:
EventEnvelope("TestEvent", TestEvent("value"), {}),
]

def ack(self, event: EventEnvelope) -> None:
def ack(self, event: EventEnvelope, processed: bool = True) -> None:
self.ack_count = self.ack_count + 1

def close(self) -> None:
@@ -221,7 +221,7 @@ def events(self) -> Iterable[EventEnvelope]:
"MetadataChangeLogEvent_v1", metadata_change_log_event, {}
)

def ack(self, event: EventEnvelope) -> None:
def ack(self, event: EventEnvelope, processed: bool = True) -> None:
pass

def close(self) -> None:
2 changes: 1 addition & 1 deletion docker/config/executor.yaml
@@ -11,7 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
name: "ingestion_executor"
name: ${DATAHUB_ACTIONS_INGESTION_EXECUTOR_CONSUMER_GROUP_ID:-ingestion_executor}
source:
type: "kafka"
config:
2 changes: 1 addition & 1 deletion docker/config/slack_action.yaml
@@ -11,7 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
name: datahub_slack_action
name: ${DATAHUB_ACTIONS_SLACK_CONSUMER_GROUP_ID:-datahub_slack_action}
enabled: ${DATAHUB_ACTIONS_SLACK_ENABLED:-false}
source:
type: "kafka"
2 changes: 1 addition & 1 deletion docker/config/teams_action.yaml
@@ -11,7 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
name: datahub_teams_action
name: ${DATAHUB_ACTIONS_TEAMS_CONSUMER_GROUP_ID:-datahub_teams_action}
enabled: ${DATAHUB_ACTIONS_TEAMS_ENABLED:-false}
source:
type: "kafka"
3 changes: 3 additions & 0 deletions docker/datahub-actions/Dockerfile
@@ -17,6 +17,9 @@ ARG APP_ENV=prod

FROM acryldata/datahub-ingestion-base:head-full as prod-install
COPY datahub-actions /actions-src

USER root

RUN mkdir -p /etc/datahub/actions && mkdir -p /tmp/datahub/logs/actions/system
RUN cd /actions-src && \
pip install "." && \
8 changes: 7 additions & 1 deletion docker/datahub-actions/start.sh
@@ -16,6 +16,8 @@

SYS_CONFIGS_PATH="${DATAHUB_ACTIONS_SYSTEM_CONFIGS_PATH:-/etc/datahub/actions/system/conf}"
USER_CONFIGS_PATH="${DATAHUB_ACTIONS_USER_CONFIGS_PATH:-/etc/datahub/actions/conf}"
MONITORING_ENABLED="${DATAHUB_ACTIONS_MONITORING_ENABLED:-false}"
MONITORING_PORT="${DATAHUB_ACTIONS_MONITORING_PORT:-8000}"

touch /tmp/datahub/logs/actions/actions.out

@@ -60,4 +62,8 @@ else
echo "No user action configurations found. Not starting user actions."
fi

datahub-actions actions $config_files
if [ "$MONITORING_ENABLED" = true ]; then
datahub-actions --enable-monitoring --monitoring-port "$MONITORING_PORT" actions $config_files
else
datahub-actions actions $config_files
fi
4 changes: 3 additions & 1 deletion docs/sources/kafka-event-source.md
@@ -62,6 +62,7 @@ source:
# Topic Routing - which topics to read from.
topic_routes:
mcl: ${METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME:-MetadataChangeLog_Versioned_v1} # Topic name for MetadataChangeLogEvent_v1 events.
mcl_timeseries: ${METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME:-MetadataChangeLog_Timeseries_v1} # Topic name for MetadataChangeLogEvent_v1 timeseries events.
pe: ${PLATFORM_EVENT_TOPIC_NAME:-PlatformEvent_v1} # Topic name for PlatformEvent_v1 events.
action:
# action configs
@@ -75,7 +76,8 @@ action:
| `connection.bootstrap` || N/A | The Kafka bootstrap URI, e.g. `localhost:9092`. |
| `connection.schema_registry_url` || N/A | The URL for the Kafka schema registry, e.g. `http://localhost:8081` |
| `connection.consumer_config` || {} | A set of key-value pairs that represents arbitrary Kafka Consumer configs |
| `topic_routes.mcl` || `MetadataChangeLogEvent_v1` | The name of the topic containing MetadataChangeLog events |
| `topic_routes.mcl` || `MetadataChangeLog_Versioned_v1` | The name of the topic containing versioned MetadataChangeLog events |
| `topic_routes.mcl_timeseries` || `MetadataChangeLog_Timeseries_v1` | The name of the topic containing timeseries MetadataChangeLog events |
| `topic_routes.pe` || `PlatformEvent_v1` | The name of the topic containing PlatformEvent events |
</details>

2 changes: 1 addition & 1 deletion gradle/wrapper/gradle-wrapper.properties
@@ -1,5 +1,5 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-5.6.4-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-8.0.2-bin.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
