---
title: Capturing dataset statistics in Apache Spark
date: 2024-12-20
authors: [Leszczynski]
description: Capturing dataset statistics in Apache Spark
---

OpenLineage events enable the creation of a consistent lineage graph, where dataset vertices are connected through the jobs that read from and write to them. This graph becomes even more valuable when its nodes and edges are enriched with additional metadata for practical use cases. One important aspect to capture is the amount of data processed, as it facilitates applications such as cost estimation and data quality monitoring, among others.
In this post, we introduce recent developments in Spark dataset statistics collection and reporting within OpenLineage events. We outline the basic statistics included, as well as the detailed scan and commit reports generated when processing Iceberg datasets.

<!--truncate-->

## Input and output facets

OpenLineage events are made of three main building blocks: runs, jobs, and datasets.
Metadata about processed data presents an interesting challenge: on one hand, it is a property of a run; on the other hand, it makes
sense only in the context of a given dataset. For this purpose, `inputFacets` and `outputFacets` were attached
to datasets, although logically they describe a dataset in the context of a given run.
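
For illustration, an input dataset in a run event can then carry both static `facets` and run-scoped `inputFacets` side by side (a trimmed sketch; the names and values here are illustrative, not taken from a real event):
```json
{
  "inputs": [
    {
      "namespace": "file",
      "name": "/data/2015_flights.parquet",
      "facets": {
        "schema": { "...": "metadata describing the dataset itself" }
      },
      "inputFacets": {
        "inputStatistics": { "...": "metadata describing this particular run's read" }
      }
    }
  ]
}
```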

## Basic input and output statistics

The OpenLineage spec has included the [OutputStatisticsDatasetFacet](https://openlineage.io/docs/spec/facets/dataset-facets/output-dataset-facets/output_statistics/) for some time.
Recently, we added the [InputStatisticsDatasetFacet](https://openlineage.io/docs/spec/facets/dataset-facets/input-dataset-facets/input_statistics/).
Both facets contain basic statistics such as the number of rows processed, size in bytes, and file count.

We can demonstrate them in action by running the Jupyter quickstart demo, which emits OpenLineage events to the Docker logs. We start a Spark session with:
```python
from pyspark.sql import SparkSession

spark = (SparkSession.builder.master('local')
    .appName('sample_spark')
    # OpenLineage listener emitting events to the console, i.e. the Docker logs
    .config('spark.jars.packages', 'io.openlineage:openlineage-spark_2.12:1.26.0,org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.7.1')
    .config('spark.extraListeners', 'io.openlineage.spark.agent.OpenLineageSparkListener')
    .config('spark.openlineage.transport.type', 'console')
    # Iceberg catalog backed by a local Hadoop warehouse
    .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.spark_catalog.type", "hadoop")
    .config("spark.sql.catalog.spark_catalog.warehouse", "/tmp/iceberg")
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .getOrCreate())
spark.sparkContext.setLogLevel("INFO")
```
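
The console transport above simply prints events to the driver logs. If you would rather ship them to a backend, the HTTP transport can be configured in the same way (a minimal sketch; the URL below is a placeholder for your OpenLineage-compatible endpoint):
```python
from pyspark.sql import SparkSession

# Sketch: same listener setup, but emitting events over HTTP instead of the console.
# http://localhost:5000 is a placeholder collector endpoint.
spark = (SparkSession.builder.master('local')
    .appName('sample_spark')
    .config('spark.jars.packages', 'io.openlineage:openlineage-spark_2.12:1.26.0')
    .config('spark.extraListeners', 'io.openlineage.spark.agent.OpenLineageSparkListener')
    .config('spark.openlineage.transport.type', 'http')
    .config('spark.openlineage.transport.url', 'http://localhost:5000')
    .getOrCreate())
```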
Next, load the example flights dataset into a DataFrame:
```
!wget https://github.com/plotly/datasets/raw/refs/heads/master/2015_flights.parquet
```
```python
flights = spark.read.parquet("2015_flights.parquet")
```

When running a simple query on the dataset
```python
flights \
    .filter("distance < 150") \
    .write \
    .mode("overwrite") \
    .saveAsTable("short_flights")
```
we can see the input statistics facet:
```json
{
  "inputStatistics": {
    "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.26.0-SNAPSHOT/integration/spark",
    "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/InputStatisticsInputDatasetFacet.json#/$defs/InputStatisticsInputDatasetFacet",
    "size": 25238218,
    "fileCount": 1
  }
}
```
and the output statistics facet:
```json
{
  "outputStatistics": {
    "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.26.0-SNAPSHOT/integration/spark",
    "_schemaURL": "https://openlineage.io/spec/facets/1-0-2/OutputStatisticsOutputDatasetFacet.json#/$defs/OutputStatisticsOutputDatasetFacet",
    "rowCount": 214307,
    "size": 878649,
    "fileCount": 1
  }
}
```
A limitation of this approach is that the row count for inputs is collected only for Spark DataSource V2 datasets.

## Iceberg metrics reported with OpenLineage

A much more detailed report is collected when processing Iceberg datasets. Iceberg provides
[extensive statistics about datasets through the Metrics API](https://iceberg.apache.org/docs/latest/metrics-reporting).
When a `MetricsReporter` is configured, it gets notified with scan and commit metrics.
By default, a `LoggingMetricsReporter` is configured, but the API allows implementing
[custom reporters](https://iceberg.apache.org/docs/latest/metrics-reporting/#via-catalog-configuration).
This is what the OpenLineage Spark agent does: it instantiates an `OpenLineageMetricsReporter` and injects
it into the Iceberg catalog.
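
For illustration, this is roughly how any custom reporter can be registered through catalog configuration (a sketch; `com.example.MyMetricsReporter` is a hypothetical class implementing Iceberg's `MetricsReporter` interface and present on the classpath). The OpenLineage integration performs an equivalent injection for its own reporter automatically:
```python
from pyspark.sql import SparkSession

# Sketch: registering a custom Iceberg MetricsReporter via the
# `metrics-reporter-impl` catalog property. com.example.MyMetricsReporter is
# a hypothetical implementation of org.apache.iceberg.metrics.MetricsReporter.
spark = (SparkSession.builder.master('local')
    .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.spark_catalog.type", "hadoop")
    .config("spark.sql.catalog.spark_catalog.warehouse", "/tmp/iceberg")
    .config("spark.sql.catalog.spark_catalog.metrics-reporter-impl", "com.example.MyMetricsReporter")
    .getOrCreate())
```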

Then, statistics sent to the `OpenLineageMetricsReporter` are collected and
emitted with OpenLineage events. This is possible because each report contains a snapshot id, which is also
present as the dataset version within OpenLineage events. As a result, we can correlate
Iceberg reports with OpenLineage datasets and attach the
`IcebergScanReportInputDatasetFacet` and `IcebergCommitReportOutputDatasetFacet` facets.
The Iceberg dataset facets are modelled after Iceberg's metric result interfaces.
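
To illustrate the correlation, the same dataset also carries the snapshot id as its version (a trimmed sketch assuming the standard `version` dataset facet; `_producer` and `_schemaURL` are omitted, and the value is taken from the scan report shown below):
```json
{
  "version": {
    "datasetVersion": "4808226079342265000"
  }
}
```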

We can run some operations on Iceberg datasets:
```python
flights.write.mode("overwrite").format("iceberg").saveAsTable("iceberg_flights")
spark \
    .read \
    .table("iceberg_flights") \
    .filter("distance < 150") \
    .write \
    .format("iceberg") \
    .mode("overwrite") \
    .saveAsTable("short_flights")
```
and the OpenLineage events will contain a scan report:
```json
{
  "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.26.0-SNAPSHOT/integration/spark",
  "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/IcebergScanReportInputDatasetFacet.json",
  "snapshotId": 4808226079342265000,
  "filterDescription": "(not_null(ref(name=\"DISTANCE\")) and ref(name=\"DISTANCE\") < \"(3-digit-int)\")",
  "projectedFieldNames": [
    "DEPARTURE_DELAY",
    "ARRIVAL_DELAY",
    "DISTANCE",
    "SCHEDULED_DEPARTURE"
  ],
  "scanMetrics": {
    "totalPlanningDuration": 32,
    "resultDataFiles": 1,
    "resultDeleteFiles": 0,
    "totalDataManifests": 1,
    "totalDeleteManifests": 0,
    "scannedDataManifests": 1,
    "skippedDataManifests": 0,
    "totalFileSizeInBytes": 21970429,
    "totalDeleteFileSizeInBytes": 0,
    "skippedDataFiles": 0,
    "skippedDeleteFiles": 0,
    "scannedDeleteManifests": 0,
    "skippedDeleteManifests": 0,
    "indexedDeleteFiles": 0,
    "equalityDeleteFiles": 0,
    "positionalDeleteFiles": 0
  },
  "metadata": {
    "engine-version": "3.5.0",
    "iceberg-version": "Apache Iceberg 1.7.1 (commit 4a432839233f2343a9eae8255532f911f06358ef)",
    "app-id": "local-1734096828109",
    "engine-name": "spark"
  }
}
```

For CTAS queries, the commit report will only be emitted with future Iceberg versions, due to a recently resolved issue https://github.com/apache/iceberg/issues/11664
that affected commit metrics for CTAS queries.

However, this can already be tested with append operations:
```python
spark \
    .read \
    .table("iceberg_flights") \
    .filter("distance >= 150 AND distance < 160") \
    .write \
    .insertInto("short_flights")
```
which results in a facet:
```json
{
  "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.26.0-SNAPSHOT/integration/spark",
  "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/IcebergCommitReportOutputDatasetFacet.json",
  "operation": "append",
  "commitMetrics": {
    "totalDuration": 64,
    "attempts": 1,
    "addedDataFiles": 1,
    "totalDataFiles": 2,
    "totalDeleteFiles": 0,
    "addedRecords": 37673,
    "totalRecords": 251980,
    "addedFilesSizeInBytes": 105467,
    "totalFilesSizeInBytes": 802832,
    "totalPositionalDeletes": 0,
    "totalEqualityDeletes": 0
  },
  "snapshotId": 586841490378901100,
  "metadata": {
    "engine-version": "3.5.0",
    "app-id": "local-1734527649353",
    "engine-name": "spark",
    "iceberg-version": "Apache Iceberg 1.7.1 (commit 4a432839233f2343a9eae8255532f911f06358ef)"
  },
  "sequenceNumber": 2
}
```

## Conclusion

When reading a gargantuan dataset, scan metrics provide details on the amount of data processed; commit metrics do the same for writes.
These enhancements provide a more detailed view of Spark jobs to help you cost-effectively manage your data processing pipelines.
The OpenLineage project is continuously evolving, and we are excited to see how these new features will be used in practice.