Skip to content

Commit

Permalink
Ensure "nessie.commit.id" table property is set when updating the table
Browse files Browse the repository at this point in the history
Spark sets the table property NESSIE_COMMIT_ID_PROPERTY in
NessieTableOperations#loadTableMetadata.
Then NessieIcebergClient.commitTable uses this property.

In Trino, this property is never set but is still used in
NessieIcebergClient.commitTable, as that is shared code.
Hence, the commit id is stale and new commits are rejected.

Use the common code (available from Iceberg 1.4.0)
NessieUtil.updateTableMetadataWithNessieSpecificProperties in Trino,
which handles setting properties such as "nessie.commit.id".
  • Loading branch information
ajantha-bhat committed Nov 21, 2023
1 parent 7839ca7 commit 2303374
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.function.Function;

import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.ImmutableList.toImmutableList;
Expand Down Expand Up @@ -224,6 +225,13 @@ protected String writeNewMetadata(TableMetadata metadata, int newVersion)
}

/**
 * Refreshes table metadata from the given location using the default loader,
 * which parses the metadata file through this operation's {@code fileIo}.
 */
protected void refreshFromMetadataLocation(String newLocation)
{
    Function<String, TableMetadata> defaultLoader =
            location -> TableMetadataParser.read(fileIo, fileIo.newInputFile(location));
    refreshFromMetadataLocation(newLocation, defaultLoader);
}

protected void refreshFromMetadataLocation(String newLocation, Function<String, TableMetadata> metadataLoader)
{
// use null-safe equality check because new tables have a null metadata location
if (Objects.equals(currentMetadataLocation, newLocation)) {
Expand All @@ -239,7 +247,7 @@ protected void refreshFromMetadataLocation(String newLocation)
.withMaxDuration(Duration.ofMinutes(10))
.abortOn(failure -> failure instanceof ValidationException || isNotFoundException(failure))
.build())
.get(() -> TableMetadataParser.read(fileIo, io().newInputFile(newLocation)));
.get(() -> metadataLoader.apply(newLocation));
}
catch (Throwable failure) {
if (isNotFoundException(failure)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.TableNotFoundException;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableMetadataParser;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.nessie.NessieIcebergClient;
import org.apache.iceberg.nessie.NessieUtil;
import org.projectnessie.error.NessieConflictException;
import org.projectnessie.error.NessieNotFoundException;
import org.projectnessie.model.ContentKey;
Expand Down Expand Up @@ -80,6 +82,16 @@ public TableMetadata refresh(boolean invalidateCaches)
return super.refresh(invalidateCaches);
}

/**
 * Refreshes metadata with a Nessie-aware loader: after parsing the metadata
 * file, Nessie-specific table properties (e.g. "nessie.commit.id") are stamped
 * onto the result, which NessieIcebergClient.commitTable relies on when
 * committing new snapshots.
 */
@Override
protected void refreshFromMetadataLocation(String newLocation)
{
    super.refreshFromMetadataLocation(
            newLocation,
            metadataLocation -> {
                TableMetadata parsed = TableMetadataParser.read(fileIo, metadataLocation);
                return NessieUtil.updateTableMetadataWithNessieSpecificProperties(
                        parsed,
                        metadataLocation,
                        table,
                        getSchemaTableName().toString(),
                        nessieClient.getReference());
            });
}

@Override
protected String getRefreshedLocation(boolean invalidateCaches)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -590,7 +590,7 @@ public void testSparkReadingCompositeTrinoData(StorageFormat storageFormat)
onTrino().executeQuery("DROP TABLE " + trinoTableName);
}

@Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST, ICEBERG_JDBC, ICEBERG_NESSIE}, dataProvider = "storageFormatsWithSpecVersion")
@Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST, ICEBERG_JDBC}, dataProvider = "storageFormatsWithSpecVersion")
public void testTrinoReadingSparkIcebergTablePropertiesData(StorageFormat storageFormat, int specVersion)
{
String baseTableName = toLowerCase("test_trino_reading_spark_iceberg_table_properties_" + storageFormat);
Expand Down Expand Up @@ -986,6 +986,24 @@ public void testCreateAndDropTableWithSameLocationFailsOnTrino(int specVersion)
onTrino().executeQuery(format("DROP TABLE %s", trinoTableName(tableSameLocation2)));
}

// Regression test: after Spark creates and writes to a table, Trino must be
// able to append to it. Previously, Trino's metadata refresh did not set the
// "nessie.commit.id" property, so its commit was based on a stale commit id.
@Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST, ICEBERG_JDBC, ICEBERG_NESSIE})
public void testTrinoWritingDataAfterSpark()
{
    String baseTableName = toLowerCase("test_trino_write_after_spark");
    String sparkTable = sparkTableName(baseTableName);
    String trinoTable = trinoTableName(baseTableName);

    // Spark creates the table and performs the first commit
    onSpark().executeQuery("CREATE TABLE " + sparkTable + " (a INT) USING ICEBERG");
    onSpark().executeQuery("INSERT INTO " + sparkTable + " VALUES 1");

    // Trino appends a row on top of the Spark commit
    onTrino().executeQuery("INSERT INTO " + trinoTable + " VALUES 2");

    // Both engines must observe both rows
    List<Row> rows = ImmutableList.of(row(1), row(2));
    assertThat(onTrino().executeQuery("SELECT * FROM " + trinoTable)).containsOnly(rows);
    assertThat(onSpark().executeQuery("SELECT * FROM " + sparkTable)).containsOnly(rows);
    onSpark().executeQuery("DROP TABLE " + sparkTable);
}

@Test(groups = {ICEBERG, ICEBERG_JDBC, PROFILE_SPECIFIC_TESTS, ICEBERG_NESSIE}, dataProvider = "storageFormatsWithSpecVersion")
public void testTrinoWritingDataWithObjectStorageLocationProvider(StorageFormat storageFormat, int specVersion)
{
Expand Down

0 comments on commit 2303374

Please sign in to comment.