Skip to content

Commit

Permalink
Ability to export repositories in V1 format (#7596)
Browse files Browse the repository at this point in the history
  • Loading branch information
adutra authored Oct 6, 2023
1 parent 45379d2 commit c72b482
Show file tree
Hide file tree
Showing 8 changed files with 203 additions and 15 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ as necessary. Empty sections will not end in the release notes.

- Spark SQL extensions now support the `DROP ... IF EXISTS` syntax for branches and tags.
- `table-prefix` configuration option added to DynamoDB version store.
- Ability to export repositories in V1 format. This is useful for migrating repositories to older
Nessie servers that do not support the new storage model.

### Changes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public class ExportRepository extends BaseCommand {
static final String SINGLE_BRANCH = "--single-branch-current-content";
static final String CONTENT_BATCH_SIZE = "--content-batch-size";
static final String COMMIT_BATCH_SIZE = "--commit-batch-size";
static final String EXPORT_VERSION = "--export-version";

enum Format {
ZIP,
Expand Down Expand Up @@ -127,6 +128,13 @@ enum Format {
+ ".")
private Integer commitBatchSize;

// Numeric export format version selected via the EXPORT_VERSION command line option.
// The value is handed to NessieExporter.Builder#exportVersion(int) in export(...).
// The default is spelled as `"" + constant` so that the int constant becomes the
// compile-time String that the annotation attribute expects.
@CommandLine.Option(
names = EXPORT_VERSION,
defaultValue = "" + ExportImportConstants.DEFAULT_EXPORT_VERSION,
description =
"The export version, defaults to " + ExportImportConstants.DEFAULT_EXPORT_VERSION + ".")
private int exportVersion;

@Override
protected Integer callWithDatabaseAdapter() throws Exception {
return export(
Expand Down Expand Up @@ -162,7 +170,8 @@ Integer export(Consumer<NessieExporter.Builder> builderConsumer) throws Exceptio
NessieExporter.builder()
.exportFileSupplier(exportFileSupplier)
.fullScan(fullScan)
.contentsFromBranch(contentsFromBranch);
.contentsFromBranch(contentsFromBranch)
.exportVersion(exportVersion);
builderConsumer.accept(builder);
if (maxFileSize != null) {
builder.maxFileSize(maxFileSize);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ public final class ExportImportConstants {
public static final int DEFAULT_EXPECTED_COMMIT_COUNT = 1_000_000;
public static final int DEFAULT_COMMIT_BATCH_SIZE = 20;
public static final int DEFAULT_ATTACHMENT_BATCH_SIZE = 20;
public static final int DEFAULT_EXPORT_VERSION = 2;

private ExportImportConstants() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,12 @@
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.projectnessie.model.CommitMeta;
import org.projectnessie.model.Content;
import org.projectnessie.nessie.relocated.protobuf.ByteString;
import org.projectnessie.versioned.BranchName;
import org.projectnessie.versioned.NamedRef;
import org.projectnessie.versioned.TagName;
import org.projectnessie.versioned.storage.common.exceptions.ObjNotFoundException;
import org.projectnessie.versioned.storage.common.logic.CommitLogic;
import org.projectnessie.versioned.storage.common.logic.HeadsAndForkPoints;
Expand All @@ -55,23 +59,32 @@
import org.projectnessie.versioned.storage.common.persist.ObjId;
import org.projectnessie.versioned.storage.common.persist.Persist;
import org.projectnessie.versioned.storage.common.persist.Reference;
import org.projectnessie.versioned.storage.versionstore.RefMapping;
import org.projectnessie.versioned.storage.versionstore.TypeMapping;
import org.projectnessie.versioned.transfer.files.ExportFileSupplier;
import org.projectnessie.versioned.transfer.serialize.TransferTypes.Commit;
import org.projectnessie.versioned.transfer.serialize.TransferTypes.ExportVersion;
import org.projectnessie.versioned.transfer.serialize.TransferTypes.HeadsAndForks;
import org.projectnessie.versioned.transfer.serialize.TransferTypes.NamedReference;
import org.projectnessie.versioned.transfer.serialize.TransferTypes.Operation;
import org.projectnessie.versioned.transfer.serialize.TransferTypes.OperationType;
import org.projectnessie.versioned.transfer.serialize.TransferTypes.Ref;
import org.projectnessie.versioned.transfer.serialize.TransferTypes.RefType;
import org.projectnessie.versioned.transfer.serialize.TransferTypes.RepositoryDescriptionProto;

final class ExportPersist extends ExportCommon {
ExportPersist(ExportFileSupplier exportFiles, NessieExporter exporter) {

private final ExportVersion exportVersion;

ExportPersist(
ExportFileSupplier exportFiles, NessieExporter exporter, ExportVersion exportVersion) {
super(exportFiles, exporter);
this.exportVersion = exportVersion;
}

@Override
ExportVersion getExportVersion() {
return ExportVersion.V2;
return exportVersion;
}

@Override
Expand Down Expand Up @@ -104,8 +117,9 @@ private HeadsAndForkPoints scanAllReferences(Consumer<CommitObj> commitHandler)

ReferenceLogic referenceLogic = referenceLogic(persist);
CommitLogic commitLogic = commitLogic(persist);
String referencePrefix = exportVersion == ExportVersion.V2 ? null : RefMapping.REFS;
referenceLogic
.queryReferences(referencesQuery())
.queryReferences(referencesQuery(referencePrefix))
.forEachRemaining(
ref -> {
Deque<ObjId> commitsToProcess = new ArrayDeque<>();
Expand Down Expand Up @@ -141,20 +155,43 @@ private HeadsAndForkPoints scanDatabase(Consumer<CommitObj> commitHandler) {
@Override
void exportReferences(ExportContext exportContext) {
ReferenceLogic referenceLogic = referenceLogic(persist());
for (PagedResult<Reference, String> refs = referenceLogic.queryReferences(referencesQuery());
String referencePrefix = exportVersion == ExportVersion.V2 ? null : RefMapping.REFS;
for (PagedResult<Reference, String> refs =
referenceLogic.queryReferences(referencesQuery(referencePrefix));
refs.hasNext(); ) {
Reference reference = refs.next();
ObjId extendedInfoObj = reference.extendedInfoObj();
Ref.Builder refBuilder =
Ref.newBuilder().setName(reference.name()).setPointer(reference.pointer().asBytes());
if (extendedInfoObj != null) {
refBuilder.setExtendedInfoObj(extendedInfoObj.asBytes());
if (exportVersion == ExportVersion.V1) {
NamedRef namedRef = RefMapping.referenceToNamedRef(reference);
NamedReference.Builder namedReference =
NamedReference.newBuilder()
.setRefType(refType(namedRef))
.setName(namedRef.getName())
.setCommitId(reference.pointer().asBytes());
// reference.extendedInfoObj() cannot be exported in V1
exportContext.writeNamedReference(namedReference.build());
} else {
ObjId extendedInfoObj = reference.extendedInfoObj();
Ref.Builder refBuilder =
Ref.newBuilder().setName(reference.name()).setPointer(reference.pointer().asBytes());
if (extendedInfoObj != null) {
refBuilder.setExtendedInfoObj(extendedInfoObj.asBytes());
}
exportContext.writeRef(refBuilder.build());
}
exportContext.writeRef(refBuilder.build());
exporter.progressListener().progress(ProgressEvent.NAMED_REFERENCE_WRITTEN);
}
}

/**
 * Translates a {@link NamedRef} into the {@link RefType} used by the transfer format.
 *
 * @param namedRef the named reference to classify
 * @return {@code RefType.Tag} for a {@link TagName}, {@code RefType.Branch} for a {@link
 *     BranchName}
 * @throws IllegalArgumentException if the reference is neither a tag nor a branch
 */
private RefType refType(NamedRef namedRef) {
  final RefType mapped;
  if (namedRef instanceof TagName) {
    mapped = RefType.Tag;
  } else if (namedRef instanceof BranchName) {
    mapped = RefType.Branch;
  } else {
    throw new IllegalArgumentException("Unknown named reference type " + namedRef);
  }
  return mapped;
}

@Override
void writeRepositoryDescription() throws IOException {
RepositoryDescription repositoryDescription =
Expand Down Expand Up @@ -219,6 +256,17 @@ private Commit mapCommitObj(CommitObj c, IndexesLogic indexesLogic, Map<ObjId, O
.setMessage(c.message())
.setCommitSequence(c.seq())
.setCreatedTimeMicros(c.created());

if (exportVersion == ExportVersion.V1) {
try {
CommitMeta commitMeta = TypeMapping.toCommitMeta(c);
byte[] commitMetaBytes = exporter.objectMapper().writeValueAsBytes(commitMeta);
b.setMetadata(ByteString.copyFrom(commitMetaBytes));
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}

c.headers()
.keySet()
.forEach(h -> b.addHeadersBuilder().setName(h).addAllValues(c.headers().getAll(h)));
Expand All @@ -230,7 +278,11 @@ private Commit mapCommitObj(CommitObj c, IndexesLogic indexesLogic, Map<ObjId, O
CommitOp content = op.content();
Operation.Builder opBuilder = b.addOperationsBuilder().setPayload(content.payload());

opBuilder.addContentKey(op.key().rawString());
if (exportVersion == ExportVersion.V1) {
opBuilder.addAllContentKey(TypeMapping.storeKeyToKey(op.key()).getElements());
} else {
opBuilder.addContentKey(op.key().rawString());
}

ObjId valueId = content.value();
if (valueId != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.projectnessie.versioned.store.DefaultStoreWorker;
import org.projectnessie.versioned.transfer.files.ExportFileSupplier;
import org.projectnessie.versioned.transfer.serialize.TransferTypes.ExportMeta;
import org.projectnessie.versioned.transfer.serialize.TransferTypes.ExportVersion;

@Value.Immutable
public abstract class NessieExporter {
Expand Down Expand Up @@ -91,6 +92,8 @@ public interface Builder {
*/
Builder commitBatchSize(int commitBatchSize);

Builder exportVersion(int exportVersion);

NessieExporter build();
}

Expand Down Expand Up @@ -178,6 +181,11 @@ int commitBatchSize() {
return ExportImportConstants.DEFAULT_COMMIT_BATCH_SIZE;
}

/**
 * Numeric export format version to produce.
 *
 * @return {@link ExportImportConstants#DEFAULT_EXPORT_VERSION} unless a different value was
 *     supplied through {@code Builder#exportVersion(int)}
 */
@Value.Default
int exportVersion() {
return ExportImportConstants.DEFAULT_EXPORT_VERSION;
}

abstract ExportFileSupplier exportFileSupplier();

@Value.Default
Expand All @@ -198,6 +206,7 @@ public ExportMeta exportNessieRepository() throws IOException {
return new ExportDatabaseAdapter(exportFiles, this).exportRepo();
}

return new ExportPersist(exportFiles, this).exportRepo();
return new ExportPersist(exportFiles, this, ExportVersion.forNumber(exportVersion()))
.exportRepo();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ public ImportResult importNessieRepository() throws IOException {
default:
throw new IllegalStateException(
String.format(
"This Nessie-version version does not support importing a %s (%d) export",
"This Nessie version does not support importing a %s (%d) export",
exportMeta.getVersion().name(), exportMeta.getVersionValue()));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
@NessieDbAdapterName(InmemoryDatabaseAdapterFactory.NAME)
@NessieExternalDatabase(InmemoryTestConnectionProviderSource.class)
@NessieBackendName(InmemoryBackendFactory.NAME)
public class TestMigrationToPersist extends BaseExportImport {
public class TestMigrationFromDatabaseAdapterToPersist extends BaseExportImport {
@NessieDbAdapter protected static DatabaseAdapter databaseAdapter;
@NessieDbAdapter protected static VersionStore daVersionStore;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Copyright (C) 2022 Dremio
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.projectnessie.versioned.transfer;

import com.google.errorprone.annotations.MustBeClosed;
import java.io.IOException;
import java.util.Optional;
import java.util.stream.Stream;
import org.junit.jupiter.api.extension.ExtendWith;
import org.projectnessie.versioned.BranchName;
import org.projectnessie.versioned.Hash;
import org.projectnessie.versioned.ReferenceConflictException;
import org.projectnessie.versioned.ReferenceNotFoundException;
import org.projectnessie.versioned.VersionStore;
import org.projectnessie.versioned.persist.adapter.CommitLogEntry;
import org.projectnessie.versioned.persist.adapter.DatabaseAdapter;
import org.projectnessie.versioned.persist.inmem.InmemoryDatabaseAdapterFactory;
import org.projectnessie.versioned.persist.inmem.InmemoryTestConnectionProviderSource;
import org.projectnessie.versioned.persist.tests.extension.DatabaseAdapterExtension;
import org.projectnessie.versioned.persist.tests.extension.NessieDbAdapter;
import org.projectnessie.versioned.persist.tests.extension.NessieDbAdapterName;
import org.projectnessie.versioned.persist.tests.extension.NessieExternalDatabase;
import org.projectnessie.versioned.storage.common.persist.Persist;
import org.projectnessie.versioned.storage.inmemory.InmemoryBackendFactory;
import org.projectnessie.versioned.storage.testextension.NessieBackendName;
import org.projectnessie.versioned.storage.testextension.NessiePersist;
import org.projectnessie.versioned.storage.testextension.PersistExtension;
import org.projectnessie.versioned.storage.versionstore.VersionStoreImpl;
import org.projectnessie.versioned.transfer.files.FileExporter;
import org.projectnessie.versioned.transfer.files.FileImporter;
import org.projectnessie.versioned.transfer.serialize.TransferTypes.ExportMeta;
import org.projectnessie.versioned.transfer.serialize.TransferTypes.ExportVersion;

/**
 * Export/import round trip that reads from a {@link Persist}-backed version store, writes the
 * export in {@link ExportVersion#V1} format and imports it into a {@link DatabaseAdapter}-backed
 * version store.
 */
@ExtendWith({PersistExtension.class, DatabaseAdapterExtension.class})
@NessieDbAdapterName(InmemoryDatabaseAdapterFactory.NAME)
@NessieExternalDatabase(InmemoryTestConnectionProviderSource.class)
@NessieBackendName(InmemoryBackendFactory.NAME)
public class TestMigrationFromPersistToDatabaseAdapter extends BaseExportImport {

  /** Import target. */
  @NessieDbAdapter static DatabaseAdapter databaseAdapter;

  /** Version store view of the import target. */
  @NessieDbAdapter static VersionStore daVersionStore;

  /** Export source. */
  @NessiePersist static Persist persist;

  /** The export produced by this test is expected to be in V1 format. */
  @Override
  ExportVersion exportVersion() {
    return ExportVersion.V1;
  }

  /** Wraps the source {@link Persist} in a fresh version store. */
  @Override
  VersionStore sourceVersionStore() {
    return new VersionStoreImpl(persist);
  }

  /** The target is the database adapter's version store. */
  @Override
  VersionStore targetVersionStore() {
    return daVersionStore;
  }

  /**
   * Resets the target repository and leaves it without a default branch: the repository is
   * erased, re-initialized with a default branch, and that branch is then deleted again.
   */
  @Override
  void prepareTargetRepo() {
    String defaultBranchName = "main";
    databaseAdapter.eraseRepo();
    databaseAdapter.initializeRepo(defaultBranchName);
    try {
      databaseAdapter.delete(BranchName.of(defaultBranchName), Optional.empty());
    } catch (ReferenceNotFoundException | ReferenceConflictException e) {
      throw new RuntimeException(e);
    }
  }

  /** Imports the files found in {@code dir} into the database adapter. */
  @Override
  ImportResult importRepo() throws IOException {
    return NessieImporter.builder()
        .databaseAdapter(databaseAdapter)
        .importFileSupplier(FileImporter.builder().sourceDirectory(dir).build())
        .build()
        .importNessieRepository();
  }

  /**
   * Exports the source repository into {@code dir} using export version 1.
   *
   * @param fullScan forwarded to the exporter's {@code fullScan} setting
   */
  @Override
  ExportMeta exportRepo(boolean fullScan) throws IOException {
    return NessieExporter.builder()
        .persist(persist)
        .exportVersion(1) // Must set export version to 1
        .fullScan(fullScan)
        .exportFileSupplier(FileExporter.builder().targetDirectory(dir).build())
        .build()
        .exportNessieRepository();
  }

  /** Streams the hashes of all commit log entries in the target; the caller must close it. */
  @Override
  @MustBeClosed
  Stream<Hash> scanAllTargetCommits() {
    return databaseAdapter.scanAllCommitLogEntries().map(entry -> entry.getHash());
  }

  /**
   * Intentionally a no-op for this migration direction.
   *
   * <p>NOTE(review): presumably the repository description is not carried over by a V1 export
   * into a database adapter - confirm.
   */
  @Override
  protected void checkRepositoryDescription() {}
}

0 comments on commit c72b482

Please sign in to comment.