Skip to content

Commit

Permalink
Remove the need to pass Put.expectedContent() (#6438)
Browse files Browse the repository at this point in the history
  • Loading branch information
snazy authored Mar 30, 2023
1 parent 731dfc7 commit 14891bc
Show file tree
Hide file tree
Showing 30 changed files with 94 additions and 186 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public UpdateNamespaceResult updateWithResponse()
.branchName(refName)
.hash(expectedHash)
.commitMeta(CommitMeta.fromMessage("update namespace " + key))
.operation(Put.of(key, updatedNamespace, oldNamespace))
.operation(Put.of(key, updatedNamespace))
.commitWithResponse();

return UpdateNamespaceResult.of(updatedNamespace, oldNamespace, commit.getTargetBranch());
Expand Down
38 changes: 35 additions & 3 deletions api/model/src/main/java/org/projectnessie/model/Operation.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.fasterxml.jackson.annotation.JsonView;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import javax.annotation.Nullable;
Expand All @@ -26,10 +27,15 @@
import org.eclipse.microprofile.openapi.annotations.media.DiscriminatorMapping;
import org.eclipse.microprofile.openapi.annotations.media.Schema;
import org.immutables.value.Value;
import org.projectnessie.model.ser.Views;

@Schema(
type = SchemaType.OBJECT,
title = "Operation",
description =
"Describes an operation to be performed against one content object.\n"
+ "\n"
+ "The Nessie backend will validate the correctness of the operations.",
oneOf = {Operation.Put.class, Operation.Unchanged.class, Operation.Delete.class},
discriminatorMapping = {
@DiscriminatorMapping(value = "PUT", schema = Operation.Put.class),
Expand All @@ -53,9 +59,22 @@ public interface Operation {
type = SchemaType.OBJECT,
title = "Put-'Content'-operation for a 'ContentKey'.",
description =
"Add or replace (put) a 'Content' object for a 'ContentKey'. "
+ "If the actual table type tracks the 'global state' of individual tables (Iceberg "
+ "as of today), every 'Put'-operation must contain a non-null value for 'expectedContent'.")
"Used to add new content or to update existing content.\n"
+ "\n"
+ "A new content object is created by populating the `value` field, the "
+ "content-id in the content object must not be present (null).\n"
+ "\n"
+ "A content object is updated by populating the `value` containing the correct "
+ "content-id.\n"
+ "\n"
+ "If the key for a content shall change (aka a rename), then use a `Delete` "
+ "operation using the current (old) key and a `Put` operation using the new key "
+ "with the `value` having the correct content-id. Both operations must happen "
+ "in the same commit.\n"
+ "\n"
+ "A content object can be replaced (think: `DROP TABLE xyz` + `CREATE TABLE xyz`) "
+ "with a `Delete` operation and a `Put` operation for a content using a `value`"
+ "representing a new content object, so without a content-id, in the same commit.")
@Value.Immutable
@JsonSerialize(as = ImmutablePut.class)
@JsonDeserialize(as = ImmutablePut.class)
Expand All @@ -67,12 +86,16 @@ interface Put extends Operation {

@Nullable
@jakarta.annotation.Nullable
@Deprecated
@SuppressWarnings("DeprecatedIsStillUsed")
@JsonView(Views.V1.class)
Content getExpectedContent();

static Put of(ContentKey key, Content content) {
return ImmutablePut.builder().key(key).content(content).build();
}

@Deprecated // for removal
static Put of(ContentKey key, Content content, Content expectedContent) {
return ImmutablePut.builder()
.key(key)
Expand All @@ -82,6 +105,15 @@ static Put of(ContentKey key, Content content, Content expectedContent) {
}
}

@Schema(
type = SchemaType.OBJECT,
title = "Delete-'Content'-operation for a 'ContentKey'.",
description =
"Used to delete an existing content key.\n"
+ "\n"
+ "If the key for a content shall change (aka a rename), then use a `Delete` "
+ "operation using the current (old) key and a `Put` operation using the new key "
+ "with the current `Content` in the the `value` field. See `Put` operation.")
@Value.Immutable
@JsonSerialize(as = ImmutableDelete.class)
@JsonDeserialize(as = ImmutableDelete.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@ public Stream<FileReference> extractFiles(ContentReference contentReference) {
contentReference.snapshotId(),
"Iceberg content is expected to have a non-null snapshot-ID");

// This is to respect Nessie's global state
Snapshot snapshot =
snapshotId < 0L ? tableMetadata.currentSnapshot() : tableMetadata.snapshot(snapshotId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,17 +287,8 @@ class NessieLogStore(sparkConf: SparkConf, hadoopConf: Configuration)
val currentTable = getTable(path.getParent, targetRef)
val table =
updateDeltaTable(currentTable, path, targetRef, lastCheckpoint)
val put = currentTable
.map(
Put.of(
DeltaContentKeyUtil.fromHadoopPath(path.getParent),
table,
_
)
)
.getOrElse(
Put.of(DeltaContentKeyUtil.fromHadoopPath(path.getParent), table)
)
val put =
Put.of(DeltaContentKeyUtil.fromHadoopPath(path.getParent), table)
val meta = CommitMeta
.builder()
.message(message)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ public void commit(
builder.message(buildCommitMsg(base, metadata) + " " + key.getName());
Branch branch =
api.commitMultipleOperations()
.operation(Operation.Put.of(key, newView, icebergView))
.operation(Operation.Put.of(key, newView))
.commitMeta(NessieUtil.catalogOptions(builder, catalogOptions).build())
.branch(reference.getAsBranch())
.commit();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ class CommitToBranchSimulation extends Simulation {
.commitMeta(
CommitMeta.fromMessage(s"test-commit $userId $commitNum")
)
.operation(Put.of(key, table, expectedTable.orNull))
.operation(Put.of(key, table))
.commit()

session.set("branch", updatedBranch)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ class KeyListSpillingSimulation extends Simulation {
val key = ContentKey.of(s"$userId $commitIndex $i" + padding)
val table = IcebergTable
.of(s"metadata_${userId}_${commitNum}_$i", 42, 43, 44, 45)
Put.of(key, table, null)
Put.of(key, table)
})

// Call the Nessie client operation to perform a commit with one or more put ops
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ class MixedWorkloadsSimulation extends Simulation {
.commitMeta(fromMessage(s"Update table ${tableAndLock._1}"))
.operation(
Operation.Put
.of(tableAndLock._1, updatedTable, currentTable)
.of(tableAndLock._1, updatedTable)
)
.commit()
session.set("updated", true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ public void updateProperties(

Namespace updatedNamespace = ImmutableNamespace.copyOf(namespace).withProperties(properties);

Put put = Put.of(updatedNamespace.toContentKey(), updatedNamespace, namespace);
Put put = Put.of(updatedNamespace.toContentKey(), updatedNamespace);
commit(
BranchName.of(refWithHash.getValue().getName()),
"update properties for namespace " + updatedNamespace.name(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -991,9 +991,7 @@ protected static org.projectnessie.versioned.Operation toOp(Operation o) {
return Delete.of(key);
} else if (o instanceof Operation.Put) {
Operation.Put put = (Operation.Put) o;
return put.getExpectedContent() != null
? Put.of(key, put.getContent(), put.getExpectedContent())
: Put.of(key, put.getContent());
return Put.of(key, put.getContent());
} else if (o instanceof Operation.Unchanged) {
return Unchanged.of(key);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,8 @@ public void commitResponse() throws BaseNessieClientServerException {
commit(
response.getTargetBranch(),
fromMessage("test"),
Put.of(
key1, IcebergTable.of("loc", 1, 2, 3, 4, contentIds.get(key1)), contents.get(key1)),
Put.of(
key2,
IcebergTable.of("blah", 1, 2, 3, 4, contentIds.get(key2)),
contents.get(key2)));
Put.of(key1, IcebergTable.of("loc", 1, 2, 3, 4, contentIds.get(key1))),
Put.of(key2, IcebergTable.of("blah", 1, 2, 3, 4, contentIds.get(key2))));
soft.assertThat(response.getAddedContents()).isNull();
}

Expand Down Expand Up @@ -397,9 +393,7 @@ public void commitLogPaging() throws BaseNessieClientServerException {
Put op;
try {
Content existing = contentApi().getContent(key, branch.getName(), currentHash).getContent();
op =
Put.of(
key, IcebergTable.of("some-file-" + i, 42, 42, 42, 42, existing.getId()), existing);
op = Put.of(key, IcebergTable.of("some-file-" + i, 42, 42, 42, 42, existing.getId()));
} catch (NessieNotFoundException notFound) {
op = Put.of(key, IcebergTable.of("some-file-" + i, 42, 42, 42, 42));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public void renameTable() throws Exception {
commit.getTargetBranch(),
fromMessage("setup"),
Delete.of(oldName),
Put.of(newName, table, table)))
Put.of(newName, table)))
.doesNotThrowAnyException();

soft.assertThat(contents(main.getName(), null, oldName, newName))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ private void mergeTransplant(
source.getName(),
committed1.getHash(),
fromMessage("test-branch2"),
Put.of(key1, table1, table1))
Put.of(key1, table1))
.getTargetBranch();
soft.assertThat(committed2.getHash()).isNotNull();

Expand Down Expand Up @@ -546,8 +546,7 @@ public void mergeWithNamespaces(ReferenceMode refMode) throws BaseNessieClientSe
contentApi().getContent(key1, committed1.getName(), committed1.getHash()).getContent();

Branch committed2 =
commit(committed1, fromMessage("test-branch2"), Put.of(key1, table1, table1))
.getTargetBranch();
commit(committed1, fromMessage("test-branch2"), Put.of(key1, table1)).getTargetBranch();
soft.assertThat(committed2.getHash()).isNotNull();

commit(base, fromMessage("test-main"), Put.of(key2, table2));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -491,11 +491,7 @@ protected String createCommits(
try {
Content existing =
contentApi().getContent(key, branch.getName(), currentHash).getContent();
op =
Put.of(
key,
IcebergTable.of("some-file-" + i, 42, 42, 42, 42, existing.getId()),
existing);
op = Put.of(key, IcebergTable.of("some-file-" + i, 42, 42, 42, 42, existing.getId()));
} catch (NessieContentNotFoundException notFound) {
op = Put.of(key, IcebergTable.of("some-file-" + i, 42, 42, 42, 42));
}
Expand Down Expand Up @@ -554,12 +550,7 @@ protected static List<Operation> operationsWithoutContentId(List<Operation> oper
protected static Operation operationWithoutContentId(Operation op) {
if (op instanceof Put) {
Put put = (Put) op;
return put.getExpectedContent() != null
? Put.of(
put.getKey(),
contentWithoutId(put.getContent()),
contentWithoutId(put.getExpectedContent()))
: Put.of(put.getKey(), contentWithoutId(put.getContent()));
return Put.of(put.getKey(), contentWithoutId(put.getContent()));
}
return op;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,7 @@
import org.projectnessie.model.CommitMeta;
import org.projectnessie.model.Content;
import org.projectnessie.model.ContentKey;
import org.projectnessie.model.DeltaLakeTable;
import org.projectnessie.model.GetMultipleContentsResponse;
import org.projectnessie.model.IcebergTable;
import org.projectnessie.model.IcebergView;
import org.projectnessie.model.ImmutableDeltaLakeTable;
import org.projectnessie.model.ImmutableIcebergTable;
import org.projectnessie.model.ImmutableIcebergView;
Expand Down Expand Up @@ -258,13 +255,7 @@ public void execute() throws BaseNessieClientServerException {
existingContent,
random,
existingContent != null ? existingContent.getId() : null);
if (existingContent instanceof IcebergTable
|| existingContent instanceof IcebergView
|| existingContent instanceof DeltaLakeTable) {
commit.operation(Put.of(key, newContents, existingContent));
} else {
commit.operation(Put.of(key, newContents));
}
commit.operation(Put.of(key, newContents));
}
try {
Branch newHead = commit.commit();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ private void commitSameContent(

for (Map.Entry<ContentKey, Content> entry : contentMap.entrySet()) {
Content content = entry.getValue();
request.operation(Operation.Put.of(entry.getKey(), content, content));
request.operation(Operation.Put.of(entry.getKey(), content));
}

Branch head = request.commit();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,8 +249,7 @@ private void doCommit(
key,
// Must add randomness here, otherwise concurrent threads will compute the same
// hashes, because parent, "content", key are all the same.
onRef("commit value " + ThreadLocalRandom.current().nextLong(), contentId),
onRef("foo", contentId)));
onRef("commit value " + ThreadLocalRandom.current().nextLong(), contentId)));
}

bp.versionStore.commit(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,18 +130,10 @@ public Hash commit(
if (operation instanceof Put) {
Put op = (Put) operation;
Content content = op.getValue();
Content expected = op.getExpectedValue();

if (content.getId() == null) {
// No content-ID --> New content

checkArgument(
expected == null,
"Expected content must not be set when creating new content. "
+ "The put operation's content has no content ID and is considered as new. "
+ "Key: '%s'",
op.getKey());

// assign content-ID
String cid = UUID.randomUUID().toString();
content = STORE_WORKER.applyId(content, cid);
Expand All @@ -155,21 +147,6 @@ public Hash commit(
contentId,
payloadForContent(content),
STORE_WORKER.toStoreOnReferenceState(content)));

if (expected != null) {
String expectedId = expected.getId();
checkArgument(
expectedId != null,
"Content id for expected content must not be null, key '%s'",
op.getKey());
ContentId expectedContentId = ContentId.of(expectedId);
checkArgument(
contentId.equals(expectedContentId),
"Content ids for new ('%s') and expected ('%s') content differ for key '%s'",
contentId,
expectedContentId,
op.getKey());
}
} else if (operation instanceof Delete) {
commitAttempt.addDeletes(operation.getKey());
} else if (operation instanceof Unchanged) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@ static Stream<RenameTable> commitRenameTableParams() {
.boxed()
.flatMap(i -> Stream.of(new RenameTable(i, i, i, i), new RenameTable(0, 0, 0, 0)));

// duplicate all params to use and not use global state
return Stream.concat(zero, intervals);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ public interface Delete extends Operation {
/**
* Creates a delete operation for the given key.
*
* <p>If the key for a content shall change (aka a rename), then use a {@link Delete} operation
* using the current (old) key and a {@link Put} operation using the new key providing the {@code
* value} with the correct content ID.
*
* @param key the key impacted by the operation
* @return a delete operation for the key
*/
Expand Down
Loading

0 comments on commit 14891bc

Please sign in to comment.