From ecba2244f04b41384c35344b0532ccca3ad43aa7 Mon Sep 17 00:00:00 2001
From: Mayuri Nehate <33225191+mayurinehate@users.noreply.github.com>
Date: Thu, 28 Nov 2024 19:23:30 +0530
Subject: [PATCH 1/2] fix(ingest/kafka): add poll for admin client for oauth_cb (#11985)

Co-authored-by: Tamas Nemeth
---
 .../datahub/ingestion/source/kafka/kafka.py | 29 +++++++++++-----
 .../tests/integration/kafka/test_kafka.py   | 33 +++++++++++++++----
 2 files changed, 48 insertions(+), 14 deletions(-)

diff --git a/metadata-ingestion/src/datahub/ingestion/source/kafka/kafka.py b/metadata-ingestion/src/datahub/ingestion/source/kafka/kafka.py
index e57dc853a83c6..709ba431f0f87 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/kafka/kafka.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/kafka/kafka.py
@@ -148,7 +148,7 @@ def get_kafka_consumer(
 ) -> confluent_kafka.Consumer:
     consumer = confluent_kafka.Consumer(
         {
-            "group.id": "test",
+            "group.id": "datahub-kafka-ingestion",
             "bootstrap.servers": connection.bootstrap,
             **connection.consumer_config,
         }
@@ -164,6 +164,25 @@ def get_kafka_consumer(
     return consumer
 
 
+def get_kafka_admin_client(
+    connection: KafkaConsumerConnectionConfig,
+) -> AdminClient:
+    client = AdminClient(
+        {
+            "group.id": "datahub-kafka-ingestion",
+            "bootstrap.servers": connection.bootstrap,
+            **connection.consumer_config,
+        }
+    )
+    if CallableConsumerConfig.is_callable_config(connection.consumer_config):
+        # As per the documentation, we need to explicitly call the poll method to make sure the OAuth callback gets executed
+        # https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#kafka-client-configuration
+        logger.debug("Initiating polling for kafka admin client")
+        client.poll(timeout=30)
+        logger.debug("Initiated polling for kafka admin client")
+    return client
+
+
 @dataclass
 class KafkaSourceReport(StaleEntityRemovalSourceReport):
     topics_scanned: int = 0
@@ -278,13 +297,7 @@ def __init__(self, config: KafkaSourceConfig, ctx: PipelineContext):
     def init_kafka_admin_client(self) -> None:
         try:
             # TODO: Do we require separate config than existing consumer_config ?
-            self.admin_client = AdminClient(
-                {
-                    "group.id": "test",
-                    "bootstrap.servers": self.source_config.connection.bootstrap,
-                    **self.source_config.connection.consumer_config,
-                }
-            )
+            self.admin_client = get_kafka_admin_client(self.source_config.connection)
         except Exception as e:
             logger.debug(e, exc_info=e)
             self.report.report_warning(
diff --git a/metadata-ingestion/tests/integration/kafka/test_kafka.py b/metadata-ingestion/tests/integration/kafka/test_kafka.py
index 597889c8440b7..7462f177684b7 100644
--- a/metadata-ingestion/tests/integration/kafka/test_kafka.py
+++ b/metadata-ingestion/tests/integration/kafka/test_kafka.py
@@ -128,11 +128,32 @@ def test_kafka_oauth_callback(
 
     pipeline.run()
 
-    is_found: bool = False
+    # Initialize flags to track oauth events
+    checks = {
+        "consumer_polling": False,
+        "consumer_oauth_callback": False,
+        "admin_polling": False,
+        "admin_oauth_callback": False,
+    }
+
+    # Read log file and check for oauth events
     with open(log_file, "r") as file:
-        for line_number, line in enumerate(file, 1):
+        for line in file:
+            # Check for polling events
+            if "Initiating polling for kafka admin client" in line:
+                checks["admin_polling"] = True
+            elif "Initiating polling for kafka consumer" in line:
+                checks["consumer_polling"] = True
+
+            # Check for oauth callbacks
             if oauth.MESSAGE in line:
-                is_found = True
-                break
-
-    assert is_found
+                if checks["consumer_polling"] and not checks["admin_polling"]:
+                    checks["consumer_oauth_callback"] = True
+                elif checks["consumer_polling"] and checks["admin_polling"]:
+                    checks["admin_oauth_callback"] = True
+
+    # Verify all oauth events occurred
+    assert checks["consumer_polling"], "Consumer polling was not initiated"
+    assert checks["consumer_oauth_callback"], "Consumer oauth callback not found"
+    assert checks["admin_polling"], "Admin polling was not initiated"
+    assert checks["admin_oauth_callback"], "Admin oauth callback not found"

From 816fd3dba7fe2ca7276cb3b0b8042b74c0d8e04c Mon Sep 17 00:00:00 2001
From: skrydal
Date: Thu, 28 Nov 2024 16:34:20 +0100
Subject: [PATCH 2/2] fix(ingestion/iceberg): Improvements to iceberg source (#11987)

---
 .../src/app/ingest/source/builder/sources.json  |  8 ++++++++
 .../docs/sources/iceberg/iceberg.md             |  6 ++++--
 metadata-ingestion/setup.py                     |  3 ++-
 .../datahub/ingestion/source/iceberg/iceberg.py | 17 ++++++++++++-----
 4 files changed, 26 insertions(+), 8 deletions(-)

diff --git a/datahub-web-react/src/app/ingest/source/builder/sources.json b/datahub-web-react/src/app/ingest/source/builder/sources.json
index 5e81a7855e9c4..70d9baabdb4bc 100644
--- a/datahub-web-react/src/app/ingest/source/builder/sources.json
+++ b/datahub-web-react/src/app/ingest/source/builder/sources.json
@@ -317,5 +317,13 @@
         "displayName": "CassandraDB",
        "docsUrl": "https://datahubproject.io/docs/generated/ingestion/sources/cassandra",
        "recipe": "source:\n    type: cassandra\n    config:\n        # Credentials for on prem cassandra\n        contact_point: localhost\n        port: 9042\n        username: admin\n        password: password\n\n        # Or\n        # Credentials Astra Cloud\n        #cloud_config:\n        #    secure_connect_bundle: Path to Secure Connect Bundle (.zip)\n        #    token: Application Token\n\n        # Optional Allow / Deny extraction of particular keyspaces.\n        keyspace_pattern:\n            allow: [.*]\n\n        # Optional Allow / Deny extraction of particular tables.\n        table_pattern:\n            allow: [.*]"
+    },
+    {
+        "urn": "urn:li:dataPlatform:iceberg",
+        "name": "iceberg",
+        "displayName": "Iceberg",
+        "description": "Ingest databases and tables from any Iceberg catalog implementation",
+        "docsUrl": "https://datahubproject.io/docs/generated/ingestion/sources/iceberg",
"https://datahubproject.io/docs/generated/ingestion/sources/iceberg", + "recipe": "source:\n type: \"iceberg\"\n config:\n env: dev\n # each thread will open internet connections to fetch manifest files independently, \n # this value needs to be adjusted with ulimit\n processing_threads: 1 \n # a single catalog definition with a form of a dictionary\n catalog: \n demo: # name of the catalog\n type: \"rest\" # other types are available\n uri: \"uri\"\n s3.access-key-id: \"access-key\"\n s3.secret-access-key: \"secret-access-key\"\n s3.region: \"aws-region\"\n profiling:\n enabled: false\n" } ] diff --git a/metadata-ingestion/docs/sources/iceberg/iceberg.md b/metadata-ingestion/docs/sources/iceberg/iceberg.md index 7e40315a2e319..92aac5ffa6ce5 100644 --- a/metadata-ingestion/docs/sources/iceberg/iceberg.md +++ b/metadata-ingestion/docs/sources/iceberg/iceberg.md @@ -18,6 +18,8 @@ This ingestion source maps the following Source System Concepts to DataHub Conce ## Troubleshooting -### [Common Issue] +### Exceptions while increasing `processing_threads` -[Provide description of common issues with this integration and steps to resolve] +Each processing thread will open several files/sockets to download manifest files from blob storage. If you experience +exceptions appearing when increasing `processing_threads` configuration parameter, try to increase limit of open +files (i.e. using `ulimit` in Linux). diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index 74c2e611cf68f..292038380e6a2 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -249,7 +249,8 @@ iceberg_common = { # Iceberg Python SDK - "pyiceberg>=0.4,<0.7", + # Kept at 0.4.0 due to higher versions requiring pydantic>2, as soon as we are fine with it, bump this dependency + "pyiceberg>=0.4.0", } mssql_common = { diff --git a/metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg.py b/metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg.py index 258a4b9ad6daf..5931873f54236 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg.py +++ b/metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg.py @@ -9,6 +9,7 @@ NoSuchIcebergTableError, NoSuchNamespaceError, NoSuchPropertyException, + NoSuchTableError, ) from pyiceberg.schema import Schema, SchemaVisitorPerPrimitiveType, visit from pyiceberg.table import Table @@ -104,7 +105,7 @@ @capability(SourceCapability.DESCRIPTIONS, "Enabled by default.") @capability( SourceCapability.OWNERSHIP, - "Optionally enabled via configuration by specifying which Iceberg table property holds user or group ownership.", + "Automatically ingests ownership information from table properties based on `user_ownership_property` and `group_ownership_property`", ) @capability(SourceCapability.DELETION_DETECTION, "Enabled via stateful ingestion") class IcebergSource(StatefulIngestionSourceBase): @@ -192,9 +193,7 @@ def _process_dataset(dataset_path: Identifier) -> Iterable[MetadataWorkUnit]: table = thread_local.local_catalog.load_table(dataset_path) time_taken = timer.elapsed_seconds() self.report.report_table_load_time(time_taken) - LOGGER.debug( - f"Loaded table: {table.identifier}, time taken: {time_taken}" - ) + LOGGER.debug(f"Loaded table: {table.name()}, time taken: {time_taken}") yield from self._create_iceberg_workunit(dataset_name, table) except NoSuchPropertyException as e: self.report.report_warning( @@ -206,12 +205,20 @@ def _process_dataset(dataset_path: Identifier) -> Iterable[MetadataWorkUnit]: ) except 
             except NoSuchIcebergTableError as e:
                 self.report.report_warning(
-                    "no-iceberg-table",
+                    "not-an-iceberg-table",
                     f"Failed to create workunit for {dataset_name}. {e}",
                 )
                 LOGGER.warning(
                     f"NoSuchIcebergTableError while processing table {dataset_path}, skipping it.",
                 )
+            except NoSuchTableError as e:
+                self.report.report_warning(
+                    "no-such-table",
+                    f"Failed to create workunit for {dataset_name}. {e}",
+                )
+                LOGGER.warning(
+                    f"NoSuchTableError while processing table {dataset_path}, skipping it.",
+                )
             except Exception as e:
                 self.report.report_failure("general", f"Failed to create workunit: {e}")
                 LOGGER.exception(
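
A note on the `client.poll(timeout=30)` call that the first patch adds: librdkafka only runs a configured `oauth_cb` while the client is being serviced, so a freshly constructed `AdminClient` holds no bearer token until it is polled, and the first admin operation would otherwise fail to authenticate. Below is a minimal standalone sketch of the same pattern; the broker address, token value, and expiry are hypothetical placeholders and are not part of either patch.

```python
import time

from confluent_kafka.admin import AdminClient


def oauth_cb(oauth_config: str) -> tuple:
    # Hypothetical token fetch; a real callback would contact an identity
    # provider here. The contract is to return (token, expiry) where expiry
    # is an absolute time in seconds since the epoch.
    token = "placeholder-token"
    expiry_epoch_seconds = time.time() + 3600.0
    return token, expiry_epoch_seconds


client = AdminClient(
    {
        "bootstrap.servers": "localhost:9092",  # placeholder broker
        "security.protocol": "SASL_SSL",
        "sasl.mechanism": "OAUTHBEARER",
        "oauth_cb": oauth_cb,
    }
)

# Without this poll, oauth_cb is never invoked and the first admin API
# call would fail to authenticate; this mirrors get_kafka_admin_client above.
client.poll(timeout=10)
```

The patch applies the same idea inside `get_kafka_admin_client`, guarded by `CallableConsumerConfig.is_callable_config` so the extra poll only happens when a callable OAuth config is actually in use.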