Skip to content

Commit

Permalink
Spark 3.4: Throw better exception when filter expression cannot be translated in Rewrite procedure (#8394)

Browse files Browse the repository at this point in the history
  • Loading branch information
ConeyLiu authored Sep 20, 2023
1 parent a6e2459 commit 09a5dbc
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -828,6 +828,26 @@ public void testDefaultSortOrder() {
assertEquals("Data after compaction should not change", expectedRecords, actualRecords);
}

@Test
public void testRewriteWithUntranslatedOrUnconvertedFilter() {
  createTable();

  // A filter Spark cannot translate into a data source V2 predicate at all:
  // the procedure should fail with a "Cannot translate" message.
  String untranslatableCall =
      "CALL %s.system.rewrite_data_files(table => '%s', where => 'substr(encode(c2, \"utf-8\"), 2) = \"fo\"')";
  Assertions.assertThatThrownBy(() -> sql(untranslatableCall, catalogName, tableIdent))
      .isInstanceOf(IllegalArgumentException.class)
      .hasMessageContaining("Cannot translate Spark expression");

  // A predicate that translates to a V2 filter but cannot be converted into an
  // Iceberg expression: the procedure should fail with a "Cannot convert" message.
  String unconvertibleCall =
      "CALL %s.system.rewrite_data_files(table => '%s', where => 'substr(c2, 2) = \"fo\"')";
  Assertions.assertThatThrownBy(() -> sql(unconvertibleCall, catalogName, tableIdent))
      .isInstanceOf(IllegalArgumentException.class)
      .hasMessageContaining("Cannot convert Spark filter");
}

// Creates the Iceberg test table (c1 int, c2 string, c3 string) used by the rewrite tests.
private void createTable() {
sql("CREATE TABLE %s (c1 int, c2 string, c3 string) USING iceberg", tableName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.iceberg.spark.data.TestHelpers;
import org.apache.iceberg.spark.source.SimpleRecord;
import org.apache.spark.sql.Encoders;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
Expand Down Expand Up @@ -202,6 +203,26 @@ public void testInvalidOption() throws Exception {
catalogName, tableIdent));
}

@Test
public void testRewriteWithUntranslatedOrUnconvertedFilter() throws Exception {
  createTable();

  // A filter Spark cannot translate into a data source V2 predicate at all:
  // the procedure should fail with a "Cannot translate" message.
  String untranslatableCall =
      "CALL %s.system.rewrite_position_delete_files(table => '%s', where => 'substr(encode(data, \"utf-8\"), 2) = \"fo\"')";
  Assertions.assertThatThrownBy(() -> sql(untranslatableCall, catalogName, tableIdent))
      .isInstanceOf(IllegalArgumentException.class)
      .hasMessageContaining("Cannot translate Spark expression");

  // A predicate that translates to a V2 filter but cannot be converted into an
  // Iceberg expression: the procedure should fail with a "Cannot convert" message.
  String unconvertibleCall =
      "CALL %s.system.rewrite_position_delete_files(table => '%s', where => 'substr(data, 2) = \"fo\"')";
  Assertions.assertThatThrownBy(() -> sql(unconvertibleCall, catalogName, tableIdent))
      .isInstanceOf(IllegalArgumentException.class)
      .hasMessageContaining("Cannot convert Spark filter");
}

// Returns the summary map of the current snapshot of the table under test,
// loaded fresh from the validation catalog.
private Map<String, String> snapshotSummary() {
return validationCatalog.loadTable(tableIdent).currentSnapshot().summary();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,17 @@ object SparkExpressionConverter {
// Currently, it is a double conversion as we are converting Spark expression to Spark predicate
// and then converting Spark predicate to Iceberg expression.
// But these two conversions already exist and well tested. So, we are going with this approach.
SparkV2Filters.convert(DataSourceV2Strategy.translateFilterV2(sparkExpression).get)
DataSourceV2Strategy.translateFilterV2(sparkExpression) match {
case Some(filter) =>
val converted = SparkV2Filters.convert(filter)
if (converted == null) {
throw new IllegalArgumentException(s"Cannot convert Spark filter: $filter to Iceberg expression")
}

converted
case _ =>
throw new IllegalArgumentException(s"Cannot translate Spark expression: $sparkExpression to data source filter")
}
}

@throws[AnalysisException]
Expand Down

0 comments on commit 09a5dbc

Please sign in to comment.