
Refreshing website content from main repo.
GitHub Action Website Snapshot committed Dec 16, 2024
1 parent 198e395 commit 783bd31
Showing 39 changed files with 5,576 additions and 643 deletions.
66 changes: 66 additions & 0 deletions blog/streaming-philosophy/index.mdx
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
---
title: OpenLineage for Streaming Jobs
date: 2024-12-13
authors: [Obuchowski]
description: A deep dive into how OpenLineage thinks about streaming jobs.
---

## OpenLineage for Streaming Jobs
Although OpenLineage may appear to fit mostly batch processing jobs, it provides comprehensive lineage tracking for both batch and streaming job models.
In fact, the streaming model can be seen as an extension of the batch model and works effectively with systems like Apache Flink or Apache Spark.
However, there are core differences in how these jobs are typically executed, and the OpenLineage integration has to take those differences into account.

### Understanding Batch and Streaming Job Lineage Differences
For batch jobs, OpenLineage typically aims to answer questions like:
- Which datasets are read, written to, or updated, and what are their properties?
- When does the job start and end, and did it succeed?
- What transformations actually happen? For instance, which input columns are used to produce which output columns?

In contrast, streaming jobs raise some additional questions:
- When does an unbounded job end?
- When are datasets updated?
- Does the transformation change during execution?

### When Does the Unbounded Job End?

Isn’t streaming supposed to be continuous?
Streaming data is generally defined as being continuously generated, with a clear beginning but no defined end. However, most streaming jobs, even those considered unbounded, do eventually end. For instance:
- Jobs might need to be upgraded, whether it's the underlying system or the job itself.
- Schema changes in the data may necessitate restarting a job.

In such cases, the original run (a single execution instance of a streaming job) ends, and a new run of the same job begins.
OpenLineage recommends modeling these scenarios as distinct runs, where one run ends and the next begins.
This approach enables OpenLineage consumers to capture key insights.
For example, after weeks of continuous operation, it may become clear that a streaming job failed to send new data for 30 minutes following the deployment of a new version.
Such visibility is essential for diagnosing issues and assessing impact, such as identifying the root cause of delayed data processing.
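As a sketch, this run-boundary model can be expressed as plain OpenLineage event payloads. The job name, namespace, and producer URI below are illustrative assumptions, not values from the spec:

```python
import uuid
from datetime import datetime, timedelta, timezone

def run_event(event_type, run_id, event_time, job_name="orders_enrichment"):
    """Build a minimal OpenLineage RunEvent payload as a plain dict."""
    return {
        "eventType": event_type,
        "eventTime": event_time.isoformat(),
        "run": {"runId": run_id},
        "job": {"namespace": "flink://prod", "name": job_name},  # illustrative
        "producer": "https://example.com/my-producer",  # placeholder URI
    }

deploy_time = datetime(2024, 12, 13, 12, 0, tzinfo=timezone.utc)
old_run, new_run = str(uuid.uuid4()), str(uuid.uuid4())

# Upgrade scenario: the old run COMPLETEs, and a fresh run of the same
# job STARTs 30 minutes later under a new runId.
events = [
    run_event("COMPLETE", old_run, deploy_time),
    run_event("START", new_run, deploy_time + timedelta(minutes=30)),
]

# A consumer can measure the gap between runs of the same job:
gap = (datetime.fromisoformat(events[1]["eventTime"])
       - datetime.fromisoformat(events[0]["eventTime"]))
print(gap)  # 0:30:00
```

Because both events share the same job name but carry distinct run IDs, a consumer can attribute the 30-minute data gap to the deployment.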

### When Are Datasets Updated?
Only at the start or end of a job?
Dataset versioning is critical for tasks like debugging and ensuring data freshness. OpenLineage handles dataset updates for batch jobs using two mechanisms:
- Implicit versioning: The “last update timestamp” corresponds to the end of a run modifying the dataset.
- Explicit versioning: The DatasetVersionDatasetFacet allows events to provide explicit version details for datasets being read or written.
However, explicit versioning is currently provided only by new “table formats” like Apache Iceberg or Delta Lake, as most systems lack native versioning support in their data storage schemes.
For streaming jobs, OpenLineage offers additional capabilities, as OpenLineage’s RUNNING event type captures information that's available during job execution.
This is particularly evident in OpenLineage's Flink integration, where events are emitted continuously at configurable intervals, including details about completed Flink checkpoints.
Unfortunately, some frameworks, such as Apache Spark, do not provide detailed updates while the job is still running.
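To illustrate, a minimal sketch of a `RUNNING` event emitted at a checkpoint interval, carrying output statistics for a dataset; the job, dataset, and broker names are hypothetical:

```python
# A RUNNING event emitted mid-execution, e.g. after a Flink checkpoint
# completes. All names below are illustrative placeholders.
running_event = {
    "eventType": "RUNNING",
    "eventTime": "2024-12-13T12:05:00+00:00",
    "run": {"runId": "b7d9c1f0-0000-4000-8000-000000000000"},
    "job": {"namespace": "flink://prod", "name": "orders_enrichment"},
    "outputs": [
        {
            "namespace": "kafka://broker:9092",
            "name": "enriched_orders",
            # Run-scoped statistics: rows written so far by this run.
            "outputFacets": {
                "outputStatistics": {"rowCount": 15320}
            },
        }
    ],
}

# While the run is active, a consumer can treat the eventTime of the
# latest RUNNING event as the dataset's implicit "last update" timestamp.
last_update = running_event["eventTime"]
print(last_update)  # 2024-12-13T12:05:00+00:00
```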

### Does the Transformation Change During Execution?
Certain scenarios in Apache Flink show how transformations can evolve mid-execution.
For example, a KafkaSource configured with a wildcard pattern can dynamically pick up additional topics or partitions during the job’s runtime.
This capability is often used to scale jobs:
in a consumer group, a single partition can be processed by only one consumer, so increasing the number of partitions can address scalability limits.
New topics, representing distinct datasets in OpenLineage, may also be discovered and processed.
If OpenLineage were restricted to emitting events only at the start or end of a job, these new datasets would remain “unclaimed” until the job concluded.
This approach would be an impractical limitation for streaming jobs. With RUNNING events, OpenLineage can transmit updates about new datasets in near real-time, significantly reducing latency.
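A small sketch of how a consumer could detect such newly discovered datasets by diffing the inputs of successive `RUNNING` events; the topic and namespace names are made up for illustration:

```python
# Two successive RUNNING events from the same run; the second one has
# picked up a newly matched Kafka topic. Names are illustrative.
def input_names(event):
    """Return the set of input dataset names carried by an event."""
    return {d["name"] for d in event["inputs"]}

first = {
    "eventType": "RUNNING",
    "inputs": [{"namespace": "kafka://broker:9092", "name": "orders.eu"}],
}
second = {
    "eventType": "RUNNING",
    "inputs": [
        {"namespace": "kafka://broker:9092", "name": "orders.eu"},
        {"namespace": "kafka://broker:9092", "name": "orders.us"},
    ],
}

# Datasets that appeared between the two events:
new_datasets = input_names(second) - input_names(first)
print(new_datasets)  # {'orders.us'}
```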

### Other Concerns
There are other differences between streaming and batch jobs, but in practice these are often system-specific rather than general.
OpenLineage facilitates capturing system-specific information through custom facets, which allow you to attach any atomic piece of metadata to OpenLineage core objects.
This provides users with the flexibility to capture unique metadata, system-specific nuances, or operational details.
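For example, a hypothetical custom facet attaching a system-specific detail to a run might look like the following; the facet name, schema URL, and fields are assumptions, not part of the core spec:

```python
# A hypothetical custom run facet recording Kafka consumer details.
# The facet key, schema URL, and fields below are illustrative only;
# only `_producer` and `_schemaURL` are conventions from the spec.
custom_run_facets = {
    "kafkaConsumer": {
        "_producer": "https://example.com/my-producer/1.0",
        "_schemaURL": "https://example.com/schemas/KafkaConsumerRunFacet.json",
        "consumerGroup": "orders-enrichment-v2",
        "autoOffsetReset": "latest",
    }
}
```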

### Conclusion
In summary, while OpenLineage was initially perceived as a tool primarily for batch job tracking, its robust support for streaming jobs demonstrates its adaptability.
By addressing key challenges such as run boundaries, dataset updates, and dynamic transformations, OpenLineage offers comprehensive lineage tracking for streaming models.
The inclusion of continuous RUNNING events and support for custom facets further enhances its utility in complex streaming environments.
This allows OpenLineage consumers to effectively use events from streaming jobs to achieve greater visibility, traceability, and control over their streaming workflows.
It also facilitates more efficient debugging, impact analysis, and operational awareness.
1 change: 1 addition & 0 deletions docs/integrations/spark/configuration/spark_conf.md
@@ -24,3 +24,4 @@ The following parameters can be specified:
| spark.openlineage.debugFacet | Determines whether debug facet shall be generated and included within the event. Set `enabled` to turn it on. By default, facet is disabled. | enabled |
| spark.openlineage.job.owners.\<ownership-type\> | Specifies ownership of the job. Multiple entries with different types are allowed. Config key name and value are used to create job ownership type and name (available since 1.13). | spark.openlineage.job.owners.team="Some Team" |
| spark.openlineage.columnLineage.datasetLineageEnabled | Includes dataset dependencies in their own `dataset` property in the column lineage pattern. If this flag is set to `false`, the dataset dependencies are merged into the `fields` property. The default value is `false`. **It is recommended to set it to `true`.** | true |
| spark.openlineage.vendors.iceberg.metricsReporterDisabled | Disables the metrics reporter for Iceberg, which turns off the mechanism collecting scan and commit reports. | false |
120 changes: 120 additions & 0 deletions docs/integrations/spark/dataset_metrics.md
@@ -0,0 +1,120 @@
---
sidebar_position: 7
title: Dataset Metrics
---

Input and output facets in the OpenLineage specification describe datasets in the context of a given
run. For example, the number of rows read is not a dataset facet, as it does not describe the dataset itself.
For convenience, OpenLineage events carry this information under the `inputFacets` and `outputFacets`
fields of input and output datasets, respectively.
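As an illustration, a minimal sketch (with placeholder namespace and dataset name) of where these run-scoped statistics sit in an event, as opposed to dataset-level `facets`:

```python
# Sketch of an event fragment showing where run-scoped statistics live:
# under the dataset's `inputFacets`, not under dataset-level `facets`.
# The namespace and dataset name are placeholders.
event = {
    "inputs": [
        {
            "namespace": "file",
            "name": "/data/orders",
            "facets": {},  # dataset facets: describe the dataset itself
            "inputFacets": {  # input facets: describe this run's read
                "inputStatistics": {
                    "rowCount": 123,
                    "fileCount": 5,
                    "size": 35602,
                }
            },
        }
    ]
}
```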

## Standard Input / Output dataset statistics

The OpenLineage specification comes with:
* [InputStatisticsInputDatasetFacet](../../spec/facets/dataset-facets/input-dataset-facets/input_statistics.md)
* [OutputStatisticsOutputDatasetFacet](../../spec/facets/dataset-facets/output-dataset-facets/output_statistics.md)

which are collected by the Spark integration. These facets contain:
* the number of rows read/written,
* the number of bytes read/written,
* the number of files read/written.

As a limitation, a row count for input datasets is collected only
for DataSourceV2 API datasets.

## Iceberg specific metrics reports

Even more extensive metrics are collected for Iceberg tables, as
the library exposes a [metrics reporting API](https://iceberg.apache.org/docs/latest/metrics-reporting/?h=metrics).
Two report types are currently supported:
* `ScanReport` - carries metrics collected during scan planning against a given table.
  Alongside general information about the involved table, such as the snapshot ID or the table
  name, it includes metrics like:
* total scan planning duration
* number of data/delete files included in the result
* number of data/delete manifests scanned/skipped
* number of data/delete files scanned/skipped
* number of equality/positional delete files scanned
* `CommitReport` - carries metrics collected after committing changes to a table (i.e., producing a snapshot).
  Alongside general information about the involved table, such as the snapshot
  ID or the table name, it includes metrics like:
* total duration
* number of attempts required for the commit to succeed
* number of added/removed data/delete files
* number of added/removed equality/positional delete files
* number of added/removed equality/positional deletes

At the bottom of this page, we list example facets generated by the Spark integration.

This feature is delivered by implementing a custom `OpenLineageMetricsReporter` class
as the Iceberg metrics reporter and injecting it automatically into the Iceberg catalog. If any other
custom reporter is already present, `OpenLineageMetricsReporter` will replace it, but will still
forward metrics to it.

In case of any issues, the Spark config flag
`spark.openlineage.vendors.iceberg.metricsReporterDisabled=true` can be used to disable this feature.
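For example, the flag can be passed when submitting a Spark job; the application file name below is a placeholder:

```shell
# Disable the Iceberg metrics reporter at submit time (illustrative;
# replace my_spark_job.py with your actual application).
spark-submit \
  --conf spark.openlineage.vendors.iceberg.metricsReporterDisabled=true \
  my_spark_job.py
```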

```json
"icebergScanReport": {
  "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.26.0-SNAPSHOT/integration/spark",
  "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/IcebergScanReportInputDatasetFacet.json",
  "snapshotId": 4115428054613373118,
  "filterDescription": "",
  "projectedFieldNames": ["a", "b"],
  "scanMetrics": {
    "totalPlanningDuration": 21,
    "resultDataFiles": 1,
    "resultDeleteFiles": 0,
    "totalDataManifests": 1,
    "totalDeleteManifests": 0,
    "scannedDataManifests": 1,
    "skippedDataManifests": 0,
    "totalFileSizeInBytes": 676,
    "totalDeleteFileSizeInBytes": 0,
    "skippedDataFiles": 0,
    "skippedDeleteFiles": 0,
    "scannedDeleteManifests": 0,
    "skippedDeleteManifests": 0,
    "indexedDeleteFiles": 0,
    "equalityDeleteFiles": 0,
    "positionalDeleteFiles": 0
  },
  "metadata": {
    "engine-version": "3.3.4",
    "iceberg-version": "Apache Iceberg 1.6.0 (commit 229d8f6fcd109e6c8943ea7cbb41dab746c6d0ed)",
    "app-id": "local-1733228790932",
    "engine-name": "spark"
  }
}
```

```json
"icebergCommitReport": {
  "snapshotId": 3131594900391425696,
  "sequenceNumber": 2,
  "operation": "append",
  "commitMetrics": {
    "totalDuration": 87,
    "attempts": 1,
    "addedDataFiles": 1,
    "totalDataFiles": 2,
    "totalDeleteFiles": 0,
    "addedRecords": 1,
    "totalRecords": 4,
    "addedFilesSizeInBytes": 651,
    "totalFilesSizeInBytes": 1343,
    "totalPositionalDeletes": 0,
    "totalEqualityDeletes": 0
  },
  "metadata": {
    "engine-version": "3.3.4",
    "app-id": "local-1733228862465",
    "engine-name": "spark",
    "iceberg-version": "Apache Iceberg 1.6.0 (commit 229d8f6fcd109e6c8943ea7cbb41dab746c6d0ed)"
  }
}
```

@@ -0,0 +1,26 @@
---
sidebar_position: 1
---

# Input Statistics Facet

Example:

```json
{
  ...
  "inputs": {
    "inputFacets": {
      "inputStatistics": {
        "_producer": "https://some.producer.com/version/1.0",
        "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/InputStatisticsInputDatasetFacet.json",
        "rowCount": 123,
        "fileCount": 5,
        "size": 35602
      }
    }
  }
  ...
}
```
The facet specification can be found [here](https://openlineage.io/spec/facets/1-0-0/InputStatisticsInputDatasetFacet.json).
40 changes: 40 additions & 0 deletions static/apidocs/javadoc/allclasses-index.html
Original file line number Diff line number Diff line change
@@ -425,6 +425,46 @@ <h1 title="All Classes and Interfaces" class="title">All Classes and Interfaces<
<div class="col-last odd-row-color all-classes-table all-classes-table-tab2">
<div class="block">builder class for GcpLineageJobFacetOrigin</div>
</div>
<div class="col-first even-row-color all-classes-table all-classes-table-tab2"><a href="io/openlineage/client/OpenLineage.IcebergCommitReportOutputDatasetFacet.html" title="class in io.openlineage.client">OpenLineage.IcebergCommitReportOutputDatasetFacet</a></div>
<div class="col-last even-row-color all-classes-table all-classes-table-tab2">
<div class="block">model class for IcebergCommitReportOutputDatasetFacet</div>
</div>
<div class="col-first odd-row-color all-classes-table all-classes-table-tab2"><a href="io/openlineage/client/OpenLineage.IcebergCommitReportOutputDatasetFacetCommitMetrics.html" title="class in io.openlineage.client">OpenLineage.IcebergCommitReportOutputDatasetFacetCommitMetrics</a></div>
<div class="col-last odd-row-color all-classes-table all-classes-table-tab2">
<div class="block">model class for IcebergCommitReportOutputDatasetFacetCommitMetrics</div>
</div>
<div class="col-first even-row-color all-classes-table all-classes-table-tab2"><a href="io/openlineage/client/OpenLineage.IcebergCommitReportOutputDatasetFacetCommitMetricsBuilder.html" title="class in io.openlineage.client">OpenLineage.IcebergCommitReportOutputDatasetFacetCommitMetricsBuilder</a></div>
<div class="col-last even-row-color all-classes-table all-classes-table-tab2">
<div class="block">builder class for IcebergCommitReportOutputDatasetFacetCommitMetrics</div>
</div>
<div class="col-first odd-row-color all-classes-table all-classes-table-tab2"><a href="io/openlineage/client/OpenLineage.IcebergCommitReportOutputDatasetFacetMetadata.html" title="class in io.openlineage.client">OpenLineage.IcebergCommitReportOutputDatasetFacetMetadata</a></div>
<div class="col-last odd-row-color all-classes-table all-classes-table-tab2">
<div class="block">model class for IcebergCommitReportOutputDatasetFacetMetadata</div>
</div>
<div class="col-first even-row-color all-classes-table all-classes-table-tab2"><a href="io/openlineage/client/OpenLineage.IcebergCommitReportOutputDatasetFacetMetadataBuilder.html" title="class in io.openlineage.client">OpenLineage.IcebergCommitReportOutputDatasetFacetMetadataBuilder</a></div>
<div class="col-last even-row-color all-classes-table all-classes-table-tab2">
<div class="block">builder class for IcebergCommitReportOutputDatasetFacetMetadata</div>
</div>
<div class="col-first odd-row-color all-classes-table all-classes-table-tab2"><a href="io/openlineage/client/OpenLineage.IcebergScanReportInputDatasetFacet.html" title="class in io.openlineage.client">OpenLineage.IcebergScanReportInputDatasetFacet</a></div>
<div class="col-last odd-row-color all-classes-table all-classes-table-tab2">
<div class="block">model class for IcebergScanReportInputDatasetFacet</div>
</div>
<div class="col-first even-row-color all-classes-table all-classes-table-tab2"><a href="io/openlineage/client/OpenLineage.IcebergScanReportInputDatasetFacetMetadata.html" title="class in io.openlineage.client">OpenLineage.IcebergScanReportInputDatasetFacetMetadata</a></div>
<div class="col-last even-row-color all-classes-table all-classes-table-tab2">
<div class="block">model class for IcebergScanReportInputDatasetFacetMetadata</div>
</div>
<div class="col-first odd-row-color all-classes-table all-classes-table-tab2"><a href="io/openlineage/client/OpenLineage.IcebergScanReportInputDatasetFacetMetadataBuilder.html" title="class in io.openlineage.client">OpenLineage.IcebergScanReportInputDatasetFacetMetadataBuilder</a></div>
<div class="col-last odd-row-color all-classes-table all-classes-table-tab2">
<div class="block">builder class for IcebergScanReportInputDatasetFacetMetadata</div>
</div>
<div class="col-first even-row-color all-classes-table all-classes-table-tab2"><a href="io/openlineage/client/OpenLineage.IcebergScanReportInputDatasetFacetScanMetrics.html" title="class in io.openlineage.client">OpenLineage.IcebergScanReportInputDatasetFacetScanMetrics</a></div>
<div class="col-last even-row-color all-classes-table all-classes-table-tab2">
<div class="block">model class for IcebergScanReportInputDatasetFacetScanMetrics</div>
</div>
<div class="col-first odd-row-color all-classes-table all-classes-table-tab2"><a href="io/openlineage/client/OpenLineage.IcebergScanReportInputDatasetFacetScanMetricsBuilder.html" title="class in io.openlineage.client">OpenLineage.IcebergScanReportInputDatasetFacetScanMetricsBuilder</a></div>
<div class="col-last odd-row-color all-classes-table all-classes-table-tab2">
<div class="block">builder class for IcebergScanReportInputDatasetFacetScanMetrics</div>
</div>
<div class="col-first even-row-color all-classes-table all-classes-table-tab2"><a href="io/openlineage/client/OpenLineage.InputDataset.html" title="class in io.openlineage.client">OpenLineage.InputDataset</a></div>
<div class="col-last even-row-color all-classes-table all-classes-table-tab2">
<div class="block">model class for InputDataset</div>
