feat(ingest): default to ASYNC_BATCH mode in datahub-rest sink (datah…
hsheth2 authored Sep 17, 2024
1 parent adc6b90 commit 38bcd9c
Showing 7 changed files with 168 additions and 41 deletions.
1 change: 1 addition & 0 deletions docs/how/updating-datahub.md
@@ -37,6 +37,7 @@ This file documents any backwards-incompatible changes in DataHub and assists pe
Re-running with stateful ingestion should automatically clear up the entities with old URNs and add entities with new URNs, therefore not duplicating the containers or jobs.

- #11313 - `datahub get` will no longer return a key aspect for entities that don't exist.
- #11369 - The default datahub-rest sink mode has been changed to `ASYNC_BATCH`. This requires a server with version 0.14.0+.

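For teams still on a pre-0.14.0 server, the previous behavior can be kept by setting the sink `mode` explicitly. A minimal programmatic sketch, not part of this commit — the `demo-data` source block and server URL are placeholders:

```python
# Sketch: pin the datahub-rest sink to a pre-0.14.0-compatible mode.
# The source section is a placeholder; substitute your own connector.
from datahub.ingestion.run.pipeline import Pipeline

pipeline = Pipeline.create(
    {
        "source": {
            "type": "demo-data",  # placeholder source
            "config": {},
        },
        "sink": {
            "type": "datahub-rest",
            "config": {
                "server": "http://localhost:8080",
                "mode": "ASYNC",  # revert from the new ASYNC_BATCH default
            },
        },
    }
)
pipeline.run()
pipeline.raise_from_status()
```
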
### Potential Downtime

47 changes: 24 additions & 23 deletions metadata-ingestion/sink_docs/datahub.md
@@ -29,6 +29,7 @@ sink:
```
If you are connecting to a hosted DataHub Cloud instance, your sink will look like
```yml
source:
# source configs
@@ -68,16 +69,17 @@ If you are using [UI based ingestion](../../docs/ui-ingestion.md) then where GMS
Note that a `.` is used to denote nested fields in the YAML recipe.

| Field                      | Required | Default              | Description                                                                                          |
| -------------------------- | -------- | -------------------- | ---------------------------------------------------------------------------------------------------- |
| `server`                   | ✅       |                      | URL of DataHub GMS endpoint.                                                                           |
| `token`                    |          |                      | Bearer token used for authentication.                                                                  |
| `timeout_sec`              |          | 30                   | Per-HTTP request timeout.                                                                              |
| `retry_max_times`          |          | 1                    | Maximum times to retry if HTTP request fails. The delay between retries is increased exponentially    |
| `retry_status_codes`       |          | [429, 502, 503, 504] | Retry HTTP request also on these status codes                                                          |
| `extra_headers`            |          |                      | Extra headers which will be added to the request.                                                      |
| `max_threads`              |          | `15`                 | Max parallelism for REST API calls                                                                     |
| `mode`                     |          | `ASYNC_BATCH`        | [Advanced] Mode of operation - `SYNC`, `ASYNC`, or `ASYNC_BATCH`                                       |
| `ca_certificate_path`      |          |                      | Path to server's CA certificate for verification of HTTPS communications                               |
| `client_certificate_path`  |          |                      | Path to client's CA certificate for HTTPS communications                                               |
| `disable_ssl_verification` |          | false                | Disable ssl certificate validation                                                                     |
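
For reference, several of these connection options map directly onto the Python REST emitter that backs this sink. A small sketch — the server URL, token, and header values are illustrative placeholders, not defaults:

```python
# Sketch: wiring a few of the connection options above into the Python REST emitter.
from datahub.emitter.rest_emitter import DataHubRestEmitter

emitter = DataHubRestEmitter(
    gms_server="http://localhost:8080",
    token="<personal-access-token>",
    timeout_sec=30,
    extra_headers={"X-Custom-Header": "example-value"},
)

# Basic connectivity check against the configured server; raises if it cannot be reached.
emitter.test_connection()
```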

## DataHub Kafka
@@ -115,14 +117,14 @@ sink:

Note that a `.` is used to denote nested fields in the YAML recipe.

| Field | Required | Default | Description |
| -------------------------------------------- | -------- | ---------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `connection.bootstrap` | ✅ | | Kafka bootstrap URL. |
| `connection.producer_config.<option>` | | | Passed to https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#confluent_kafka.SerializingProducer |
| `connection.schema_registry_url` | ✅ | | URL of schema registry being used. |
| `connection.schema_registry_config.<option>` | | | Passed to https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#confluent_kafka.schema_registry.SchemaRegistryClient |
| `topic_routes.MetadataChangeEvent` | | MetadataChangeEvent | Overridden Kafka topic name for the MetadataChangeEvent |
| `topic_routes.MetadataChangeProposal` | | MetadataChangeProposal | Overridden Kafka topic name for the MetadataChangeProposal |

The options in the producer config and schema registry config are passed to the Kafka SerializingProducer and SchemaRegistryClient respectively.

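For example, arbitrary client options can be forwarded through these two config blocks. A sketch of the sink config in dict form — broker addresses and credentials are placeholders:

```python
# Sketch: datahub-kafka sink config with options passed through to the
# confluent-kafka producer and schema registry client. All values are placeholders.
kafka_sink_config = {
    "type": "datahub-kafka",
    "config": {
        "connection": {
            "bootstrap": "broker1:9092,broker2:9092",
            "schema_registry_url": "http://schema-registry:8081",
            "producer_config": {
                # forwarded to confluent_kafka.SerializingProducer
                "security.protocol": "SASL_SSL",
                "sasl.mechanism": "PLAIN",
            },
            "schema_registry_config": {
                # forwarded to confluent_kafka.schema_registry.SchemaRegistryClient
                "basic.auth.user.info": "user:password",
            },
        },
    },
}
```
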
@@ -178,15 +180,14 @@ DataHub Lite currently doesn't support stateful ingestion, so you'll have to tur

Note that a `.` is used to denote nested fields in the YAML recipe.

| Field | Required | Default | Description |
| -------- | -------- | -------------------------------------------- | -------------------------------------------------------------------------------------------------------------------------------- |
| `type` | | duckdb | Type of DataHub Lite implementation to use |
| `config` | | `{"file": "~/.datahub/lite/datahub.duckdb"}` | Config dictionary to pass through to the DataHub Lite implementation. See below for fields accepted by the DuckDB implementation |

#### DuckDB Config Details

| Field | Required | Default | Description |
| --------- | -------- | ---------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------ |
| `file` | | `"~/.datahub/lite/datahub.duckdb"` | File to use for DuckDB storage |
| `options` | | `{}` | Options dictionary to pass through to DuckDB library. See [the official spec](https://duckdb.org/docs/sql/configuration.html) for the options supported by DuckDB. |
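
Putting the two tables together, a DuckDB-backed DataHub Lite sink could be configured like the sketch below. The `type`/`config` nesting follows the tables above and is an assumption; the path and options are illustrative only:

```python
# Sketch: datahub-lite sink backed by DuckDB, combining the fields from both tables.
lite_sink_config = {
    "type": "datahub-lite",
    "config": {
        "type": "duckdb",
        "config": {
            "file": "~/.datahub/lite/datahub.duckdb",
            "options": {},  # forwarded to the DuckDB library
        },
    },
}
```
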
36 changes: 28 additions & 8 deletions metadata-ingestion/src/datahub/emitter/rest_emitter.py
@@ -45,6 +45,9 @@
os.getenv("DATAHUB_REST_EMITTER_DEFAULT_RETRY_MAX_TIMES", "4")
)

# The limit is 16mb. We will use a max of 15mb to have some space for overhead.
_MAX_BATCH_INGEST_PAYLOAD_SIZE = 15 * 1024 * 1024


class DataHubRestEmitter(Closeable, Emitter):
_gms_server: str
@@ -269,20 +272,37 @@ def emit_mcps(
self,
mcps: List[Union[MetadataChangeProposal, MetadataChangeProposalWrapper]],
async_flag: Optional[bool] = None,
) -> int:
url = f"{self._gms_server}/aspects?action=ingestProposalBatch"
for mcp in mcps:
ensure_has_system_metadata(mcp)

mcp_objs = [pre_json_transform(mcp.to_obj()) for mcp in mcps]

# As a safety mechanism, we need to make sure we don't exceed the max payload size for GMS.
# If we will exceed the limit, we need to break it up into chunks.
mcp_obj_chunks: List[List[str]] = []
current_chunk_size = _MAX_BATCH_INGEST_PAYLOAD_SIZE
for mcp_obj in mcp_objs:
mcp_obj_size = len(json.dumps(mcp_obj))

if mcp_obj_size + current_chunk_size > _MAX_BATCH_INGEST_PAYLOAD_SIZE:
mcp_obj_chunks.append([])
current_chunk_size = 0
mcp_obj_chunks[-1].append(mcp_obj)
current_chunk_size += mcp_obj_size

for mcp_obj_chunk in mcp_obj_chunks:
# TODO: We're calling json.dumps on each MCP object twice, once to estimate
# the size when chunking, and again for the actual request.
payload_dict: dict = {"proposals": mcp_obj_chunk}
if async_flag is not None:
payload_dict["async"] = True if async_flag else False

payload = json.dumps(payload_dict)
self._emit_generic(url, payload)

return len(mcp_obj_chunks)

@deprecated
def emit_usage(self, usageStats: UsageAggregation) -> None:
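
A usage sketch for the new `emit_mcps` return value introduced above — the dataset URNs and aspects are illustrative, and a reachable GMS at the given URL is assumed:

```python
# Sketch: emit a batch of MCPs and observe how many HTTP payloads it was split into.
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DataHubRestEmitter
from datahub.metadata.schema_classes import DatasetPropertiesClass

emitter = DataHubRestEmitter(gms_server="http://localhost:8080")

mcps = [
    MetadataChangeProposalWrapper(
        entityUrn=f"urn:li:dataset:(urn:li:dataPlatform:hive,example.table_{i},PROD)",
        aspect=DatasetPropertiesClass(description=f"Example table {i}"),
    )
    for i in range(500)
]

# emit_mcps now returns the number of chunks the batch was broken into so that
# each request stays under the ~15 MB payload cap.
chunks = emitter.emit_mcps(mcps, async_flag=True)
print(f"Batch was sent in {chunks} request(s)")
```
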
12 changes: 10 additions & 2 deletions metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py
@@ -57,7 +57,7 @@ class RestSinkMode(ConfigEnum):


_DEFAULT_REST_SINK_MODE = pydantic.parse_obj_as(
    RestSinkMode, os.getenv("DATAHUB_REST_SINK_DEFAULT_MODE", RestSinkMode.ASYNC_BATCH)
)


@@ -78,6 +78,8 @@ class DataHubRestSinkReport(SinkReport):
gms_version: Optional[str] = None
pending_requests: int = 0

async_batches_split: int = 0

main_thread_blocking_timer: PerfTimer = dataclasses.field(default_factory=PerfTimer)

def compute_stats(self) -> None:
@@ -255,7 +257,13 @@ def _emit_batch_wrapper(
else:
events.append(event)

chunks = self.emitter.emit_mcps(events)
if chunks > 1:
self.report.async_batches_split += chunks
logger.info(
f"In async_batch mode, the payload was split into {chunks} batches. "
"If there's many of these issues, consider decreasing `max_per_batch`."
)

def write_record_async(
self,
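
When `async_batches_split` climbs, the log above points at `max_per_batch`. A hypothetical sink-config sketch — the exact field name and placement are inferred from the log message, not verified against the full config schema:

```python
# Hypothetical sketch: keep ASYNC_BATCH but cap the batch size so individual
# payloads stay well under the GMS limit. `max_per_batch` and its value here
# are assumptions based on the log message above.
sink_config = {
    "type": "datahub-rest",
    "config": {
        "server": "http://localhost:8080",
        "mode": "ASYNC_BATCH",
        "max_per_batch": 1000,  # lower this if batches are frequently split
    },
}
```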