Skip to content

Commit

Permalink
Implement RESTful AppendFiles Operation
Browse files Browse the repository at this point in the history
  • Loading branch information
geruh committed Dec 13, 2023
1 parent f21199d commit 8e9f963
Show file tree
Hide file tree
Showing 8 changed files with 351 additions and 12 deletions.
18 changes: 18 additions & 0 deletions core/src/main/java/org/apache/iceberg/MetadataUpdate.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.iceberg;

import java.io.Serializable;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -452,4 +453,21 @@ public void applyTo(ViewMetadata.Builder viewMetadataBuilder) {
viewMetadataBuilder.setCurrentVersionId(versionId);
}
}

class AppendFilesUpdate implements MetadataUpdate {
private final List<String> addedManifests;

public AppendFilesUpdate(List<String> addedManifests) {
this.addedManifests = addedManifests;
}

public List<String> getAddedManifests() {
return addedManifests;
}

@Override
public void applyTo(TableMetadata.Builder tableMetadataBuilder) {
tableMetadataBuilder.appendFiles(addedManifests);
}
}
}
25 changes: 25 additions & 0 deletions core/src/main/java/org/apache/iceberg/MetadataUpdateParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,12 @@
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JsonNode;
import java.io.IOException;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.ExpressionParser;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
Expand Down Expand Up @@ -57,6 +60,7 @@ private MetadataUpdateParser() {}
static final String REMOVE_STATISTICS = "remove-statistics";
static final String ADD_VIEW_VERSION = "add-view-version";
static final String SET_CURRENT_VIEW_VERSION = "set-current-view-version";
static final String APPEND_FILES = "append-files";

// AssignUUID
private static final String UUID = "uuid";
Expand Down Expand Up @@ -121,6 +125,9 @@ private MetadataUpdateParser() {}
// SetCurrentViewVersion
private static final String VIEW_VERSION_ID = "view-version-id";

// Data operations
private static final String APPENDED_MANIFESTS = "appended-manifests";

private static final Map<Class<? extends MetadataUpdate>, String> ACTIONS =
ImmutableMap.<Class<? extends MetadataUpdate>, String>builder()
.put(MetadataUpdate.AssignUUID.class, ASSIGN_UUID)
Expand All @@ -142,6 +149,7 @@ private MetadataUpdateParser() {}
.put(MetadataUpdate.SetLocation.class, SET_LOCATION)
.put(MetadataUpdate.AddViewVersion.class, ADD_VIEW_VERSION)
.put(MetadataUpdate.SetCurrentViewVersion.class, SET_CURRENT_VIEW_VERSION)
.put(MetadataUpdate.AppendFilesUpdate.class, APPEND_FILES)
.buildOrThrow();

public static String toJson(MetadataUpdate metadataUpdate) {
Expand Down Expand Up @@ -226,6 +234,9 @@ public static void toJson(MetadataUpdate metadataUpdate, JsonGenerator generator
writeSetCurrentViewVersionId(
(MetadataUpdate.SetCurrentViewVersion) metadataUpdate, generator);
break;
case APPEND_FILES:
writeAppendFiles((MetadataUpdate.AppendFilesUpdate) metadataUpdate, generator);
break;
default:
throw new IllegalArgumentException(
String.format(
Expand Down Expand Up @@ -293,6 +304,8 @@ public static MetadataUpdate fromJson(JsonNode jsonNode) {
return readAddViewVersion(jsonNode);
case SET_CURRENT_VIEW_VERSION:
return readCurrentViewVersionId(jsonNode);
case APPEND_FILES:
return readAppendFiles(jsonNode);
default:
throw new UnsupportedOperationException(
String.format("Cannot convert metadata update action to json: %s", action));
Expand Down Expand Up @@ -327,6 +340,13 @@ private static void writeAddPartitionSpec(
PartitionSpecParser.toJson(update.spec(), gen);
}

private static void writeAppendFiles(MetadataUpdate.AppendFilesUpdate update, JsonGenerator gen)
throws IOException {
if (update.getAddedManifests() != null) {
JsonUtil.writeStringArray(APPENDED_MANIFESTS, update.getAddedManifests(), gen);
}
}

private static void writeSetDefaultPartitionSpec(
MetadataUpdate.SetDefaultPartitionSpec update, JsonGenerator gen) throws IOException {
gen.writeNumberField(SPEC_ID, update.specId());
Expand Down Expand Up @@ -422,6 +442,11 @@ private static MetadataUpdate readAssignUUID(JsonNode node) {
return new MetadataUpdate.AssignUUID(uuid);
}

private static MetadataUpdate readAppendFiles(JsonNode node) {
List<String> metadataLocations = JsonUtil.getStringList(APPENDED_MANIFESTS, node);
return new MetadataUpdate.AppendFilesUpdate(metadataLocations);
}

private static MetadataUpdate readUpgradeFormatVersion(JsonNode node) {
int formatVersion = JsonUtil.getInt(FORMAT_VERSION, node);
return new MetadataUpdate.UpgradeFormatVersion(formatVersion);
Expand Down
5 changes: 5 additions & 0 deletions core/src/main/java/org/apache/iceberg/TableMetadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -1743,5 +1743,10 @@ private boolean isAddedSnapshot(long snapshotId) {
private <U extends MetadataUpdate> Stream<U> changes(Class<U> updateClass) {
return changes.stream().filter(updateClass::isInstance).map(updateClass::cast);
}

public Builder appendFiles(List<String> files) {
changes.add(new MetadataUpdate.AppendFilesUpdate(files));
return this;
}
}
}
39 changes: 27 additions & 12 deletions core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.EnvironmentContext;
Expand Down Expand Up @@ -112,6 +111,7 @@ public class RESTSessionCatalog extends BaseViewSessionCatalog
private static final Logger LOG = LoggerFactory.getLogger(RESTSessionCatalog.class);
private static final String DEFAULT_FILE_IO_IMPL = "org.apache.iceberg.io.ResolvingFileIO";
private static final String REST_METRICS_REPORTING_ENABLED = "rest-metrics-reporting-enabled";
private static final String REST_DATA_COMMIT_ENABLED = "rest-data-commit-enabled";
private static final String REST_SNAPSHOT_LOADING_MODE = "snapshot-loading-mode";
private static final List<String> TOKEN_PREFERENCE_ORDER =
ImmutableList.of(
Expand All @@ -135,6 +135,7 @@ public class RESTSessionCatalog extends BaseViewSessionCatalog
private FileIO io = null;
private MetricsReporter reporter = null;
private boolean reportingViaRestEnabled;
private boolean dataCommitViaRestEnabled;
private CloseableGroup closeables = null;

// a lazy thread pool for token refresh
Expand Down Expand Up @@ -219,7 +220,8 @@ public void initialize(String name, Map<String, String> unresolved) {
AuthSession.fromAccessToken(
client, tokenRefreshExecutor(), token, expiresAtMillis(mergedProps), catalogAuth);
}

this.dataCommitViaRestEnabled =
PropertyUtil.propertyAsBoolean(mergedProps, REST_DATA_COMMIT_ENABLED, false);
this.io = newFileIO(SessionContext.createEmpty(), mergedProps);

this.fileIOCloser = newFileIOCloser();
Expand Down Expand Up @@ -391,12 +393,15 @@ public Table loadTable(SessionContext context, TableIdentifier identifier) {
tableMetadata);

trackFileIO(ops);

BaseTable table =
new BaseTable(
Table table =
new RESTTable(
ops,
fullTableName(finalIdentifier),
metricsReporter(paths.metrics(finalIdentifier), session::headers));
metricsReporter(paths.metrics(finalIdentifier), session::headers),
this.client,
paths.table(finalIdentifier),
session::headers,
dataCommitViaRestEnabled);
if (metadataType != null) {
return MetadataTableUtils.createMetadataTableInstance(table, metadataType);
}
Expand Down Expand Up @@ -464,9 +469,14 @@ public Table registerTable(
response.tableMetadata());

trackFileIO(ops);

return new BaseTable(
ops, fullTableName(ident), metricsReporter(paths.metrics(ident), session::headers));
return new RESTTable(
ops,
fullTableName(ident),
metricsReporter(paths.metrics(ident), session::headers),
client,
paths.table(ident),
session::headers,
dataCommitViaRestEnabled);
}

@Override
Expand Down Expand Up @@ -683,9 +693,14 @@ public Table create() {
response.tableMetadata());

trackFileIO(ops);

return new BaseTable(
ops, fullTableName(ident), metricsReporter(paths.metrics(ident), session::headers));
return new RESTTable(
ops,
fullTableName(ident),
metricsReporter(paths.metrics(ident), session::headers),
client,
paths.table(ident),
session::headers,
dataCommitViaRestEnabled);
}

@Override
Expand Down
59 changes: 59 additions & 0 deletions core/src/main/java/org/apache/iceberg/rest/RESTTable.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
*
* * Licensed to the Apache Software Foundation (ASF) under one
* * or more contributor license agreements. See the NOTICE file
* * distributed with this work for additional information
* * regarding copyright ownership. The ASF licenses this file
* * to you under the Apache License, Version 2.0 (the
* * "License"); you may not use this file except in compliance
* * with the License. You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing,
* * software distributed under the License is distributed on an
* * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* * KIND, either express or implied. See the License for the
* * specific language governing permissions and limitations
* * under the License.
*
*/
package org.apache.iceberg.rest;

import java.util.Map;
import java.util.function.Supplier;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.metrics.MetricsReporter;
import org.apache.iceberg.rest.operations.RestAppendFiles;

public class RESTTable extends BaseTable {
private final RESTClient client;
private final String path;
private final Supplier<Map<String, String>> headers;
private final boolean dataCommitViaRestEnabled;

public RESTTable(
TableOperations ops,
String name,
MetricsReporter reporter,
RESTClient client,
String path,
Supplier<Map<String, String>> headers,
boolean dataCommitViaRestEnabled) {
super(ops, name, reporter);
this.client = client;
this.headers = headers;
this.path = path;
this.dataCommitViaRestEnabled = dataCommitViaRestEnabled;
}

@Override
public AppendFiles newAppend() {
if (dataCommitViaRestEnabled) {
return new RestAppendFiles(client, path, headers, operations());
}
return super.newAppend();
}
}
Loading

0 comments on commit 8e9f963

Please sign in to comment.