Skip to content

Commit

Permalink
fix(ingest/kafka):add poll for admin client for oauth_cb (datahub-pro…
Browse files Browse the repository at this point in the history
…ject#11985)

Co-authored-by: Tamas Nemeth <[email protected]>
  • Loading branch information
mayurinehate and treff7es authored Nov 28, 2024
1 parent 2206e58 commit ecba224
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 14 deletions.
29 changes: 21 additions & 8 deletions metadata-ingestion/src/datahub/ingestion/source/kafka/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ def get_kafka_consumer(
) -> confluent_kafka.Consumer:
consumer = confluent_kafka.Consumer(
{
"group.id": "test",
"group.id": "datahub-kafka-ingestion",
"bootstrap.servers": connection.bootstrap,
**connection.consumer_config,
}
Expand All @@ -164,6 +164,25 @@ def get_kafka_consumer(
return consumer


def get_kafka_admin_client(
connection: KafkaConsumerConnectionConfig,
) -> AdminClient:
client = AdminClient(
{
"group.id": "datahub-kafka-ingestion",
"bootstrap.servers": connection.bootstrap,
**connection.consumer_config,
}
)
if CallableConsumerConfig.is_callable_config(connection.consumer_config):
# As per documentation, we need to explicitly call the poll method to make sure OAuth callback gets executed
# https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#kafka-client-configuration
logger.debug("Initiating polling for kafka admin client")
client.poll(timeout=30)
logger.debug("Initiated polling for kafka admin client")
return client


@dataclass
class KafkaSourceReport(StaleEntityRemovalSourceReport):
topics_scanned: int = 0
Expand Down Expand Up @@ -278,13 +297,7 @@ def __init__(self, config: KafkaSourceConfig, ctx: PipelineContext):
def init_kafka_admin_client(self) -> None:
try:
# TODO: Do we require separate config than existing consumer_config ?
self.admin_client = AdminClient(
{
"group.id": "test",
"bootstrap.servers": self.source_config.connection.bootstrap,
**self.source_config.connection.consumer_config,
}
)
self.admin_client = get_kafka_admin_client(self.source_config.connection)
except Exception as e:
logger.debug(e, exc_info=e)
self.report.report_warning(
Expand Down
33 changes: 27 additions & 6 deletions metadata-ingestion/tests/integration/kafka/test_kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,11 +128,32 @@ def test_kafka_oauth_callback(

pipeline.run()

is_found: bool = False
# Initialize flags to track oauth events
checks = {
"consumer_polling": False,
"consumer_oauth_callback": False,
"admin_polling": False,
"admin_oauth_callback": False,
}

# Read log file and check for oauth events
with open(log_file, "r") as file:
for line_number, line in enumerate(file, 1):
for line in file:
# Check for polling events
if "Initiating polling for kafka admin client" in line:
checks["admin_polling"] = True
elif "Initiating polling for kafka consumer" in line:
checks["consumer_polling"] = True

# Check for oauth callbacks
if oauth.MESSAGE in line:
is_found = True
break

assert is_found
if checks["consumer_polling"] and not checks["admin_polling"]:
checks["consumer_oauth_callback"] = True
elif checks["consumer_polling"] and checks["admin_polling"]:
checks["admin_oauth_callback"] = True

# Verify all oauth events occurred
assert checks["consumer_polling"], "Consumer polling was not initiated"
assert checks["consumer_oauth_callback"], "Consumer oauth callback not found"
assert checks["admin_polling"], "Admin polling was not initiated"
assert checks["admin_oauth_callback"], "Admin oauth callback not found"

0 comments on commit ecba224

Please sign in to comment.