Skip to content

Commit

Permalink
Merge branch 'datahub-project:master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
treff7es authored Nov 28, 2024
2 parents 219a2e5 + 816fd3d commit 5f06dfa
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 22 deletions.
8 changes: 8 additions & 0 deletions datahub-web-react/src/app/ingest/source/builder/sources.json
Original file line number Diff line number Diff line change
Expand Up @@ -317,5 +317,13 @@
"displayName": "CassandraDB",
"docsUrl": "https://datahubproject.io/docs/generated/ingestion/sources/cassandra",
"recipe": "source:\n type: cassandra\n config:\n # Credentials for on prem cassandra\n contact_point: localhost\n port: 9042\n username: admin\n password: password\n\n # Or\n # Credentials Astra Cloud\n #cloud_config:\n # secure_connect_bundle: Path to Secure Connect Bundle (.zip)\n # token: Application Token\n\n # Optional Allow / Deny extraction of particular keyspaces.\n keyspace_pattern:\n allow: [.*]\n\n # Optional Allow / Deny extraction of particular tables.\n table_pattern:\n allow: [.*]"
},
{
"urn": "urn:li:dataPlatform:iceberg",
"name": "iceberg",
"displayName": "Iceberg",
"description": "Ingest databases and tables from any Iceberg catalog implementation",
"docsUrl": "https://datahubproject.io/docs/generated/ingestion/sources/iceberg",
"recipe": "source:\n type: \"iceberg\"\n config:\n env: dev\n # each thread will open internet connections to fetch manifest files independently, \n # this value needs to be adjusted with ulimit\n processing_threads: 1 \n # a single catalog definition with a form of a dictionary\n catalog: \n demo: # name of the catalog\n type: \"rest\" # other types are available\n uri: \"uri\"\n s3.access-key-id: \"access-key\"\n s3.secret-access-key: \"secret-access-key\"\n s3.region: \"aws-region\"\n profiling:\n enabled: false\n"
}
]
6 changes: 4 additions & 2 deletions metadata-ingestion/docs/sources/iceberg/iceberg.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ This ingestion source maps the following Source System Concepts to DataHub Conce

## Troubleshooting

### [Common Issue]
### Exceptions while increasing `processing_threads`

[Provide description of common issues with this integration and steps to resolve]
Each processing thread opens several files/sockets to download manifest files from blob storage. If you encounter
exceptions after increasing the `processing_threads` configuration parameter, try raising the limit on open
files (e.g., using `ulimit` on Linux).
3 changes: 2 additions & 1 deletion metadata-ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,8 @@

iceberg_common = {
# Iceberg Python SDK
"pyiceberg>=0.4,<0.7",
# Kept at 0.4.0 due to higher versions requiring pydantic>2, as soon as we are fine with it, bump this dependency
"pyiceberg>=0.4.0",
}

mssql_common = {
Expand Down
17 changes: 12 additions & 5 deletions metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
NoSuchIcebergTableError,
NoSuchNamespaceError,
NoSuchPropertyException,
NoSuchTableError,
)
from pyiceberg.schema import Schema, SchemaVisitorPerPrimitiveType, visit
from pyiceberg.table import Table
Expand Down Expand Up @@ -104,7 +105,7 @@
@capability(SourceCapability.DESCRIPTIONS, "Enabled by default.")
@capability(
SourceCapability.OWNERSHIP,
"Optionally enabled via configuration by specifying which Iceberg table property holds user or group ownership.",
"Automatically ingests ownership information from table properties based on `user_ownership_property` and `group_ownership_property`",
)
@capability(SourceCapability.DELETION_DETECTION, "Enabled via stateful ingestion")
class IcebergSource(StatefulIngestionSourceBase):
Expand Down Expand Up @@ -192,9 +193,7 @@ def _process_dataset(dataset_path: Identifier) -> Iterable[MetadataWorkUnit]:
table = thread_local.local_catalog.load_table(dataset_path)
time_taken = timer.elapsed_seconds()
self.report.report_table_load_time(time_taken)
LOGGER.debug(
f"Loaded table: {table.identifier}, time taken: {time_taken}"
)
LOGGER.debug(f"Loaded table: {table.name()}, time taken: {time_taken}")
yield from self._create_iceberg_workunit(dataset_name, table)
except NoSuchPropertyException as e:
self.report.report_warning(
Expand All @@ -206,12 +205,20 @@ def _process_dataset(dataset_path: Identifier) -> Iterable[MetadataWorkUnit]:
)
except NoSuchIcebergTableError as e:
self.report.report_warning(
"no-iceberg-table",
"not-an-iceberg-table",
f"Failed to create workunit for {dataset_name}. {e}",
)
LOGGER.warning(
f"NoSuchIcebergTableError while processing table {dataset_path}, skipping it.",
)
except NoSuchTableError as e:
self.report.report_warning(
"no-such-table",
f"Failed to create workunit for {dataset_name}. {e}",
)
LOGGER.warning(
f"NoSuchTableError while processing table {dataset_path}, skipping it.",
)
except Exception as e:
self.report.report_failure("general", f"Failed to create workunit: {e}")
LOGGER.exception(
Expand Down
29 changes: 21 additions & 8 deletions metadata-ingestion/src/datahub/ingestion/source/kafka/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ def get_kafka_consumer(
) -> confluent_kafka.Consumer:
consumer = confluent_kafka.Consumer(
{
"group.id": "test",
"group.id": "datahub-kafka-ingestion",
"bootstrap.servers": connection.bootstrap,
**connection.consumer_config,
}
Expand All @@ -164,6 +164,25 @@ def get_kafka_consumer(
return consumer


def get_kafka_admin_client(
    connection: KafkaConsumerConnectionConfig,
) -> AdminClient:
    """Build a Kafka AdminClient from the shared consumer connection settings.

    Reuses the same bootstrap servers and consumer_config as the consumer so
    that authentication (including OAuth callbacks) is configured identically.
    """
    config: dict = {
        "group.id": "datahub-kafka-ingestion",
        "bootstrap.servers": connection.bootstrap,
    }
    # Entries from consumer_config take precedence over the defaults above.
    config.update(connection.consumer_config)
    admin = AdminClient(config)
    if CallableConsumerConfig.is_callable_config(connection.consumer_config):
        # As per documentation, we need to explicitly call the poll method to make sure OAuth callback gets executed
        # https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#kafka-client-configuration
        logger.debug("Initiating polling for kafka admin client")
        admin.poll(timeout=30)
        logger.debug("Initiated polling for kafka admin client")
    return admin


@dataclass
class KafkaSourceReport(StaleEntityRemovalSourceReport):
topics_scanned: int = 0
Expand Down Expand Up @@ -278,13 +297,7 @@ def __init__(self, config: KafkaSourceConfig, ctx: PipelineContext):
def init_kafka_admin_client(self) -> None:
try:
# TODO: Do we require separate config than existing consumer_config ?
self.admin_client = AdminClient(
{
"group.id": "test",
"bootstrap.servers": self.source_config.connection.bootstrap,
**self.source_config.connection.consumer_config,
}
)
self.admin_client = get_kafka_admin_client(self.source_config.connection)
except Exception as e:
logger.debug(e, exc_info=e)
self.report.report_warning(
Expand Down
33 changes: 27 additions & 6 deletions metadata-ingestion/tests/integration/kafka/test_kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,11 +128,32 @@ def test_kafka_oauth_callback(

pipeline.run()

is_found: bool = False
# Initialize flags to track oauth events
checks = {
"consumer_polling": False,
"consumer_oauth_callback": False,
"admin_polling": False,
"admin_oauth_callback": False,
}

# Read log file and check for oauth events
with open(log_file, "r") as file:
for line_number, line in enumerate(file, 1):
for line in file:
# Check for polling events
if "Initiating polling for kafka admin client" in line:
checks["admin_polling"] = True
elif "Initiating polling for kafka consumer" in line:
checks["consumer_polling"] = True

# Check for oauth callbacks
if oauth.MESSAGE in line:
is_found = True
break

assert is_found
if checks["consumer_polling"] and not checks["admin_polling"]:
checks["consumer_oauth_callback"] = True
elif checks["consumer_polling"] and checks["admin_polling"]:
checks["admin_oauth_callback"] = True

# Verify all oauth events occurred
assert checks["consumer_polling"], "Consumer polling was not initiated"
assert checks["consumer_oauth_callback"], "Consumer oauth callback not found"
assert checks["admin_polling"], "Admin polling was not initiated"
assert checks["admin_oauth_callback"], "Admin oauth callback not found"

0 comments on commit 5f06dfa

Please sign in to comment.