Skip to content

Commit

Permalink
proposals (#1)
Browse files Browse the repository at this point in the history
  • Loading branch information
snazy authored May 30, 2024
1 parent 1a03544 commit a22c499
Show file tree
Hide file tree
Showing 7 changed files with 332 additions and 156 deletions.
4 changes: 4 additions & 0 deletions gc/gc-iceberg-files/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ dependencies {
testFixturesRuntimeOnly("com.google.cloud:google-cloud-storage")
testFixturesRuntimeOnly(libs.google.cloud.nio)

testFixturesApi(platform(libs.azuresdk.bom))
testFixturesApi("com.azure:azure-storage-file-datalake")
testFixturesRuntimeOnly("com.azure:azure-identity")

testFixturesApi(platform(libs.junit.bom))
testFixturesApi(libs.bundles.junit.testing)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
/*
* Copyright (C) 2024 Dremio
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.projectnessie.gc.iceberg.files;

import static org.assertj.core.api.Assertions.assertThat;

import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.projectnessie.gc.files.DeleteSummary;
import org.projectnessie.gc.files.FileReference;
import org.projectnessie.objectstoragemock.Bucket;
import org.projectnessie.objectstoragemock.MockObject;
import org.projectnessie.objectstoragemock.ObjectStorageMock;
import org.projectnessie.storage.uri.StorageUri;

/**
 * Base class exercising {@link IcebergFiles} listing and deletion against an
 * {@link ObjectStorageMock} server; concrete subclasses supply the scheme-specific
 * bucket name, URIs, Iceberg properties and Hadoop configuration.
 */
public abstract class AbstractFiles {

  /**
   * Creates a small fixed tree of objects, then verifies recursive listing after
   * multi-delete and single-delete operations.
   */
  @Test
  public void iceberg() throws Exception {
    StorageUri baseUri = storageUri("/path/");

    Set<String> keys = new TreeSet<>();
    keys.add("path/file-1");
    keys.add("path/file-2");
    keys.add("path/file-3");
    keys.add("path/dir-1/file-4");
    keys.add("path/dir-1/dir-2/file-5");

    try (ObjectStorageMock.MockServer server = createServer(keys);
        IcebergFiles icebergFiles = createIcebergFiles(server)) {

      Set<StorageUri> expect =
          keys.stream().map(this::storageUri).collect(Collectors.toCollection(HashSet::new));

      assertRecursiveListing(icebergFiles, baseUri, expect);

      // Assert the returned summary (the sibling test 'manyFiles' does the same):
      // two existing objects deleted, zero failures.
      assertThat(
              icebergFiles.deleteMultiple(
                  baseUri,
                  Stream.of(
                      FileReference.of(StorageUri.of("file-2"), baseUri, -1L),
                      FileReference.of(StorageUri.of("file-3"), baseUri, -1L))))
          .isEqualTo(DeleteSummary.of(2, 0L));
      expect.remove(baseUri.resolve("file-2"));
      expect.remove(baseUri.resolve("file-3"));

      assertRecursiveListing(icebergFiles, baseUri, expect);

      icebergFiles.delete(FileReference.of(StorageUri.of("dir-1/file-4"), baseUri, -1L));
      expect.remove(baseUri.resolve("dir-1/file-4"));

      assertRecursiveListing(icebergFiles, baseUri, expect);
    }
  }

  /**
   * Lists {@code baseUri} recursively and asserts that every returned reference is
   * relative to {@code baseUri} and that the absolute paths match {@code expect} exactly
   * (order-insensitive). Extracted because this assertion appears multiple times.
   */
  private void assertRecursiveListing(
      IcebergFiles icebergFiles, StorageUri baseUri, Set<StorageUri> expect) throws Exception {
    try (Stream<FileReference> files = icebergFiles.listRecursively(baseUri)) {
      assertThat(files)
          .allSatisfy(f -> assertThat(f.base()).isEqualTo(baseUri))
          .map(FileReference::absolutePath)
          .containsExactlyInAnyOrderElementsOf(expect);
    }
  }

  /**
   * Creates many files, lists the files, deletes 10% of the created files, lists again.
   *
   * <p>Minio in the used configuration is not particularly fast - creating 100000 objects with 4
   * threads (more crashes w/ timeouts) takes about ~30 minutes (plus ~3 seconds for listing 100000
   * objects, plus ~3 seconds for deleting 10000 objects).
   */
  @ParameterizedTest
  @ValueSource(ints = {500})
  public void manyFiles(int numFiles) throws Exception {
    StorageUri baseUri = storageUri("/path/");

    // Spread keys across 100 "directories": path/<i % 100>/<i>
    Set<String> keys =
        IntStream.range(0, numFiles)
            .mapToObj(i -> String.format("path/%d/%d", i % 100, i))
            .collect(Collectors.toCollection(HashSet::new));

    try (ObjectStorageMock.MockServer server = createServer(keys);
        IcebergFiles icebergFiles = createIcebergFiles(server)) {

      try (Stream<FileReference> files = icebergFiles.listRecursively(baseUri)) {
        assertThat(files).hasSize(numFiles);
      }

      int deletes = numFiles / 10;
      assertThat(
              icebergFiles.deleteMultiple(
                  baseUri,
                  IntStream.range(0, deletes)
                      .mapToObj(i -> StorageUri.of(String.format("%d/%d", i % 100, i)))
                      .map(p -> FileReference.of(p, baseUri, -1L))))
          .isEqualTo(DeleteSummary.of(deletes, 0L));

      try (Stream<FileReference> files = icebergFiles.listRecursively(baseUri)) {
        assertThat(files).hasSize(numFiles - deletes);
      }
    }
  }

  /**
   * Starts an {@link ObjectStorageMock} server backed by the given mutable key set: listing
   * streams all keys, object lookup returns a stub object for contained keys, and deletion
   * removes keys from the set.
   */
  private ObjectStorageMock.MockServer createServer(Set<String> keys) {
    return ObjectStorageMock.builder()
        .putBuckets(
            bucket(),
            Bucket.builder()
                .lister(
                    (String prefix, String offset) ->
                        keys.stream()
                            .map(
                                key ->
                                    new Bucket.ListElement() {
                                      @Override
                                      public String key() {
                                        return key;
                                      }

                                      @Override
                                      public MockObject object() {
                                        return MockObject.builder().build();
                                      }
                                    }))
                .object(key -> keys.contains(key) ? MockObject.builder().build() : null)
                .deleter(keys::remove)
                .build())
        .build()
        .start();
  }

  /** Builds an {@link IcebergFiles} instance configured against the given mock server. */
  private IcebergFiles createIcebergFiles(ObjectStorageMock.MockServer server) {
    return IcebergFiles.builder()
        .properties(icebergProperties(server))
        .hadoopConfiguration(hadoopConfiguration(server))
        .build();
  }

  /** Bucket (or filesystem/container) name used by the concrete storage scheme. */
  protected abstract String bucket();

  /** Resolves {@code path} to an absolute {@link StorageUri} for the concrete scheme. */
  protected abstract StorageUri storageUri(String path);

  /** Iceberg FileIO properties pointing at the given mock server. */
  protected abstract Map<String, ? extends String> icebergProperties(
      ObjectStorageMock.MockServer server);

  /** Hadoop configuration pointing at the given mock server. */
  protected abstract Configuration hadoopConfiguration(ObjectStorageMock.MockServer server);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright (C) 2022 Dremio
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.projectnessie.gc.iceberg.files;

import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.projectnessie.objectstoragemock.ObjectStorageMock.MockServer;
import org.projectnessie.storage.uri.StorageUri;

/** Runs {@link AbstractFiles} against the ADLS Gen2 (abfs) endpoint of the object-storage mock. */
public class TestIcebergADLSFiles extends AbstractFiles {

  @Override
  protected String bucket() {
    // "$root" is the implicit root filesystem/container name in Azure storage.
    return "$root";
  }

  @Override
  protected Map<String, ? extends String> icebergProperties(MockServer server) {
    Map<String, String> props = new HashMap<>();

    props.put("adls.connection-string.account", server.getAdlsGen2BaseUri().toString());
    // NOTE(review): this value looks like email-obfuscation residue from the upstream page;
    // presumably the account name should not literally be an email address — confirm against
    // the original source.
    props.put("adls.auth.shared-key.account.name", "[email protected]");
    props.put("adls.auth.shared-key.account.key", "key");

    return props;
  }

  // Was missing @Override: this implements AbstractFiles.hadoopConfiguration, like the
  // other overrides in this class.
  @Override
  protected Configuration hadoopConfiguration(MockServer server) {
    Configuration conf = new Configuration();

    conf.set("fs.azure.impl", "org.apache.hadoop.fs.azure.AzureNativeFileSystemStore");
    conf.set("fs.AbstractFileSystem.azure.impl", "org.apache.hadoop.fs.azurebfs.Abfs");
    conf.set("fs.azure.storage.emulator.account.name", "account");
    conf.set("fs.azure.account.auth.type", "SharedKey");
    conf.set("fs.azure.account.key.account", "<base-64-encoded-secret>");

    return conf;
  }

  @Override
  protected StorageUri storageUri(String path) {
    return StorageUri.of(String.format("abfs://%s@account/", bucket())).resolve(path);
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright (C) 2022 Dremio
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.projectnessie.gc.iceberg.files;

import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.junit.jupiter.api.Disabled;
import org.projectnessie.objectstoragemock.ObjectStorageMock.MockServer;
import org.projectnessie.storage.uri.StorageUri;

@Disabled(
"Requires implementation of the /batch/storage/v1 endpoint in object-storage-mock. "
+ "That consumes a multipart/mixed content, which contains a series of serialized HTTP requests.")
/** Runs {@link AbstractFiles} against the GCS endpoint of the object-storage mock. */
public class TestIcebergGCSFiles extends AbstractFiles {

  @Override
  protected String bucket() {
    return "bucket";
  }

  @Override
  protected Map<String, ? extends String> icebergProperties(MockServer server) {
    Map<String, String> props = new HashMap<>();

    props.put("gcs.project-id", "my-project");
    // MUST NOT end with a trailing slash, otherwise code like
    // com.google.cloud.storage.spi.v1.HttpStorageRpc.DefaultRpcBatch.submit inserts an ambiguous
    // empty path segment ("//").
    String uri = server.getGcsBaseUri().toString();
    uri = uri.substring(0, uri.length() - 1);
    props.put("gcs.service.host", uri);
    props.put("gcs.no-auth", "true");

    return props;
  }

  // Was missing @Override: this implements AbstractFiles.hadoopConfiguration, like the
  // other overrides in this class.
  @Override
  protected Configuration hadoopConfiguration(MockServer server) {
    Configuration conf = new Configuration();

    conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem");
    conf.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS");
    conf.set("fs.gs.project.id", "projectId");
    conf.set("fs.gs.auth.type", "none");

    return conf;
  }

  @Override
  protected StorageUri storageUri(String path) {
    return StorageUri.of(String.format("gs://%s/", bucket())).resolve(path);
  }
}
Loading

0 comments on commit a22c499

Please sign in to comment.