Skip to content

Commit

Permalink
Fix clean-up of reference index objects (#10083)
Browse files Browse the repository at this point in the history
Consider index objects and index stripes "referenced"

This is a follow-up to #9688 and a pre-requisite to #10048.
  • Loading branch information
dimas-b authored Dec 12, 2024
1 parent 7edc417 commit 9c42476
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 22 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ as necessary. Empty sections will not end in the release notes.

### Fixes

- Fix large index processing in the `cleanup-repository` admin command.

### Commits

## [0.101.1] Release (2024-12-09)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static org.projectnessie.versioned.storage.common.logic.Logics.referenceLogic;
import static org.projectnessie.versioned.storage.common.logic.Logics.repositoryLogic;
import static org.projectnessie.versioned.storage.common.logic.ReferencesQuery.referencesQuery;
import static org.projectnessie.versioned.storage.common.objtypes.StandardObjType.INDEX_SEGMENTS;
import static org.projectnessie.versioned.storage.common.objtypes.StandardObjType.VALUE;
import static org.projectnessie.versioned.storage.common.persist.ObjId.EMPTY_OBJ_ID;

Expand All @@ -38,6 +39,7 @@
import org.projectnessie.versioned.storage.common.objtypes.CommitObj;
import org.projectnessie.versioned.storage.common.objtypes.CommitOp;
import org.projectnessie.versioned.storage.common.objtypes.ContentValueObj;
import org.projectnessie.versioned.storage.common.objtypes.IndexSegmentsObj;
import org.projectnessie.versioned.storage.common.persist.Obj;
import org.projectnessie.versioned.storage.common.persist.ObjId;
import org.projectnessie.versioned.storage.common.persist.Persist;
Expand Down Expand Up @@ -274,6 +276,16 @@ private void handleCommit(CommitObj commit) {
.commitRelatedObjects(commit)
.forEach(this::pendingObj);

commit
.referenceIndexStripes()
.forEach(
indexStripe ->
referencedObjectsContext.referencedObjects().markReferenced(indexStripe.segment()));

if (commit.referenceIndex() != null) {
pendingObj(commit.referenceIndex());
}

var indexesLogic = indexesLogic(referencedObjectsContext.persist());
var index = indexesLogic.buildCompleteIndexOrEmpty(commit);
for (StoreIndexElement<CommitOp> indexElement : index) {
Expand Down Expand Up @@ -354,6 +366,11 @@ private void handleObj(Obj obj) {
.valueFromStore(contentValueObj.payload(), contentValueObj.data());

handleContent(content);
} else if (INDEX_SEGMENTS.equals(objType)) {
var segments = (IndexSegmentsObj) obj;
segments
.stripes()
.forEach(s -> referencedObjectsContext.referencedObjects().markReferenced(s.segment()));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
import static java.util.UUID.randomUUID;
import static org.projectnessie.nessie.relocated.protobuf.ByteString.copyFromUtf8;
import static org.projectnessie.versioned.storage.cleanup.Cleanup.createCleanup;
import static org.projectnessie.versioned.storage.common.config.StoreConfig.CONFIG_MAX_INCREMENTAL_INDEX_SIZE;
import static org.projectnessie.versioned.storage.common.config.StoreConfig.CONFIG_MAX_REFERENCE_STRIPES_PER_COMMIT;
import static org.projectnessie.versioned.storage.common.config.StoreConfig.CONFIG_MAX_SERIALIZED_INDEX_SIZE;
import static org.projectnessie.versioned.storage.common.indexes.StoreKey.key;
import static org.projectnessie.versioned.storage.common.logic.CreateCommit.Add.commitAdd;
import static org.projectnessie.versioned.storage.common.logic.CreateCommit.newCommitBuilder;
Expand All @@ -41,16 +44,28 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.projectnessie.nessie.immutables.NessieImmutable;
import org.projectnessie.versioned.storage.common.exceptions.CommitConflictException;
import org.projectnessie.versioned.storage.common.exceptions.ObjNotFoundException;
import org.projectnessie.versioned.storage.common.logic.CommitLogic;
import org.projectnessie.versioned.storage.common.logic.CreateCommit;
import org.projectnessie.versioned.storage.common.logic.InternalRef;
import org.projectnessie.versioned.storage.common.objtypes.CommitObj;
import org.projectnessie.versioned.storage.common.objtypes.Compression;
import org.projectnessie.versioned.storage.common.objtypes.ContentValueObj;
import org.projectnessie.versioned.storage.common.objtypes.IndexSegmentsObj;
import org.projectnessie.versioned.storage.common.objtypes.IndexStripe;
import org.projectnessie.versioned.storage.common.persist.Obj;
import org.projectnessie.versioned.storage.common.persist.ObjId;
import org.projectnessie.versioned.storage.common.persist.Persist;
import org.projectnessie.versioned.storage.testextension.NessiePersist;
import org.projectnessie.versioned.storage.testextension.NessieStoreConfig;
import org.projectnessie.versioned.storage.testextension.PersistExtension;
import org.projectnessie.versioned.store.DefaultStoreWorker;

@ExtendWith({PersistExtension.class, SoftAssertionsExtension.class})
@NessieStoreConfig(name = CONFIG_MAX_INCREMENTAL_INDEX_SIZE, value = "10000")
@NessieStoreConfig(name = CONFIG_MAX_SERIALIZED_INDEX_SIZE, value = "10000")
@NessieStoreConfig(name = CONFIG_MAX_REFERENCE_STRIPES_PER_COMMIT, value = "10")
public class TestCleanup {
@InjectSoftAssertions protected SoftAssertions soft;

Expand Down Expand Up @@ -464,37 +479,110 @@ void withSecondaryParents() throws Exception {
.containsExactly(Optional.empty(), 5L + 13L + 12L, 0L);
}

@Test
void withReferenceIndexStripes() throws Exception {
soft.assertThat(repositoryLogic(persist).repositoryExists()).isTrue();

var referenceLogic = referenceLogic(persist);
var commitLogic = commitLogic(persist);

// Note: the 200 value works in conjunction with config overrides at the class level
var root = commit(commitLogic, EMPTY_OBJ_ID, 200, "withIndexStripes-root-", "main");
var c1 = commit(commitLogic, root.id(), "withIndexStripes", "c1", "main");

referenceLogic.createReference("refs/heads/withIndexStripes", c1.id(), null);

var indexStripes = c1.referenceIndexStripes();
soft.assertThat(indexStripes).hasSizeGreaterThan(1);

var resolveAndPurge = resolveAndPurge(persist.config().currentTimeMicros());
soft.assertThat(resolveAndPurge.purgeResult().stats().failure()).isEmpty();

var filter = resolveAndPurge.resolveResult().purgeObjectsContext().referencedObjects();
for (IndexStripe indexStripe : indexStripes) {
soft.assertThat(indexStripe).matches(s -> filter.isProbablyReferenced(s.segment()));
}
}

@Test
void withReferenceIndexSegments() throws Exception {
soft.assertThat(repositoryLogic(persist).repositoryExists()).isTrue();

var referenceLogic = referenceLogic(persist);
var commitLogic = commitLogic(persist);

// Note: the 1000 value works in conjunction with config overrides at the class level
var root = commit(commitLogic, EMPTY_OBJ_ID, 1000, "withIndexSegments-root-", "main");
var c1 = commit(commitLogic, root.id(), "withIndexSegments", "c1", "main");

var indexId = c1.referenceIndex();
var indexObj = persist.fetchObj(requireNonNull(indexId));
soft.assertThat(indexObj).isInstanceOf(IndexSegmentsObj.class);
var segments = (IndexSegmentsObj) indexObj;
soft.assertThat(segments.stripes()).hasSizeGreaterThan(1);

referenceLogic.createReference("refs/heads/withIndexSegments", c1.id(), null);

var resolveAndPurge = resolveAndPurge(persist.config().currentTimeMicros());
soft.assertThat(resolveAndPurge.purgeResult().stats().failure()).isEmpty();

var filter = resolveAndPurge.resolveResult().purgeObjectsContext().referencedObjects();
soft.assertThat(indexId).extracting(filter::isProbablyReferenced).isEqualTo(true);
soft.assertThat(segments.stripes()).allMatch(s -> filter.isProbablyReferenced(s.segment()));
}

private ObjId buildNewCommitChain(CommitLogic commitLogic, String discrim) throws Exception {
var head = EMPTY_OBJ_ID;
for (int i = 0; i < 5; i++) {
var cid1 = randomUUID();
var obj1 =
contentValue(
cid1.toString(),
127,
DefaultStoreWorker.instance()
.toStoreOnReferenceState(onRef("obj " + i + " " + discrim, cid1.toString())));
var commit =
commitLogic.doCommit(
newCommitBuilder()
.commitType(NORMAL)
.parentCommitId(head)
.addAdds(
commitAdd(
key("store", "key", Integer.toString(i), discrim),
42,
obj1.id(),
null,
cid1))
.headers(newCommitHeaders().add("created", "foo-" + i + "-" + discrim).build())
.message("commit " + i + " " + discrim)
.build(),
List.of(obj1));
commit(commitLogic, head, "obj " + i + " " + discrim, Integer.toString(i), discrim);
head = requireNonNull(commit).id();
}
return head;
}

private CommitObj commit(
CommitLogic commitLogic, ObjId parent, String onRefState, String key, String discrim)
throws CommitConflictException, ObjNotFoundException {
var builder =
newCommitBuilder()
.commitType(NORMAL)
.parentCommitId(parent)
.headers(newCommitHeaders().add("created", "foo-" + key + "-" + discrim).build())
.message("commit " + key + " " + discrim);
var contentValue = addValue(builder, onRefState, key, discrim);
return commitLogic.doCommit(builder.build(), List.of(contentValue));
}

private CommitObj commit(
CommitLogic commitLogic, ObjId parent, int numKeys, String keyPrefix, String discrim)
throws CommitConflictException, ObjNotFoundException {
var builder =
newCommitBuilder()
.commitType(NORMAL)
.parentCommitId(parent)
.headers(newCommitHeaders().add("created", numKeys + "-" + discrim).build())
.message("commit multiple: " + numKeys + " " + discrim);
List<Obj> values = new ArrayList<>();
for (int i = 0; i < numKeys; i++) {
values.add(addValue(builder, "obj " + keyPrefix + i, keyPrefix + i, discrim));
}
return commitLogic.doCommit(builder.build(), values);
}

private ContentValueObj addValue(
CreateCommit.Builder builder, String onRefState, String key, String discrim) {
var cid = randomUUID();
var contentValue =
contentValue(
cid.toString(),
127,
DefaultStoreWorker.instance()
.toStoreOnReferenceState(onRef(onRefState, cid.toString())));
builder.addAdds(commitAdd(key("store", "key", key, discrim), 42, contentValue.id(), null, cid));
return contentValue;
}

ResolvePurgeResult resolveAndPurge(long maxObjReferenced) throws Exception {
var cleanup = createCleanup(CleanupParams.builder().build());
var referencedObjectsContext = cleanup.buildReferencedObjectsContext(persist, maxObjReferenced);
Expand Down

0 comments on commit 9c42476

Please sign in to comment.