Skip to content

Commit

Permalink
Derive location of new tables from parent namespaces, add some vali…
Browse files Browse the repository at this point in the history
…dations (#9612)

Whether the `location` property/attribute can be set is delegated to access checks, therefore the currently existing restriction that the location cannot be set is relaxed w/ this change if no access control for it is in place.

This change also adds somewhat rudimentary checks whether the configured location is valid.

Fixes #9331 
Fixes #9558
  • Loading branch information
snazy authored Sep 24, 2024
1 parent 375541c commit caf47ae
Show file tree
Hide file tree
Showing 16 changed files with 341 additions and 79 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ as necessary. Empty sections will not end in the release notes.

### Changes

- The base `location` of a new entity (e.g. tables) created via Iceberg REST is derived from the nearest
parent namespace that has an explicitly set `location` property. (Path separator character is `/`.)
- The `location` property on tables (and view) created via Iceberg REST may be explicitly configured, as
long as it can be resolved against the configured object storage locations. (Path separator character
is `/`.)

### Deprecations

### Fixes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.OutputStream;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiConsumer;
import java.util.function.BooleanSupplier;
import org.projectnessie.storage.uri.StorageUri;
Expand Down Expand Up @@ -60,4 +61,11 @@ void trinoSampleConfig(
StorageUri warehouse,
Map<String, String> icebergConfig,
BiConsumer<String, String> properties);

/**
* Checks whether the given storage URI can be resolved.
*
* @return an empty optional, if the URI can be resolved, otherwise an error message.
*/
Optional<String> canResolve(StorageUri uri);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiConsumer;
import java.util.function.BooleanSupplier;
import org.projectnessie.catalog.files.api.ObjectIO;
Expand All @@ -43,6 +44,16 @@ public InputStream readObject(StorageUri uri) throws IOException {
return resolve(uri).readObject(uri);
}

@Override
public Optional<String> canResolve(StorageUri uri) {
try {
ObjectIO resolved = resolve(uri);
return resolved.canResolve(uri);
} catch (IllegalArgumentException e) {
return Optional.of(e.getMessage());
}
}

@Override
public void configureIcebergWarehouse(
StorageUri warehouse,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,6 @@ AdlsOptions adlsOptions() {
return adlsOptions;
}

SecretsProvider secretsProvider() {
return secretsProvider;
}

public DataLakeFileClient fileClientForLocation(StorageUri uri) {
AdlsLocation location = adlsLocation(uri);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,16 @@ public void deleteObjects(List<StorageUri> uris) throws IOException {
}
}

@Override
public Optional<String> canResolve(StorageUri uri) {
try {
DataLakeFileClient file = clientSupplier.fileClientForLocation(uri);
return file != null ? Optional.empty() : Optional.of("ADLS client could not be constructed");
} catch (IllegalArgumentException e) {
return Optional.of(e.getMessage());
}
}

@Override
public void configureIcebergWarehouse(
StorageUri warehouse,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,19 @@ public void deleteObjects(List<StorageUri> uris) {
}
}

@Override
public Optional<String> canResolve(StorageUri uri) {
try {
GcsLocation location = gcsLocation(uri);
GcsBucketOptions bucketOptions = storageSupplier.bucketOptions(location);
@SuppressWarnings("resource")
Storage client = storageSupplier.forLocation(bucketOptions);
return client != null ? Optional.empty() : Optional.of("GCS client could not be constructed");
} catch (IllegalArgumentException e) {
return Optional.of(e.getMessage());
}
}

@Override
public void configureIcebergWarehouse(
StorageUri warehouse,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.nio.file.Paths;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiConsumer;
import java.util.function.BooleanSupplier;
import org.projectnessie.catalog.files.api.ObjectIO;
Expand Down Expand Up @@ -97,6 +98,16 @@ public void trinoSampleConfig(
Map<String, String> icebergConfig,
BiConsumer<String, String> properties) {}

@Override
public Optional<String> canResolve(StorageUri uri) {
Path path = LocalObjectIO.filePath(uri);
// We expect a directory here (or non-existing here), because the URI is meant to store other
// files
return Files.isRegularFile(path)
? Optional.of(path + " does must not be file")
: Optional.empty();
}

private static Path filePath(StorageUri uri) {
return Paths.get(uri.requiredPath());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,23 @@ public void deleteObjects(List<StorageUri> uris) {
}
}

@Override
public Optional<String> canResolve(StorageUri uri) {
String scheme = uri.scheme();
if (!isS3scheme(scheme)) {
return Optional.of("Not an S3 URI");
}

try {
S3Client s3client = s3clientSupplier.getClient(uri);
return s3client != null
? Optional.empty()
: Optional.of("S3 client could not be constructed");
} catch (IllegalArgumentException e) {
return Optional.of(e.getMessage());
}
}

private static String withoutLeadingSlash(StorageUri uri) {
String path = uri.requiredPath();
return path.startsWith("/") ? path.substring(1) : path;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,8 @@
import org.projectnessie.catalog.model.statistics.NessiePartitionStatisticsFile;
import org.projectnessie.catalog.model.statistics.NessieStatisticsFile;
import org.projectnessie.model.Content;
import org.projectnessie.model.ContentKey;
import org.projectnessie.model.IcebergTable;
import org.projectnessie.model.IcebergView;
import org.projectnessie.model.Namespace;

public class NessieModelIceberg {
private NessieModelIceberg() {}
Expand Down Expand Up @@ -795,14 +793,8 @@ public static long safeUnbox(Long value, long defaultValue) {
*
* <p>Also: we deliberately ignore the TableProperties.WRITE_METADATA_LOCATION property here.
*/
public static String icebergBaseLocation(String warehouseLocation, ContentKey key) {
public static String icebergNewEntityBaseLocation(String baseLocation) {
// FIXME escape or remove forbidden chars, cf. #8524
String baseLocation = warehouseLocation;
Namespace ns = key.getNamespace();
if (!ns.isEmpty()) {
baseLocation = concatLocation(warehouseLocation, ns.toString());
}
baseLocation = concatLocation(baseLocation, key.getName());
return baseLocation + "_" + randomUUID();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import jakarta.annotation.Nullable;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.immutables.value.Value;
Expand Down Expand Up @@ -73,6 +74,17 @@ public boolean hasUpdate(Class<? extends IcebergMetadataUpdate> update) {
@Value.Derived
public <T, U extends IcebergMetadataUpdate> T getSingleUpdateValue(
Class<U> update, Function<U, T> mapper) {
return getSingleUpdate(update, mapper)
.orElseThrow(
() ->
new IllegalArgumentException(
"Could not find any updates of type " + update.getSimpleName()));
}

@JsonIgnore
@Value.Derived
public <T, U extends IcebergMetadataUpdate> Optional<T> getSingleUpdate(
Class<U> update, Function<U, T> mapper) {
return updates().stream()
.filter(update::isInstance)
.map(update::cast)
Expand All @@ -81,11 +93,7 @@ public <T, U extends IcebergMetadataUpdate> T getSingleUpdateValue(
(e1, e2) -> {
throw new IllegalArgumentException(
"Found multiple updates of type " + update.getSimpleName());
})
.orElseThrow(
() ->
new IllegalArgumentException(
"Could not find any updates of type " + update.getSimpleName()));
});
}

@Value.Check
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,24 @@
import jakarta.annotation.Nullable;
import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Stream;
import org.projectnessie.api.v2.params.ParsedReference;
import org.projectnessie.catalog.model.snapshot.NessieEntitySnapshot;
import org.projectnessie.catalog.service.config.WarehouseConfig;
import org.projectnessie.error.BaseNessieClientServerException;
import org.projectnessie.error.NessieNotFoundException;
import org.projectnessie.model.CommitMeta;
import org.projectnessie.model.Content;
import org.projectnessie.model.ContentKey;
import org.projectnessie.model.Reference;
import org.projectnessie.services.authz.ApiContext;
import org.projectnessie.storage.uri.StorageUri;
import org.projectnessie.versioned.RequestMeta;

public interface CatalogService {
Expand Down Expand Up @@ -78,4 +82,21 @@ interface CatalogUriResolver {
URI icebergSnapshot(
Reference effectiveReference, ContentKey key, NessieEntitySnapshot<?> snapshot);
}

Optional<String> validateStorageLocation(String location);

StorageUri locationForEntity(
WarehouseConfig warehouse,
ContentKey contentKey,
Content.Type contentType,
ApiContext apiContext,
String refName,
String hash)
throws NessieNotFoundException;

StorageUri locationForEntity(
WarehouseConfig warehouse,
ContentKey contentKey,
List<ContentKey> keysInOrder,
Map<ContentKey, Content> contentsMap);
}
Loading

0 comments on commit caf47ae

Please sign in to comment.