---
title: Capturing dataset statistics in Apache Spark
date: 2024-12-20
authors: [Leszczynski]
description: Capturing dataset statistics in Apache Spark
---

OpenLineage events enable the creation of a consistent lineage graph, where dataset vertices are connected through the jobs that read from and write to them. This graph becomes even more valuable when its nodes and edges are enriched with additional metadata for practical use cases. One important aspect to capture is the amount of data processed, as it facilitates applications such as cost estimation and data quality monitoring, among others.
In this post, we introduce recent developments in Spark dataset statistics collection and reporting within OpenLineage events. We outline the basic statistics included, as well as the detailed scan and commit reports generated when processing Iceberg datasets.

## Input and output facets

OpenLineage events are made of three main building blocks: runs, jobs, and datasets.
Metadata about processed data presents an interesting challenge: it is a property of a run, yet it makes
sense only in the context of a specific dataset. For this purpose, `inputFacets` and `outputFacets` are attached
to datasets, although logically they describe a dataset in the context of a given run.

## Basic input and output statistics

The OpenLineage spec has included the [OutputStatisticsDatasetFacet](https://openlineage.io/docs/spec/facets/dataset-facets/output-dataset-facets/output_statistics/) for some time.
Recently, we added the [InputStatisticsDatasetFacet](https://openlineage.io/docs/spec/facets/dataset-facets/input-dataset-facets/input_statistics/).
Both facets contain basic statistics such as the number of rows processed, the size in bytes, and the file count.
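To make the placement concrete, here is a trimmed, hypothetical event fragment (the namespaces, names, and numbers are illustrative, not taken from a real run) showing where these facets sit: each one is attached to a dataset entry under `inputs` or `outputs`, keyed by `inputFacets` or `outputFacets`.
```json
{
  "inputs": [
    {
      "namespace": "file",
      "name": "/data/flights.parquet",
      "inputFacets": {
        "inputStatistics": { "size": 1048576, "fileCount": 1 }
      }
    }
  ],
  "outputs": [
    {
      "namespace": "file",
      "name": "/tmp/warehouse/short_flights",
      "outputFacets": {
        "outputStatistics": { "rowCount": 1000, "size": 65536, "fileCount": 1 }
      }
    }
  ]
}
```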
We can demonstrate them in action by running the Jupyter quickstart demo, which emits OpenLineage events to
the docker logs. We start a Spark session with:
```python
from pyspark.sql import SparkSession

spark = (SparkSession.builder.master('local')
    .appName('sample_spark')
    # OpenLineage Spark agent and the Iceberg Spark runtime
    .config('spark.jars.packages', 'io.openlineage:openlineage-spark_2.12:1.26.0,org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.7.1')
    .config('spark.extraListeners', 'io.openlineage.spark.agent.OpenLineageSparkListener')
    # emit OpenLineage events to the console, i.e. the driver/docker logs
    .config('spark.openlineage.transport.type', 'console')
    .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.spark_catalog.type", "hadoop")
    .config("spark.sql.catalog.spark_catalog.warehouse", "/tmp/iceberg")
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .getOrCreate())
spark.sparkContext.setLogLevel("INFO")
```
Next, we load the example flights dataset into a DataFrame:
```
!wget https://github.com/plotly/datasets/raw/refs/heads/master/2015_flights.parquet
```
```python
flights = spark.read.parquet("2015_flights.parquet")
```

When we run a simple query on the dataset
```python
flights \
    .filter("distance < 150") \
    .write \
    .mode("overwrite") \
    .saveAsTable("short_flights")
```
we can see the input statistics facet:
```json
{
  "inputStatistics": {
    "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.26.0-SNAPSHOT/integration/spark",
    "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/InputStatisticsInputDatasetFacet.json#/$defs/InputStatisticsInputDatasetFacet",
    "size": 25238218,
    "fileCount": 1
  }
}
```
and the output statistics facet:
```json
{
  "outputStatistics": {
    "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.26.0-SNAPSHOT/integration/spark",
    "_schemaURL": "https://openlineage.io/spec/facets/1-0-2/OutputStatisticsOutputDatasetFacet.json#/$defs/OutputStatisticsOutputDatasetFacet",
    "rowCount": 214307,
    "size": 878649,
    "fileCount": 1
  }
}
```
A limitation of this approach is that the row count of inputs is collected only for Spark DataSource V2 datasets.

## Iceberg metrics reported with OpenLineage

A much more detailed report is collected when processing Iceberg datasets. Iceberg provides
[extensive statistics about datasets through its Metrics API](https://iceberg.apache.org/docs/latest/metrics-reporting).
When a `MetricsReporter` is configured, it is notified of scan and commit metrics.
By default, a `LoggingMetricsReporter` is configured, but the API allows
[custom reporters](https://iceberg.apache.org/docs/latest/metrics-reporting/#via-catalog-configuration) to be implemented.
This is what the OpenLineage Spark agent does: it instantiates an `OpenLineageMetricsReporter` and injects
it into the Iceberg catalog.

Statistics sent to the `OpenLineageMetricsReporter` are then collected and
emitted with OpenLineage events. This is possible because each report contains a snapshot id, which is also
present as the dataset version within OpenLineage events. As a result, we can correlate
Iceberg reports with OpenLineage datasets and attach the
`IcebergScanReportInputDatasetFacet` and `IcebergCommitReportOutputDatasetFacet`.
The Iceberg dataset facets are modelled after Iceberg's metric result interfaces.
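As a rough sketch of the catalog-configuration mechanism described in the Iceberg docs, a custom reporter could also be registered by hand via the `metrics-reporter-impl` catalog property. In the snippet below, `com.example.MyMetricsReporter` is a hypothetical placeholder for a class implementing `org.apache.iceberg.metrics.MetricsReporter`; none of this is needed when the OpenLineage listener is enabled, since the agent injects its own reporter automatically.
```python
from pyspark.sql import SparkSession

# Sketch only: manually registering a custom Iceberg MetricsReporter through
# catalog configuration. The reporter class name is a made-up placeholder;
# with the OpenLineage Spark listener enabled this property is unnecessary,
# because the agent injects its OpenLineageMetricsReporter automatically.
spark = (SparkSession.builder.master('local')
    .appName('metrics_reporter_sketch')
    .config('spark.jars.packages', 'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.7.1')
    .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.spark_catalog.type", "hadoop")
    .config("spark.sql.catalog.spark_catalog.warehouse", "/tmp/iceberg")
    # Iceberg catalog property for plugging in a reporter (see the metrics-reporting docs)
    .config("spark.sql.catalog.spark_catalog.metrics-reporter-impl", "com.example.MyMetricsReporter")
    .getOrCreate())
```
In the demo session above this happens behind the scenes, which is why the scan and commit reports below appear without any extra configuration.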
We can run some operations on Iceberg datasets:
```python
flights.write.mode("overwrite").format("iceberg").saveAsTable("iceberg_flights")
spark \
    .read \
    .table("iceberg_flights") \
    .filter("distance < 150") \
    .write \
    .format("iceberg") \
    .mode("overwrite") \
    .saveAsTable("short_flights")
```
and the OpenLineage events will contain a scan report:
```json
{
  "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.26.0-SNAPSHOT/integration/spark",
  "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/IcebergScanReportInputDatasetFacet.json",
  "snapshotId": 4808226079342265000,
  "filterDescription": "(not_null(ref(name=\"DISTANCE\")) and ref(name=\"DISTANCE\") < \"(3-digit-int)\")",
  "projectedFieldNames": [
    "DEPARTURE_DELAY",
    "ARRIVAL_DELAY",
    "DISTANCE",
    "SCHEDULED_DEPARTURE"
  ],
  "scanMetrics": {
    "totalPlanningDuration": 32,
    "resultDataFiles": 1,
    "resultDeleteFiles": 0,
    "totalDataManifests": 1,
    "totalDeleteManifests": 0,
    "scannedDataManifests": 1,
    "skippedDataManifests": 0,
    "totalFileSizeInBytes": 21970429,
    "totalDeleteFileSizeInBytes": 0,
    "skippedDataFiles": 0,
    "skippedDeleteFiles": 0,
    "scannedDeleteManifests": 0,
    "skippedDeleteManifests": 0,
    "indexedDeleteFiles": 0,
    "equalityDeleteFiles": 0,
    "positionalDeleteFiles": 0
  },
  "metadata": {
    "engine-version": "3.5.0",
    "iceberg-version": "Apache Iceberg 1.7.1 (commit 4a432839233f2343a9eae8255532f911f06358ef)",
    "app-id": "local-1734096828109",
    "engine-name": "spark"
  }
}
```

A commit report for the write above will only be emitted with future Iceberg versions, due to a recently resolved issue
([apache/iceberg#11664](https://github.com/apache/iceberg/issues/11664)) that affects commit metrics for CTAS queries.

However, this can still be tested with append operations:
```python
spark \
    .read \
    .table("iceberg_flights") \
    .filter("distance >= 150 AND distance < 160") \
    .write \
    .insertInto("short_flights")
```
which results in a facet:

```json
{
  "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.26.0-SNAPSHOT/integration/spark",
  "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/IcebergCommitReportOutputDatasetFacet.json",
  "operation": "append",
  "commitMetrics": {
    "totalDuration": 64,
    "attempts": 1,
    "addedDataFiles": 1,
    "totalDataFiles": 2,
    "totalDeleteFiles": 0,
    "addedRecords": 37673,
    "totalRecords": 251980,
    "addedFilesSizeInBytes": 105467,
    "totalFilesSizeInBytes": 802832,
    "totalPositionalDeletes": 0,
    "totalEqualityDeletes": 0
  },
  "snapshotId": 586841490378901100,
  "metadata": {
    "engine-version": "3.5.0",
    "app-id": "local-1734527649353",
    "engine-name": "spark",
    "iceberg-version": "Apache Iceberg 1.7.1 (commit 4a432839233f2343a9eae8255532f911f06358ef)"
  },
  "sequenceNumber": 2
}
```

## Conclusion

When reading a gargantuan dataset, scan metrics provide details on the amount of data processed; commit metrics do the same for writes.
These enhancements provide a more detailed view of Spark jobs to help you cost-effectively manage your data processing pipelines.
The OpenLineage project is continuously evolving, and we are excited to see how these new features will be used in practice.