Updating SparkScan to only read Apache DataSketches #11035

Merged · 10 commits · Oct 16, 2024
@@ -26,4 +26,6 @@ private StandardBlobTypes() {}
    * href="https://datasketches.apache.org/">Apache DataSketches</a> library
    */
   public static final String APACHE_DATASKETCHES_THETA_V1 = "apache-datasketches-theta-v1";
+
+  public static final String PRESTO_SUM_DATA_SIZE_BYTES_V1 = "presto-sum-data-size-bytes-v1";
Contributor:
We don't need to store the exact parameter used by Presto as part of Iceberg. We can use it in the test, or even use a dummy identifier to simulate the existence of additional unsupported metadata.

Separately, we should reach agreement on the right way to store the data size in the Puffin file across engines.
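For illustration only, a minimal sketch of the dummy-identifier idea: the test just needs a blob type string that SparkScan does not recognize, so a made-up identifier (hypothetical, not a real Iceberg blob type) would exercise the same "unsupported blob" path without adding a Presto-specific constant. snapshotId and the field id are assumed to come from the surrounding test.

    // Hypothetical blob type string; stands in for PRESTO_SUM_DATA_SIZE_BYTES_V1 in the test.
    GenericBlobMetadata unsupportedBlob =
        new GenericBlobMetadata(
            "some-unsupported-blob-type-v1",
            snapshotId,
            1,
            ImmutableList.of(1),
            ImmutableMap.of("data_size", "4"));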

Contributor:
I think we should remove this since Iceberg doesn't support this yet.

Contributor:
+1, it shouldn't be here. If it is a generic blob type we want to support across engines, we should discuss this on the dev list and vote.

}
@@ -199,28 +199,24 @@ protected Statistics estimateStatistics(Snapshot snapshot) {
       List<BlobMetadata> metadataList = (files.get(0)).blobMetadata();
 
       for (BlobMetadata blobMetadata : metadataList) {
-        int id = blobMetadata.fields().get(0);
-        String colName = table.schema().findColumnName(id);
-        NamedReference ref = FieldReference.column(colName);
-
-        Long ndv = null;
         if (blobMetadata
             .type()
             .equals(org.apache.iceberg.puffin.StandardBlobTypes.APACHE_DATASKETCHES_THETA_V1)) {
+          int id = blobMetadata.fields().get(0);
+          String colName = table.schema().findColumnName(id);
+          NamedReference ref = FieldReference.column(colName);
+          Long ndv = null;
           String ndvStr = blobMetadata.properties().get(NDV_KEY);
           if (!Strings.isNullOrEmpty(ndvStr)) {
             ndv = Long.parseLong(ndvStr);
           } else {
             LOG.debug("ndv is not set in BlobMetadata for column {}", colName);
           }
-        } else {
-          LOG.debug("DataSketch blob is not available for column {}", colName);
-        }
-
-        ColumnStatistics colStats =
Contributor:
Technically, we should group the metadata by field first, then extract all of the relevant metadata and create the SparkColumnStatistics instance for each column.
This is not specific to this PR, since this was the behaviour before, but we might want to address it as well.
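A rough sketch of that grouping, for illustration only: it reuses the names already in estimateStatistics (metadataList, table, colStatsMap, NDV_KEY) and assumes the Maps/Lists helpers available elsewhere in the class, so it is a method-body fragment rather than a drop-in change.

    // Collect all blobs per field id first, then build one statistics object per column.
    Map<Integer, List<BlobMetadata>> blobsByField = Maps.newHashMap();
    for (BlobMetadata blobMetadata : metadataList) {
      blobsByField
          .computeIfAbsent(blobMetadata.fields().get(0), k -> Lists.newArrayList())
          .add(blobMetadata);
    }

    for (Map.Entry<Integer, List<BlobMetadata>> entry : blobsByField.entrySet()) {
      String colName = table.schema().findColumnName(entry.getKey());
      NamedReference ref = FieldReference.column(colName);

      Long ndv = null;
      for (BlobMetadata blobMetadata : entry.getValue()) {
        if (blobMetadata
            .type()
            .equals(org.apache.iceberg.puffin.StandardBlobTypes.APACHE_DATASKETCHES_THETA_V1)) {
          String ndvStr = blobMetadata.properties().get(NDV_KEY);
          if (!Strings.isNullOrEmpty(ndvStr)) {
            ndv = Long.parseLong(ndvStr);
          }
        }
        // Other blob types could populate other SparkColumnStatistics fields here.
      }

      colStatsMap.put(ref, new SparkColumnStatistics(ndv, null, null, null, null, null, null));
    }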

Contributor:
+1

-            new SparkColumnStatistics(ndv, null, null, null, null, null, null);
+          ColumnStatistics colStats =
+              new SparkColumnStatistics(ndv, null, null, null, null, null, null);
 
-        colStatsMap.put(ref, colStats);
+          colStatsMap.put(ref, colStats);
+        }
       }
     }
   }
@@ -19,6 +19,7 @@
package org.apache.iceberg.spark.source;

import static org.apache.iceberg.puffin.StandardBlobTypes.APACHE_DATASKETCHES_THETA_V1;
import static org.apache.iceberg.puffin.StandardBlobTypes.PRESTO_SUM_DATA_SIZE_BYTES_V1;
import static org.apache.iceberg.spark.SystemFunctionPushDownHelper.createPartitionedTable;
import static org.apache.iceberg.spark.SystemFunctionPushDownHelper.createUnpartitionedTable;
import static org.apache.iceberg.spark.SystemFunctionPushDownHelper.timestampStrToDayOrdinal;
@@ -178,6 +179,59 @@ public void testTableWithoutColStats() throws NoSuchTableException {
reportColStatsEnabled, () -> checkColStatisticsReported(scan, 4L, Maps.newHashMap()));
}

@TestTemplate
public void testTableWithoutApacheDatasketchColStat() throws NoSuchTableException {
sql("CREATE TABLE %s (id int, data string) USING iceberg", tableName);

List<SimpleRecord> records =
Lists.newArrayList(
new SimpleRecord(1, "a"),
new SimpleRecord(2, "b"),
new SimpleRecord(3, "a"),
new SimpleRecord(4, "b"));
spark
.createDataset(records, Encoders.bean(SimpleRecord.class))
.coalesce(1)
.writeTo(tableName)
.append();

Table table = validationCatalog.loadTable(tableIdent);
long snapshotId = table.currentSnapshot().snapshotId();

SparkScanBuilder scanBuilder =
new SparkScanBuilder(spark, table, CaseInsensitiveStringMap.empty());
SparkScan scan = (SparkScan) scanBuilder.build();

Map<String, String> reportColStatsDisabled =
ImmutableMap.of(
SQLConf.CBO_ENABLED().key(), "true", SparkSQLProperties.REPORT_COLUMN_STATS, "false");

Map<String, String> reportColStatsEnabled =
ImmutableMap.of(SQLConf.CBO_ENABLED().key(), "true");

GenericStatisticsFile statisticsFile =
new GenericStatisticsFile(
snapshotId,
"/test/statistics/file.puffin",
100,
42,
ImmutableList.of(
new GenericBlobMetadata(
PRESTO_SUM_DATA_SIZE_BYTES_V1,
snapshotId,
1,
ImmutableList.of(1),
ImmutableMap.of("data_size", "4"))));

table.updateStatistics().setStatistics(snapshotId, statisticsFile).commit();

checkColStatisticsNotReported(scan, 4L);
withSQLConf(reportColStatsDisabled, () -> checkColStatisticsNotReported(scan, 4L));
// The expected column NDVs are null
withSQLConf(
reportColStatsEnabled, () -> checkColStatisticsReported(scan, 4L, Maps.newHashMap()));
}

@TestTemplate
public void testTableWithOneColStats() throws NoSuchTableException {
sql("CREATE TABLE %s (id int, data string) USING iceberg", tableName);
@@ -232,6 +286,67 @@ public void testTableWithOneColStats() throws NoSuchTableException {
withSQLConf(reportColStatsEnabled, () -> checkColStatisticsReported(scan, 4L, expectedOneNDV));
}

@TestTemplate
public void testTableWithOneApacheDatasketchColStatAndOneDifferentColStat()
throws NoSuchTableException {
sql("CREATE TABLE %s (id int, data string) USING iceberg", tableName);

List<SimpleRecord> records =
Lists.newArrayList(
new SimpleRecord(1, "a"),
new SimpleRecord(2, "b"),
new SimpleRecord(3, "a"),
new SimpleRecord(4, "b"));
spark
.createDataset(records, Encoders.bean(SimpleRecord.class))
.coalesce(1)
.writeTo(tableName)
.append();

Table table = validationCatalog.loadTable(tableIdent);
long snapshotId = table.currentSnapshot().snapshotId();

SparkScanBuilder scanBuilder =
new SparkScanBuilder(spark, table, CaseInsensitiveStringMap.empty());
SparkScan scan = (SparkScan) scanBuilder.build();

Map<String, String> reportColStatsDisabled =
ImmutableMap.of(
SQLConf.CBO_ENABLED().key(), "true", SparkSQLProperties.REPORT_COLUMN_STATS, "false");

Map<String, String> reportColStatsEnabled =
ImmutableMap.of(SQLConf.CBO_ENABLED().key(), "true");

GenericStatisticsFile statisticsFile =
new GenericStatisticsFile(
snapshotId,
"/test/statistics/file.puffin",
100,
42,
ImmutableList.of(
new GenericBlobMetadata(
APACHE_DATASKETCHES_THETA_V1,
snapshotId,
1,
ImmutableList.of(1),
ImmutableMap.of("ndv", "4")),
new GenericBlobMetadata(
PRESTO_SUM_DATA_SIZE_BYTES_V1,
snapshotId,
1,
ImmutableList.of(1),
ImmutableMap.of("data_size", "2"))));

table.updateStatistics().setStatistics(snapshotId, statisticsFile).commit();

checkColStatisticsNotReported(scan, 4L);
withSQLConf(reportColStatsDisabled, () -> checkColStatisticsNotReported(scan, 4L));

Map<String, Long> expectedOneNDV = Maps.newHashMap();
expectedOneNDV.put("id", 4L);
withSQLConf(reportColStatsEnabled, () -> checkColStatisticsReported(scan, 4L, expectedOneNDV));
}

@TestTemplate
public void testTableWithTwoColStats() throws NoSuchTableException {
sql("CREATE TABLE %s (id int, data string) USING iceberg", tableName);