From 23033746ec5caeb5027f470db1a558d33129dc33 Mon Sep 17 00:00:00 2001 From: ajantha-bhat Date: Wed, 25 Oct 2023 13:21:35 +0530 Subject: [PATCH] Ensure "nessie.commit.id" table property is set when updating the table Spark sets the table property NESSIE_COMMIT_ID_PROPERTY in NessieTableOperations#loadTableMetadata. Then NessieIcebergClient.commitTable uses this property. In Trino, this property is never set but used in NessieIcebergClient.commitTable as it is a common code. Hence, the commit id is old and doesn't allow new commits. Use the common code (available From Iceberg 1.4.0) NessieUtil.updateTableMetadataWithNessieSpecificProperties in Trino, which handles setting the property like "nessie.commit.id". --- .../AbstractIcebergTableOperations.java | 10 +++++++++- .../nessie/IcebergNessieTableOperations.java | 12 +++++++++++ .../TestIcebergSparkCompatibility.java | 20 ++++++++++++++++++- 3 files changed, 40 insertions(+), 2 deletions(-) diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractIcebergTableOperations.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractIcebergTableOperations.java index 36d09f2d0d19..94fec2f420a2 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractIcebergTableOperations.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractIcebergTableOperations.java @@ -39,6 +39,7 @@ import java.util.Objects; import java.util.Optional; import java.util.OptionalInt; +import java.util.function.Function; import static com.google.common.base.Preconditions.checkState; import static com.google.common.collect.ImmutableList.toImmutableList; @@ -224,6 +225,13 @@ protected String writeNewMetadata(TableMetadata metadata, int newVersion) } protected void refreshFromMetadataLocation(String newLocation) + { + refreshFromMetadataLocation( + newLocation, + metadataLocation -> TableMetadataParser.read(fileIo, fileIo.newInputFile(metadataLocation))); + } + + protected void refreshFromMetadataLocation(String newLocation, Function metadataLoader) { // use null-safe equality check because new tables have a null metadata location if (Objects.equals(currentMetadataLocation, newLocation)) { @@ -239,7 +247,7 @@ protected void refreshFromMetadataLocation(String newLocation) .withMaxDuration(Duration.ofMinutes(10)) .abortOn(failure -> failure instanceof ValidationException || isNotFoundException(failure)) .build()) - .get(() -> TableMetadataParser.read(fileIo, io().newInputFile(newLocation))); + .get(() -> metadataLoader.apply(newLocation)); } catch (Throwable failure) { if (isNotFoundException(failure)) { diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/IcebergNessieTableOperations.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/IcebergNessieTableOperations.java index 75be83f8293d..fddbd0e025fb 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/IcebergNessieTableOperations.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/IcebergNessieTableOperations.java @@ -19,9 +19,11 @@ import io.trino.spi.connector.SchemaTableName; import io.trino.spi.connector.TableNotFoundException; import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableMetadataParser; import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.nessie.NessieIcebergClient; +import org.apache.iceberg.nessie.NessieUtil; import org.projectnessie.error.NessieConflictException; import org.projectnessie.error.NessieNotFoundException; import org.projectnessie.model.ContentKey; @@ -80,6 +82,16 @@ public TableMetadata refresh(boolean invalidateCaches) return super.refresh(invalidateCaches); } + @Override + protected void refreshFromMetadataLocation(String newLocation) + { + super.refreshFromMetadataLocation( + newLocation, + location -> NessieUtil.updateTableMetadataWithNessieSpecificProperties( + TableMetadataParser.read(fileIo, location), + location, table, getSchemaTableName().toString(), nessieClient.getReference())); + } + @Override protected String getRefreshedLocation(boolean invalidateCaches) { diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java index 9d51741b127e..773a96fcce45 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java @@ -590,7 +590,7 @@ public void testSparkReadingCompositeTrinoData(StorageFormat storageFormat) onTrino().executeQuery("DROP TABLE " + trinoTableName); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST, ICEBERG_JDBC, ICEBERG_NESSIE}, dataProvider = "storageFormatsWithSpecVersion") + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST, ICEBERG_JDBC}, dataProvider = "storageFormatsWithSpecVersion") public void testTrinoReadingSparkIcebergTablePropertiesData(StorageFormat storageFormat, int specVersion) { String baseTableName = toLowerCase("test_trino_reading_spark_iceberg_table_properties_" + storageFormat); @@ -986,6 +986,24 @@ public void testCreateAndDropTableWithSameLocationFailsOnTrino(int specVersion) onTrino().executeQuery(format("DROP TABLE %s", trinoTableName(tableSameLocation2))); } + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST, ICEBERG_JDBC, ICEBERG_NESSIE}) + public void testTrinoWritingDataAfterSpark() + { + String baseTableName = toLowerCase("test_trino_write_after_spark"); + String sparkTableName = sparkTableName(baseTableName); + String trinoTableName = trinoTableName(baseTableName); + + onSpark().executeQuery("CREATE TABLE " + sparkTableName + " (a INT) USING ICEBERG"); + onSpark().executeQuery("INSERT INTO " + sparkTableName + " VALUES 1"); + + onTrino().executeQuery("INSERT INTO " + trinoTableName + " VALUES 2"); + + List expected = ImmutableList.of(row(1), row(2)); + assertThat(onTrino().executeQuery("SELECT * FROM " + trinoTableName)).containsOnly(expected); + assertThat(onSpark().executeQuery("SELECT * FROM " + sparkTableName)).containsOnly(expected); + onSpark().executeQuery("DROP TABLE " + sparkTableName); + } + @Test(groups = {ICEBERG, ICEBERG_JDBC, PROFILE_SPECIFIC_TESTS, ICEBERG_NESSIE}, dataProvider = "storageFormatsWithSpecVersion") public void testTrinoWritingDataWithObjectStorageLocationProvider(StorageFormat storageFormat, int specVersion) {