Skip to content

Commit

Permalink
Bump Iceberg and Nessie versions
Browse files Browse the repository at this point in the history
- Bump Iceberg to 1.5.0 and Nessie to 0.77.1
- Remove deprecated usage
- Fix regression when reading manifest file by overriding
newInputFile and passing in the known length

Co-authored-by: amogh-jahagirdar <[email protected]>
  • Loading branch information
ajantha-bhat and amogh-jahagirdar committed Mar 15, 2024
1 parent a4bf353 commit ec1d5dd
Show file tree
Hide file tree
Showing 10 changed files with 69 additions and 27 deletions.
4 changes: 2 additions & 2 deletions plugin/trino-iceberg/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
<properties>
<air.main.basedir>${project.parent.basedir}</air.main.basedir>
<!-- The Nessie version (matching the Iceberg release) must be bumped together with the Iceberg version to avoid compatibility issues -->
<dep.nessie.version>0.71.1</dep.nessie.version>
<dep.nessie.version>0.77.1</dep.nessie.version>
</properties>

<dependencies>
Expand Down Expand Up @@ -338,7 +338,7 @@
<dependency>
<groupId>org.apache.httpcomponents.client5</groupId>
<artifactId>httpclient5</artifactId>
<version>5.2.3</version>
<version>5.3.1</version>
<scope>runtime</scope>
</dependency>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider;
import io.trino.plugin.iceberg.catalog.TrinoCatalogFactory;
import org.apache.iceberg.nessie.NessieIcebergClient;
import org.projectnessie.client.NessieClientBuilder;
import org.projectnessie.client.api.NessieApiV1;
import org.projectnessie.client.auth.BearerAuthenticationProvider;
import org.projectnessie.client.http.HttpClientBuilder;

import static io.airlift.configuration.ConfigBinder.configBinder;
import static java.lang.Math.toIntExact;
Expand All @@ -47,7 +47,7 @@ protected void setup(Binder binder)
@Singleton
public static NessieIcebergClient createNessieIcebergClient(IcebergNessieCatalogConfig icebergNessieCatalogConfig)
{
HttpClientBuilder builder = HttpClientBuilder.builder()
NessieClientBuilder builder = NessieClientBuilder.createClientBuilderFromSystemSettings()
.withUri(icebergNessieCatalogConfig.getServerUri())
.withDisableCompression(!icebergNessieCatalogConfig.isCompressionEnabled())
.withReadTimeout(toIntExact(icebergNessieCatalogConfig.getReadTimeout().toMillis()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,14 @@ protected String getRefreshedLocation(boolean invalidateCaches)
protected void commitNewTable(TableMetadata metadata)
{
verify(version.isEmpty(), "commitNewTable called on a table which already exists");
String contentId = table == null ? null : table.getId();
try {
nessieClient.commitTable(null, metadata, writeNewMetadata(metadata, 0), table, toKey(new SchemaTableName(database, this.tableName)));
nessieClient.commitTable(
null,
metadata,
writeNewMetadata(metadata, 0),
contentId,
toKey(database, tableName));
}
catch (NessieNotFoundException e) {
throw new TrinoException(ICEBERG_COMMIT_ERROR, format("Cannot commit: ref '%s' no longer exists", nessieClient.refName()), e);
Expand All @@ -125,8 +131,16 @@ protected void commitNewTable(TableMetadata metadata)
protected void commitToExistingTable(TableMetadata base, TableMetadata metadata)
{
verify(version.orElseThrow() >= 0, "commitToExistingTable called on a new table");
if (table == null) {
table = nessieClient.table(toIdentifier(new SchemaTableName(database, tableName)));
}
try {
nessieClient.commitTable(base, metadata, writeNewMetadata(metadata, version.getAsInt() + 1), table, toKey(new SchemaTableName(database, this.tableName)));
nessieClient.commitTable(
base,
metadata,
writeNewMetadata(metadata, version.getAsInt() + 1),
table.getId(),
toKey(database, tableName));
}
catch (NessieNotFoundException e) {
throw new TrinoException(ICEBERG_COMMIT_ERROR, format("Cannot commit: ref '%s' no longer exists", nessieClient.refName()), e);
Expand All @@ -144,8 +158,9 @@ protected void commitMaterializedViewRefresh(TableMetadata base, TableMetadata m
throw new UnsupportedOperationException();
}

private static ContentKey toKey(SchemaTableName tableName)
private static ContentKey toKey(String databaseName, String tableName)
{
return ContentKey.of(Namespace.parse(tableName.getSchemaName()), tableName.getTableName());
SchemaTableName schemaTableName = new SchemaTableName(databaseName, tableName);
return ContentKey.of(Namespace.parse(schemaTableName.getSchemaName()), schemaTableName.getTableName());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
import com.google.common.collect.Iterables;
import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystem;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.io.BulkDeletionFailureException;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
Expand All @@ -28,6 +31,7 @@
import java.util.Map;
import java.util.stream.Stream;

import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.joining;

Expand Down Expand Up @@ -88,6 +92,37 @@ public void deleteFiles(Iterable<String> pathsToDelete)
partitions.forEach(this::deleteBatch);
}

// TODO: remove below workarounds after https://github.com/apache/iceberg/pull/9953
/**
 * Opens a manifest as an {@link InputFile}, forwarding the length recorded on the
 * manifest to {@code newInputFile(String, long)}.
 *
 * @param manifest manifest whose path and length are used to open the file
 * @return the input file for the manifest's path
 * @throws IllegalArgumentException if the manifest carries key metadata
 */
@Override
public InputFile newInputFile(ManifestFile manifest)
{
    boolean hasNoKeyMetadata = manifest.keyMetadata() == null;
    String manifestPath = manifest.path();
    checkArgument(hasNoKeyMetadata, "Cannot decrypt manifest: %s (use EncryptingFileIO)", manifestPath);
    return newInputFile(manifestPath, manifest.length());
}

/**
 * Opens a data file as an {@link InputFile}, forwarding the size recorded on the
 * file to {@code newInputFile(String, long)}.
 *
 * @param file data file whose path and size in bytes are used to open the file
 * @return the input file for the data file's path
 * @throws IllegalArgumentException if the data file carries key metadata
 */
@Override
public InputFile newInputFile(DataFile file)
{
    boolean hasNoKeyMetadata = file.keyMetadata() == null;
    CharSequence dataFilePath = file.path();
    checkArgument(hasNoKeyMetadata, "Cannot decrypt data file: %s (use EncryptingFileIO)", dataFilePath);
    return newInputFile(dataFilePath.toString(), file.fileSizeInBytes());
}

/**
 * Opens a delete file as an {@link InputFile}, forwarding the size recorded on the
 * file to {@code newInputFile(String, long)}.
 *
 * @param file delete file whose path and size in bytes are used to open the file
 * @return the input file for the delete file's path
 * @throws IllegalArgumentException if the delete file carries key metadata
 */
@Override
public InputFile newInputFile(DeleteFile file)
{
    boolean hasNoKeyMetadata = file.keyMetadata() == null;
    CharSequence deleteFilePath = file.path();
    checkArgument(hasNoKeyMetadata, "Cannot decrypt delete file: %s (use EncryptingFileIO)", deleteFilePath);
    return newInputFile(deleteFilePath.toString(), file.fileSizeInBytes());
}

private void deleteBatch(List<String> filesToDelete)
{
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.parallel.Execution;
import org.projectnessie.client.NessieClientBuilder;
import org.projectnessie.client.api.NessieApiV1;
import org.projectnessie.client.http.HttpClientBuilder;

import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -94,7 +94,7 @@ protected TrinoCatalog createTrinoCatalog(boolean useUniqueTableLocations)
TrinoFileSystemFactory fileSystemFactory = new HdfsFileSystemFactory(HDFS_ENVIRONMENT, HDFS_FILE_SYSTEM_STATS);
IcebergNessieCatalogConfig icebergNessieCatalogConfig = new IcebergNessieCatalogConfig()
.setServerUri(URI.create(nessieContainer.getRestApiUri()));
NessieApiV1 nessieApi = HttpClientBuilder.builder()
NessieApiV1 nessieApi = NessieClientBuilder.createClientBuilderFromSystemSettings()
.withUri(nessieContainer.getRestApiUri())
.build(NessieApiV1.class);
NessieIcebergClient nessieClient = new NessieIcebergClient(nessieApi, icebergNessieCatalogConfig.getDefaultReferenceName(), null, ImmutableMap.of());
Expand All @@ -118,7 +118,7 @@ public void testDefaultLocation()
IcebergNessieCatalogConfig icebergNessieCatalogConfig = new IcebergNessieCatalogConfig()
.setDefaultWarehouseDir(tmpDirectory.toAbsolutePath().toString())
.setServerUri(URI.create(nessieContainer.getRestApiUri()));
NessieApiV1 nessieApi = HttpClientBuilder.builder()
NessieApiV1 nessieApi = NessieClientBuilder.createClientBuilderFromSystemSettings()
.withUri(nessieContainer.getRestApiUri())
.build(NessieApiV1.class);
NessieIcebergClient nessieClient = new NessieIcebergClient(nessieApi, icebergNessieCatalogConfig.getDefaultReferenceName(), null, ImmutableMap.of());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,14 +183,6 @@ public void testDropTableWithMissingManifestListFile()
.hasMessageContaining("Table location should not exist");
}

@Test
@Override
public void testDropTableWithMissingDataFile()
{
assertThatThrownBy(super::testDropTableWithMissingDataFile)
.hasMessageContaining("Table location should not exist");
}

@Test
@Override
public void testDropTableWithNonExistentTableLocation()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ public class NessieContainer
{
private static final Logger log = Logger.get(NessieContainer.class);

public static final String DEFAULT_IMAGE = "projectnessie/nessie:0.71.1";
public static final String DEFAULT_IMAGE = "ghcr.io/projectnessie/nessie:0.77.1";
public static final String DEFAULT_HOST_NAME = "nessie";
public static final String VERSION_STORE_TYPE = "INMEMORY";
public static final String VERSION_STORE_TYPE = "IN_MEMORY";

public static final int PORT = 19121;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@

public final class TestingTrinoIcebergJdbcUtil
{
public static final String CREATE_CATALOG_TABLE = JdbcUtil.CREATE_CATALOG_TABLE;
public static final String CREATE_NAMESPACE_PROPERTIES_TABLE = JdbcUtil.CREATE_NAMESPACE_PROPERTIES_TABLE;
public static final String CREATE_CATALOG_TABLE = JdbcUtil.V0_CREATE_CATALOG_SQL;
public static final String CREATE_NAMESPACE_PROPERTIES_TABLE = JdbcUtil.CREATE_NAMESPACE_PROPERTIES_TABLE_SQL;

private TestingTrinoIcebergJdbcUtil() {}
}
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@
<dep.errorprone.version>2.25.0</dep.errorprone.version>
<dep.flyway.version>10.9.0</dep.flyway.version>
<dep.google.http.client.version>1.44.1</dep.google.http.client.version>
<dep.iceberg.version>1.4.3</dep.iceberg.version>
<dep.iceberg.version>1.5.0</dep.iceberg.version>
<dep.jna.version>5.14.0</dep.jna.version>
<dep.joda.version>2.12.7</dep.joda.version>
<dep.jsonwebtoken.version>0.12.5</dep.jsonwebtoken.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public class EnvSinglenodeSparkIcebergNessie

private static final int SPARK_THRIFT_PORT = 10213;
private static final int NESSIE_PORT = 19120;
private static final String NESSIE_VERSION = "0.71.1";
private static final String NESSIE_VERSION = "0.77.1";
private static final String SPARK = "spark";

private final DockerFiles dockerFiles;
Expand Down Expand Up @@ -99,8 +99,8 @@ private DockerContainer createSparkContainer()

private DockerContainer createNessieContainer()
{
DockerContainer container = new DockerContainer("projectnessie/nessie:" + NESSIE_VERSION, "nessie-server")
.withEnv("NESSIE_VERSION_STORE_TYPE", "INMEMORY")
DockerContainer container = new DockerContainer("ghcr.io/projectnessie/nessie:" + NESSIE_VERSION, "nessie-server")
.withEnv("NESSIE_VERSION_STORE_TYPE", "IN_MEMORY")
.withEnv("QUARKUS_HTTP_PORT", Integer.valueOf(NESSIE_PORT).toString())
.withStartupCheckStrategy(new IsRunningStartupCheckStrategy())
.waitingFor(forSelectedPorts(NESSIE_PORT));
Expand Down

0 comments on commit ec1d5dd

Please sign in to comment.