Skip to content

Commit

Permalink
Extensible object types (#7771)
Browse files Browse the repository at this point in the history
  • Loading branch information
adutra authored Dec 1, 2023
1 parent 08de363 commit 33dd10d
Show file tree
Hide file tree
Showing 111 changed files with 5,440 additions and 2,726 deletions.
13 changes: 12 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,19 @@ as necessary. Empty sections will not end in the release notes.

### Upgrade notes

* Nessie Quarkus parts are now built against Java 17 and Java 17 is required to run Nessie Quarkus Server directly.
- Nessie Quarkus parts are now built against Java 17 and Java 17 is required to run Nessie Quarkus Server directly.
If you use the Docker image, nothing needs to be done, because the image already contains a compatible Java runtime.
- Due to the introduction of extensible object types in the storage layer, some storage backends
will require a schema upgrade:
- JDBC: the following SQL statement must be executed on the Nessie database (please adapt the
statement to the actual database SQL dialect):
```sql
ALTER TABLE objs ADD COLUMN x_class VARCHAR, ADD COLUMN x_data BYTEA;
```
- Cassandra: the following CQL statement must be executed on the Nessie database and keyspace:
```cql
ALTER TABLE <keyspace>.objs ADD x_class text, ADD x_data blob;
```

### Breaking changes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ public <T extends Obj> T fetchTypedObj(
try {
Obj r = pendingObj(id);
if (r != null) {
if (r.type() != type) {
if (!r.type().equals(type)) {
throw new ObjNotFoundException(id);
}
@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,13 @@
import com.google.cloud.bigtable.data.v2.models.RowMutation;
import com.google.cloud.bigtable.data.v2.models.RowMutationEntry;
import com.google.common.collect.AbstractIterator;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.ByteString;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
Expand All @@ -72,6 +73,7 @@
import org.projectnessie.versioned.storage.common.persist.Obj;
import org.projectnessie.versioned.storage.common.persist.ObjId;
import org.projectnessie.versioned.storage.common.persist.ObjType;
import org.projectnessie.versioned.storage.common.persist.ObjTypes;
import org.projectnessie.versioned.storage.common.persist.Persist;
import org.projectnessie.versioned.storage.common.persist.Reference;

Expand Down Expand Up @@ -316,7 +318,7 @@ public <T extends Obj> T fetchTypedObj(
@Nonnull @jakarta.annotation.Nonnull ObjId id, ObjType type, Class<T> typeClass)
throws ObjNotFoundException {
Obj obj = fetchObj(id);
if (obj.type() != type) {
if (!obj.type().equals(type)) {
throw new ObjNotFoundException(id);
}
@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -385,11 +387,11 @@ public boolean storeObj(
}
}

private static final ByteString[] OBJ_TYPE_VALUES =
Arrays.stream(ObjType.values())
.map(Enum::name)
.map(ByteString::copyFromUtf8)
.toArray(ByteString[]::new);
private static final Map<ObjType, ByteString> OBJ_TYPE_VALUES =
ObjTypes.allObjTypes().stream()
.collect(
ImmutableMap.toImmutableMap(
Function.identity(), (ObjType type) -> ByteString.copyFromUtf8(type.name())));

@NotNull
private ConditionalRowMutation mutationForStoreObj(
Expand All @@ -409,10 +411,7 @@ private ConditionalRowMutation mutationForStoreObj(
Mutation.create()
.setCell(FAMILY_OBJS, QUALIFIER_OBJS, CELL_TIMESTAMP, ref)
.setCell(
FAMILY_OBJS,
QUALIFIER_OBJ_TYPE,
CELL_TIMESTAMP,
OBJ_TYPE_VALUES[obj.type().ordinal()]);
FAMILY_OBJS, QUALIFIER_OBJ_TYPE, CELL_TIMESTAMP, OBJ_TYPE_VALUES.get(obj.type()));
Filter condition =
FILTERS
.chain()
Expand Down Expand Up @@ -441,14 +440,12 @@ public boolean[] storeObjs(@Nonnull @jakarta.annotation.Nonnull Obj[] objs)

@SuppressWarnings("unchecked")
ApiFuture<Boolean>[] futures = new ApiFuture[objs.length];
int idx = 0;
for (int i = 0; i < objs.length; i++) {
Obj obj = objs[i];
if (obj != null) {
ConditionalRowMutation conditionalRowMutation = mutationForStoreObj(obj, false);
futures[idx] = backend.client().checkAndMutateRowAsync(conditionalRowMutation);
futures[i] = backend.client().checkAndMutateRowAsync(conditionalRowMutation);
}
idx++;
}

boolean[] r = new boolean[objs.length];
Expand Down Expand Up @@ -520,7 +517,7 @@ public void upsertObj(@Nonnull @jakarta.annotation.Nonnull Obj obj) throws ObjTo
FAMILY_OBJS,
QUALIFIER_OBJ_TYPE,
CELL_TIMESTAMP,
OBJ_TYPE_VALUES[obj.type().ordinal()]));
OBJ_TYPE_VALUES.get(obj.type())));
} catch (ApiException e) {
throw apiException(e);
}
Expand Down Expand Up @@ -556,7 +553,7 @@ public void upsertObjs(@Nonnull @jakarta.annotation.Nonnull Obj[] objs)
FAMILY_OBJS,
QUALIFIER_OBJ_TYPE,
CELL_TIMESTAMP,
OBJ_TYPE_VALUES[obj.type().ordinal()]));
OBJ_TYPE_VALUES.get(obj.type())));
}
} catch (ApiException e) {
throw apiException(e);
Expand Down Expand Up @@ -596,7 +593,7 @@ private class ScanAllObjectsIterator extends AbstractIterator<Obj>

Filters.InterleaveFilter typeFilter = null;
boolean all = true;
for (ObjType type : ObjType.values()) {
for (ObjType type : ObjTypes.allObjTypes()) {
boolean match = filter.test(type);
if (match) {
if (typeFilter == null) {
Expand All @@ -606,7 +603,7 @@ private class ScanAllObjectsIterator extends AbstractIterator<Obj>
FILTERS
.chain()
.filter(FILTERS.qualifier().exactMatch(QUALIFIER_OBJ_TYPE))
.filter(FILTERS.value().exactMatch(OBJ_TYPE_VALUES[type.ordinal()])));
.filter(FILTERS.value().exactMatch(OBJ_TYPE_VALUES.get(type))));
} else {
all = false;
}
Expand Down Expand Up @@ -658,7 +655,7 @@ protected Obj computeNext() {
}
}

public static ObjId deserializeObjId(ByteString bytes) {
private static ObjId deserializeObjId(ByteString bytes) {
if (bytes == null) {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public <T extends Obj> T fetchTypedObj(
throws ObjNotFoundException {
Obj o = cache.get(id);
if (o != null) {
if (o.type() != type) {
if (!o.type().equals(type)) {
throw new ObjNotFoundException(id);
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
import static com.datastax.oss.driver.api.core.ConsistencyLevel.LOCAL_QUORUM;
import static com.datastax.oss.driver.api.core.ConsistencyLevel.LOCAL_SERIAL;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static java.lang.String.format;
import static java.util.Map.entry;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.projectnessie.versioned.storage.cassandra.CassandraConstants.COLS_OBJS_ALL;
Expand All @@ -41,14 +44,12 @@
import static org.projectnessie.versioned.storage.cassandra.CassandraConstants.SELECT_BATCH_SIZE;
import static org.projectnessie.versioned.storage.cassandra.CassandraConstants.TABLE_OBJS;
import static org.projectnessie.versioned.storage.cassandra.CassandraConstants.TABLE_REFS;
import static org.projectnessie.versioned.storage.cassandra.CqlColumnType.NAME;
import static org.projectnessie.versioned.storage.cassandra.CqlColumnType.OBJ_ID;

import com.datastax.oss.driver.api.core.CqlIdentifier;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.DriverException;
import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.BoundStatementBuilder;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
Expand All @@ -58,13 +59,11 @@
import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata;
import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata;
import com.datastax.oss.driver.api.core.servererrors.CASWriteUnknownException;
import com.google.common.collect.ImmutableMap;
import java.lang.reflect.Array;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletionStage;
Expand All @@ -75,11 +74,11 @@
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import org.agrona.collections.Hashing;
import org.agrona.collections.Object2IntHashMap;
import org.jetbrains.annotations.NotNull;
import org.projectnessie.versioned.storage.common.persist.Backend;
import org.projectnessie.versioned.storage.common.persist.PersistFactory;
import org.slf4j.Logger;
Expand Down Expand Up @@ -299,32 +298,41 @@ private R[] resultToArray() {
}
}

private BoundStatement buildStatement(String cql, Object[] values) {
@NotNull
BoundStatement buildStatement(String cql, Object... values) {
PreparedStatement prepared =
statements.computeIfAbsent(cql, c -> session.prepare(format(c, config.keyspace())));
return prepared
.bind(values)
.boundStatementBuilder(values)
.setTimeout(config.dmlTimeout())
.setConsistencyLevel(LOCAL_QUORUM)
.setSerialConsistencyLevel(LOCAL_SERIAL);
.setSerialConsistencyLevel(LOCAL_SERIAL)
.build();
}

boolean executeCas(String cql, Object... values) {
@NotNull
BoundStatementBuilder newBoundStatementBuilder(String cql) {
PreparedStatement prepared =
statements.computeIfAbsent(cql, c -> session.prepare(format(c, config.keyspace())));
return prepared.boundStatementBuilder();
}

boolean executeCas(BoundStatement stmt) {
try {
ResultSet rs = execute(cql, values);
ResultSet rs = execute(stmt);
return rs.wasApplied();
} catch (DriverException e) {
handleDriverException(e);
return false;
}
}

ResultSet execute(String cql, Object... values) {
return session.execute(buildStatement(cql, values));
ResultSet execute(BoundStatement stmt) {
return session.execute(stmt);
}

CompletionStage<AsyncResultSet> executeAsync(String cql, Object... values) {
return session.executeAsync(buildStatement(cql, values));
CompletionStage<AsyncResultSet> executeAsync(BoundStatement stmt) {
return session.executeAsync(stmt);
}

void handleDriverException(DriverException e) {
Expand Down Expand Up @@ -383,55 +391,46 @@ public void setupSchema() {
COL_REFS_CREATED_AT,
COL_REFS_EXTENDED_INFO,
COL_REFS_PREVIOUS)
.collect(Collectors.toSet()),
ImmutableMap.of(COL_REPO_ID, NAME.type(), COL_REFS_NAME, NAME.type()));
.collect(toImmutableSet()),
List.of(COL_REPO_ID, COL_REFS_NAME));
createTableIfNotExists(
keyspace.get(),
TABLE_OBJS,
CREATE_TABLE_OBJS,
Stream.concat(
Stream.of(COL_REPO_ID), Arrays.stream(COLS_OBJS_ALL.split(",")).map(String::trim))
.collect(Collectors.toSet()),
ImmutableMap.of(COL_REPO_ID, NAME.type(), COL_OBJ_ID, OBJ_ID.type()));
Stream.concat(Stream.of(COL_REPO_ID), COLS_OBJS_ALL.stream()).collect(toImmutableSet()),
List.of(COL_REPO_ID, COL_OBJ_ID));
}

private void createTableIfNotExists(
KeyspaceMetadata meta,
String tableName,
String createTable,
Set<String> expectedColumns,
Map<String, String> expectedPrimaryKey) {
Set<CqlColumn> expectedColumns,
List<CqlColumn> expectedPrimaryKey) {

Optional<TableMetadata> table = meta.getTable(tableName);

Object[] types =
Arrays.stream(CqlColumnType.values()).map(CqlColumnType::type).toArray(Object[]::new);
createTable = format(createTable, meta.getName());
createTable = MessageFormat.format(createTable, types);

if (table.isPresent()) {
Set<String> columns =
table.get().getColumns().values().stream()
.map(ColumnMetadata::getName)
.map(CqlIdentifier::asInternal)
.collect(Collectors.toSet());
Map<String, String> primaryKey =
table.get().getPartitionKey().stream()
.collect(
Collectors.toMap(c -> c.getName().asInternal(), c -> c.getType().toString()));

checkState(
primaryKey.equals(expectedPrimaryKey),
checkPrimaryKey(table.get(), expectedPrimaryKey),
"Expected primary key columns %s do not match existing primary key columns %s for table '%s'. DDL template:\n%s",
expectedPrimaryKey,
primaryKey,
expectedPrimaryKey.stream()
.map(col -> entry(col.name(), col.type().dataType()))
.collect(toImmutableMap(Entry::getKey, Entry::getValue)),
table.get().getPartitionKey().stream()
.map(col -> entry(col.getName(), col.getType()))
.collect(toImmutableMap(Entry::getKey, Entry::getValue)),
tableName,
createTable);

checkState(
columns.containsAll(expectedColumns),
checkColumns(table.get(), expectedColumns),
"Expected columns %s do not contain all columns %s for table '%s'. DDL template:\n%s",
expectedColumns,
columns,
table.get().getColumns().keySet(),
tableName,
createTable);

Expand All @@ -444,6 +443,31 @@ private void createTableIfNotExists(
session.execute(stmt);
}

/**
 * Verifies that the partition key of the existing table matches the expected primary-key
 * definition, position by position: both the column name and its CQL data type must agree.
 *
 * @param table metadata of the table found in the keyspace
 * @param expectedPrimaryKey expected primary-key columns, in declaration order
 * @return {@code true} only if the partition key has the same size and every column matches
 *     the expected name and type at the same position
 */
private boolean checkPrimaryKey(TableMetadata table, List<CqlColumn> expectedPrimaryKey) {
  List<ColumnMetadata> actualKey = table.getPartitionKey();
  // A size mismatch means the schema diverged - no need to compare individual columns.
  if (actualKey.size() != expectedPrimaryKey.size()) {
    return false;
  }
  for (int pos = 0; pos < actualKey.size(); pos++) {
    ColumnMetadata actualColumn = actualKey.get(pos);
    CqlColumn expectedColumn = expectedPrimaryKey.get(pos);
    // Compare the internal (case-exact) column name first, then the declared CQL type.
    if (!actualColumn.getName().asInternal().equals(expectedColumn.name())) {
      return false;
    }
    if (!actualColumn.getType().equals(expectedColumn.type().dataType())) {
      return false;
    }
  }
  return true;
}

/**
 * Verifies that every expected column is present in the existing table.
 *
 * <p>Only column presence (by name) is checked here; column types are not validated. Extra
 * columns in the table beyond the expected set are tolerated.
 *
 * @param table metadata of the table found in the keyspace
 * @param expectedColumns columns that must all exist in the table
 * @return {@code true} if the table contains every expected column
 */
private boolean checkColumns(TableMetadata table, Set<CqlColumn> expectedColumns) {
  return expectedColumns.stream()
      .allMatch(expected -> table.getColumn(expected.name()).isPresent());
}

@Override
public String configInfo() {
return "keyspace: "
Expand All @@ -464,16 +488,16 @@ public void eraseRepositories(Set<String> repositoryIds) {

try (LimitedConcurrentRequests requests =
new LimitedConcurrentRequests(MAX_CONCURRENT_DELETES)) {
for (Row row : execute(ERASE_REFS_SCAN, repoIdList)) {
for (Row row : execute(buildStatement(ERASE_REFS_SCAN, repoIdList))) {
String repoId = row.getString(0);
String ref = row.getString(1);
requests.submitted(executeAsync(ERASE_REF, repoId, ref));
requests.submitted(executeAsync(buildStatement(ERASE_REF, repoId, ref)));
}

for (Row row : execute(ERASE_OBJS_SCAN, repoIdList)) {
for (Row row : execute(buildStatement(ERASE_OBJS_SCAN, repoIdList))) {
String repoId = row.getString(0);
String objId = row.getString(1);
requests.submitted(executeAsync(ERASE_OBJ, repoId, objId));
requests.submitted(executeAsync(buildStatement(ERASE_OBJ, repoId, objId)));
}
}
// We must ensure that the system clock advances a little, so that C*'s next write-timestamp
Expand Down
Loading

0 comments on commit 33dd10d

Please sign in to comment.