diff --git a/servers/quarkus-common/src/main/java/org/projectnessie/quarkus/config/QuarkusStoreConfig.java b/servers/quarkus-common/src/main/java/org/projectnessie/quarkus/config/QuarkusStoreConfig.java index 508e670775d..b2789c374fe 100644 --- a/servers/quarkus-common/src/main/java/org/projectnessie/quarkus/config/QuarkusStoreConfig.java +++ b/servers/quarkus-common/src/main/java/org/projectnessie/quarkus/config/QuarkusStoreConfig.java @@ -20,6 +20,8 @@ import io.smallrye.config.WithConverter; import io.smallrye.config.WithDefault; import io.smallrye.config.WithName; +import java.time.Duration; +import java.util.Optional; import java.util.OptionalDouble; import java.util.OptionalInt; import org.projectnessie.versioned.storage.common.config.StoreConfig; @@ -136,4 +138,12 @@ public interface QuarkusStoreConfig extends StoreConfig { */ @WithName(CONFIG_CACHE_CAPACITY_FRACTION_ADJUST_MB) OptionalInt cacheCapacityFractionAdjustMB(); + + @WithName(CONFIG_REFERENCE_CACHE_TTL) + @Override + Optional referenceCacheTtl(); + + @WithName(CONFIG_REFERENCE_NEGATIVE_CACHE_TTL) + @Override + Optional referenceCacheNegativeTtl(); } diff --git a/servers/quarkus-common/src/main/java/org/projectnessie/quarkus/providers/storage/PersistProvider.java b/servers/quarkus-common/src/main/java/org/projectnessie/quarkus/providers/storage/PersistProvider.java index 6014cb195b2..dfaf11de215 100644 --- a/servers/quarkus-common/src/main/java/org/projectnessie/quarkus/providers/storage/PersistProvider.java +++ b/servers/quarkus-common/src/main/java/org/projectnessie/quarkus/providers/storage/PersistProvider.java @@ -146,12 +146,21 @@ public Persist producePersist(MeterRegistry meterRegistry) { String cacheInfo; if (effectiveCacheSizeMB > 0) { - CacheConfig cacheConfig = - CacheConfig.builder() - .capacityMb(effectiveCacheSizeMB) - .meterRegistry(meterRegistry) - .build(); - CacheBackend cacheBackend = PersistCaches.newBackend(cacheConfig); + CacheConfig.Builder cacheConfig = + CacheConfig.builder().capacityMb(effectiveCacheSizeMB).meterRegistry(meterRegistry); + + storeConfig + .referenceCacheTtl() + .ifPresent( + refTtl -> { + LOGGER.warn( + "Reference caching is an experimental feature but enabled with a TTL of {}", + refTtl); + cacheConfig.referenceTtl(refTtl); + }); + storeConfig.referenceCacheNegativeTtl().ifPresent(cacheConfig::referenceNegativeTtl); + + CacheBackend cacheBackend = PersistCaches.newBackend(cacheConfig.build()); persist = cacheBackend.wrap(persist); cacheInfo = "with " + effectiveCacheSizeMB + " MB objects cache"; } else { diff --git a/versioned/storage/batching/src/main/java/org/projectnessie/versioned/storage/batching/BatchingPersistImpl.java b/versioned/storage/batching/src/main/java/org/projectnessie/versioned/storage/batching/BatchingPersistImpl.java index 9f4d1e6d473..a13d935ad5a 100644 --- a/versioned/storage/batching/src/main/java/org/projectnessie/versioned/storage/batching/BatchingPersistImpl.java +++ b/versioned/storage/batching/src/main/java/org/projectnessie/versioned/storage/batching/BatchingPersistImpl.java @@ -404,6 +404,13 @@ public Reference fetchReference(@Nonnull @javax.annotation.Nonnull String name) return delegate().fetchReference(name); } + @Override + @Nullable + @javax.annotation.Nullable + public Reference fetchReferenceForUpdate(@Nonnull @javax.annotation.Nonnull String name) { + return delegate().fetchReferenceForUpdate(name); + } + @Override @Nonnull @javax.annotation.Nonnull @@ -411,6 +418,13 @@ public Reference[] fetchReferences(@Nonnull @javax.annotation.Nonnull String[] n return delegate().fetchReferences(names); } + @Override + @Nonnull + @javax.annotation.Nonnull + public Reference[] fetchReferencesForUpdate(@Nonnull @javax.annotation.Nonnull String[] names) { + return delegate().fetchReferencesForUpdate(names); + } + @Override @Nonnull @javax.annotation.Nonnull diff --git a/versioned/storage/bigtable/src/main/java/org/projectnessie/versioned/storage/bigtable/BigTablePersist.java b/versioned/storage/bigtable/src/main/java/org/projectnessie/versioned/storage/bigtable/BigTablePersist.java index 81f29c8d01b..0e13e61f54c 100644 --- a/versioned/storage/bigtable/src/main/java/org/projectnessie/versioned/storage/bigtable/BigTablePersist.java +++ b/versioned/storage/bigtable/src/main/java/org/projectnessie/versioned/storage/bigtable/BigTablePersist.java @@ -173,7 +173,7 @@ public Reference addReference(@Nonnull Reference reference) throws RefAlreadyExi .filter(FILTERS.family().exactMatch(FAMILY_REFS)) .filter(FILTERS.qualifier().exactMatch(QUALIFIER_REFS)); - boolean success = + boolean failure = backend .client() .checkAndMutateRow( @@ -181,7 +181,7 @@ public Reference addReference(@Nonnull Reference reference) throws RefAlreadyExi .condition(condition) .otherwise(mutation)); - if (success) { + if (failure) { throw new RefAlreadyExistsException(fetchReference(reference.name())); } diff --git a/versioned/storage/cache/src/main/java/org/projectnessie/versioned/storage/cache/CacheBackend.java b/versioned/storage/cache/src/main/java/org/projectnessie/versioned/storage/cache/CacheBackend.java index 85c4febd2af..9085bbd782e 100644 --- a/versioned/storage/cache/src/main/java/org/projectnessie/versioned/storage/cache/CacheBackend.java +++ b/versioned/storage/cache/src/main/java/org/projectnessie/versioned/storage/cache/CacheBackend.java @@ -15,17 +15,24 @@ */ package org.projectnessie.versioned.storage.cache; +import static org.projectnessie.versioned.storage.common.persist.ObjId.zeroLengthObjId; +import static org.projectnessie.versioned.storage.common.persist.Reference.reference; + import jakarta.annotation.Nonnull; import org.projectnessie.versioned.storage.common.persist.Backend; import org.projectnessie.versioned.storage.common.persist.Obj; import org.projectnessie.versioned.storage.common.persist.ObjId; import org.projectnessie.versioned.storage.common.persist.Persist; +import org.projectnessie.versioned.storage.common.persist.Reference; /** * Provides the cache primitives for a caching {@link Persist} facade, suitable for multiple * repositories. It is adviseable to have one {@link CacheBackend} per {@link Backend}. */ public interface CacheBackend { + Reference NON_EXISTENT_REFERENCE_SENTINEL = + reference("NON_EXISTENT", zeroLengthObjId(), false, -1L, null); + Obj get(@Nonnull String repositoryId, @Nonnull ObjId id); void put(@Nonnull String repositoryId, @Nonnull Obj obj); @@ -34,5 +41,13 @@ public interface CacheBackend { void clear(@Nonnull String repositoryId); - Persist wrap(@Nonnull Persist perist); + Persist wrap(@Nonnull Persist persist); + + Reference getReference(@Nonnull String repositoryId, @Nonnull String name); + + void removeReference(@Nonnull String repositoryId, @Nonnull String name); + + void putReference(@Nonnull String repositoryId, @Nonnull Reference r); + + void putNegative(@Nonnull String repositoryId, @Nonnull String name); } diff --git a/versioned/storage/cache/src/main/java/org/projectnessie/versioned/storage/cache/CacheConfig.java b/versioned/storage/cache/src/main/java/org/projectnessie/versioned/storage/cache/CacheConfig.java index 3f910d6625b..3a94da34206 100644 --- a/versioned/storage/cache/src/main/java/org/projectnessie/versioned/storage/cache/CacheConfig.java +++ b/versioned/storage/cache/src/main/java/org/projectnessie/versioned/storage/cache/CacheConfig.java @@ -15,18 +15,30 @@ */ package org.projectnessie.versioned.storage.cache; +import static com.google.common.base.Preconditions.checkState; + import com.google.errorprone.annotations.CanIgnoreReturnValue; import io.micrometer.core.instrument.MeterRegistry; +import java.time.Duration; import java.util.Optional; import java.util.function.LongSupplier; import org.immutables.value.Value; @Value.Immutable public interface CacheConfig { + + String INVALID_REFERENCE_NEGATIVE_TTL = + "Cache reference-negative-TTL must only be present, if reference-TTL is configured, and must only be positive."; + String INVALID_REFERENCE_TTL = "Cache reference-TTL must be positive, if present."; + long capacityMb(); Optional meterRegistry(); + Optional referenceTtl(); + + Optional referenceNegativeTtl(); + @Value.Default default LongSupplier clockNanos() { return System::nanoTime; @@ -36,6 +48,18 @@ static Builder builder() { return ImmutableCacheConfig.builder(); } + @Value.Check + default void check() { + referenceTtl() + .ifPresent(ttl -> checkState(ttl.compareTo(Duration.ZERO) > 0, INVALID_REFERENCE_TTL)); + referenceNegativeTtl() + .ifPresent( + ttl -> + checkState( + referenceTtl().isPresent() && ttl.compareTo(Duration.ZERO) > 0, + INVALID_REFERENCE_NEGATIVE_TTL)); + } + interface Builder { @CanIgnoreReturnValue Builder capacityMb(long capacityMb); @@ -43,6 +67,12 @@ interface Builder { @CanIgnoreReturnValue Builder meterRegistry(MeterRegistry meterRegistry); + @CanIgnoreReturnValue + Builder referenceTtl(Duration referenceTtl); + + @CanIgnoreReturnValue + Builder referenceNegativeTtl(Duration referenceNegativeTtl); + @CanIgnoreReturnValue Builder clockNanos(LongSupplier clockNanos); diff --git a/versioned/storage/cache/src/main/java/org/projectnessie/versioned/storage/cache/CachingPersistImpl.java b/versioned/storage/cache/src/main/java/org/projectnessie/versioned/storage/cache/CachingPersistImpl.java index 6e46bff9d8e..54bf5c36f0f 100644 --- a/versioned/storage/cache/src/main/java/org/projectnessie/versioned/storage/cache/CachingPersistImpl.java +++ b/versioned/storage/cache/src/main/java/org/projectnessie/versioned/storage/cache/CachingPersistImpl.java @@ -15,6 +15,8 @@ */ package org.projectnessie.versioned.storage.cache; +import static org.projectnessie.versioned.storage.cache.CacheBackend.NON_EXISTENT_REFERENCE_SENTINEL; + import jakarta.annotation.Nonnull; import java.util.Set; import org.projectnessie.versioned.storage.common.config.StoreConfig; @@ -261,40 +263,148 @@ public String name() { return persist.name(); } + // References + @Override @Nonnull public Reference addReference(@Nonnull Reference reference) throws RefAlreadyExistsException { - return persist.addReference(reference); + Reference r = null; + try { + return r = persist.addReference(reference); + } finally { + if (r != null) { + cache.putReference(r); + } else { + cache.removeReference(reference.name()); + } + } } @Override @Nonnull public Reference markReferenceAsDeleted(@Nonnull Reference reference) throws RefNotFoundException, RefConditionFailedException { - return persist.markReferenceAsDeleted(reference); + Reference r = null; + try { + return r = persist.markReferenceAsDeleted(reference); + } finally { + if (r != null) { + cache.putReference(r); + } else { + cache.removeReference(reference.name()); + } + } } @Override public void purgeReference(@Nonnull Reference reference) throws RefNotFoundException, RefConditionFailedException { - persist.purgeReference(reference); + try { + persist.purgeReference(reference); + } finally { + cache.removeReference(reference.name()); + } } @Override @Nonnull public Reference updateReferencePointer(@Nonnull Reference reference, @Nonnull ObjId newPointer) throws RefNotFoundException, RefConditionFailedException { - return persist.updateReferencePointer(reference, newPointer); + Reference r = null; + try { + return r = persist.updateReferencePointer(reference, newPointer); + } finally { + if (r != null) { + cache.putReference(r); + } else { + cache.removeReference(reference.name()); + } + } } @Override public Reference fetchReference(@Nonnull String name) { - return persist.fetchReference(name); + return fetchReferenceInternal(name, false); + } + + @Override + public Reference fetchReferenceForUpdate(@Nonnull String name) { + return fetchReferenceInternal(name, true); + } + + private Reference fetchReferenceInternal(@Nonnull String name, boolean bypassCache) { + Reference r = null; + if (!bypassCache) { + r = cache.getReference(name); + if (r == NON_EXISTENT_REFERENCE_SENTINEL) { + return null; + } + } + + if (r == null) { + r = persist.fetchReferenceForUpdate(name); + if (r == null) { + cache.putNegative(name); + } else { + cache.putReference(r); + } + } + return r; } @Override @Nonnull public Reference[] fetchReferences(@Nonnull String[] names) { - return persist.fetchReferences(names); + return fetchReferencesInternal(names, false); + } + + @Override + @Nonnull + public Reference[] fetchReferencesForUpdate(@Nonnull String[] names) { + return fetchReferencesInternal(names, true); + } + + private Reference[] fetchReferencesInternal(@Nonnull String[] names, boolean bypassCache) { + Reference[] r = new Reference[names.length]; + + String[] backend = null; + if (!bypassCache) { + for (int i = 0; i < names.length; i++) { + String name = names[i]; + if (name != null) { + Reference cr = cache.getReference(name); + if (cr != null) { + if (cr != NON_EXISTENT_REFERENCE_SENTINEL) { + r[i] = cr; + } + } else { + if (backend == null) { + backend = new String[names.length]; + } + backend[i] = name; + } + } + } + } else { + backend = names; + } + + if (backend != null) { + Reference[] br = persist.fetchReferencesForUpdate(backend); + for (int i = 0; i < br.length; i++) { + String name = backend[i]; + if (name != null) { + Reference ref = br[i]; + if (ref != null) { + r[i] = ref; + cache.putReference(ref); + } else { + cache.putNegative(name); + } + } + } + } + + return r; } } diff --git a/versioned/storage/cache/src/main/java/org/projectnessie/versioned/storage/cache/CaffeineCacheBackend.java b/versioned/storage/cache/src/main/java/org/projectnessie/versioned/storage/cache/CaffeineCacheBackend.java index 5afd263c9c2..fd56f8fe001 100644 --- a/versioned/storage/cache/src/main/java/org/projectnessie/versioned/storage/cache/CaffeineCacheBackend.java +++ b/versioned/storage/cache/src/main/java/org/projectnessie/versioned/storage/cache/CaffeineCacheBackend.java @@ -15,12 +15,15 @@ */ package org.projectnessie.versioned.storage.cache; +import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.Collections.singletonList; import static java.util.concurrent.TimeUnit.MICROSECONDS; import static java.util.concurrent.TimeUnit.NANOSECONDS; import static org.projectnessie.versioned.storage.common.persist.ObjType.CACHE_UNLIMITED; import static org.projectnessie.versioned.storage.common.persist.ObjType.NOT_CACHED; +import static org.projectnessie.versioned.storage.serialize.ProtoSerialization.deserializeReference; import static org.projectnessie.versioned.storage.serialize.ProtoSerialization.serializeObj; +import static org.projectnessie.versioned.storage.serialize.ProtoSerialization.serializeReference; import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; @@ -28,23 +31,32 @@ import io.micrometer.core.instrument.Tag; import io.micrometer.core.instrument.binder.cache.CaffeineStatsCounter; import jakarta.annotation.Nonnull; +import java.time.Duration; import org.checkerframework.checker.index.qual.NonNegative; import org.projectnessie.versioned.storage.common.exceptions.ObjTooLargeException; import org.projectnessie.versioned.storage.common.persist.Obj; import org.projectnessie.versioned.storage.common.persist.ObjId; import org.projectnessie.versioned.storage.common.persist.Persist; +import org.projectnessie.versioned.storage.common.persist.Reference; import org.projectnessie.versioned.storage.serialize.ProtoSerialization; final class CaffeineCacheBackend implements CacheBackend { public static final String CACHE_NAME = "nessie-objects"; + private static final byte[] NON_EXISTING_SENTINEL = "NON_EXISTING".getBytes(UTF_8); private final CacheConfig config; final Cache cache; + private final long refCacheTtlNanos; + private final long refCacheNegativeTtlNanos; + CaffeineCacheBackend(CacheConfig config) { this.config = config; + refCacheTtlNanos = config.referenceTtl().orElse(Duration.ZERO).toNanos(); + refCacheNegativeTtlNanos = config.referenceNegativeTtl().orElse(Duration.ZERO).toNanos(); + Caffeine cacheBuilder = Caffeine.newBuilder() .maximumWeight(config.capacityMb() * 1024L * 1024L) @@ -154,6 +166,56 @@ public void clear(@Nonnull String repositoryId) { cache.asMap().keySet().removeIf(k -> k.repositoryId.equals(repositoryId)); } + private ObjId refObjId(String name) { + return ObjId.objIdFromByteArray(("r:" + name).getBytes(UTF_8)); + } + + @Override + public void removeReference(@Nonnull String repositoryId, @Nonnull String name) { + if (refCacheTtlNanos <= 0L) { + return; + } + ObjId id = refObjId(name); + CacheKeyValue key = cacheKey(repositoryId, id); + cache.invalidate(key); + } + + @Override + public void putReference(@Nonnull String repositoryId, @Nonnull Reference r) { + if (refCacheTtlNanos <= 0L) { + return; + } + ObjId id = refObjId(r.name()); + CacheKeyValue key = + cacheKeyValue(repositoryId, id, config.clockNanos().getAsLong() + refCacheTtlNanos); + cache.put(key, serializeReference(r)); + } + + @Override + public void putNegative(@Nonnull String repositoryId, @Nonnull String name) { + if (refCacheNegativeTtlNanos <= 0L) { + return; + } + ObjId id = refObjId(name); + CacheKeyValue key = + cacheKeyValue(repositoryId, id, config.clockNanos().getAsLong() + refCacheNegativeTtlNanos); + cache.put(key, NON_EXISTING_SENTINEL); + } + + @Override + public Reference getReference(@Nonnull String repositoryId, @Nonnull String name) { + if (refCacheTtlNanos <= 0L) { + return null; + } + ObjId id = refObjId(name); + CacheKeyValue keyValue = cacheKey(repositoryId, id); + byte[] bytes = cache.getIfPresent(keyValue); + if (bytes == NON_EXISTING_SENTINEL) { + return NON_EXISTENT_REFERENCE_SENTINEL; + } + return bytes != null ? deserializeReference(bytes) : null; + } + static CacheKeyValue cacheKey(String repositoryId, ObjId id) { return new CacheKeyValue(repositoryId, id); } diff --git a/versioned/storage/cache/src/main/java/org/projectnessie/versioned/storage/cache/ObjCache.java b/versioned/storage/cache/src/main/java/org/projectnessie/versioned/storage/cache/ObjCache.java index cd706e61660..3960a98ab29 100644 --- a/versioned/storage/cache/src/main/java/org/projectnessie/versioned/storage/cache/ObjCache.java +++ b/versioned/storage/cache/src/main/java/org/projectnessie/versioned/storage/cache/ObjCache.java @@ -19,6 +19,7 @@ import org.projectnessie.versioned.storage.common.persist.Obj; import org.projectnessie.versioned.storage.common.persist.ObjId; import org.projectnessie.versioned.storage.common.persist.Persist; +import org.projectnessie.versioned.storage.common.persist.Reference; /** Cache primitives for a specific repository ID, used for one {@link Persist} instance. */ public interface ObjCache { @@ -30,4 +31,12 @@ public interface ObjCache { void remove(@Nonnull ObjId id); void clear(); + + Reference getReference(@Nonnull String name); + + void removeReference(@Nonnull String name); + + void putReference(@Nonnull Reference r); + + void putNegative(@Nonnull String name); } diff --git a/versioned/storage/cache/src/main/java/org/projectnessie/versioned/storage/cache/ObjCacheImpl.java b/versioned/storage/cache/src/main/java/org/projectnessie/versioned/storage/cache/ObjCacheImpl.java index 4494e415506..ce49c21a33c 100644 --- a/versioned/storage/cache/src/main/java/org/projectnessie/versioned/storage/cache/ObjCacheImpl.java +++ b/versioned/storage/cache/src/main/java/org/projectnessie/versioned/storage/cache/ObjCacheImpl.java @@ -19,6 +19,7 @@ import org.projectnessie.versioned.storage.common.config.StoreConfig; import org.projectnessie.versioned.storage.common.persist.Obj; import org.projectnessie.versioned.storage.common.persist.ObjId; +import org.projectnessie.versioned.storage.common.persist.Reference; final class ObjCacheImpl implements ObjCache { private final CacheBackend backend; @@ -48,4 +49,24 @@ public void remove(@Nonnull ObjId id) { public void clear() { backend.clear(repositoryId); } + + @Override + public void removeReference(@Nonnull String name) { + backend.removeReference(repositoryId, name); + } + + @Override + public void putReference(@Nonnull Reference r) { + backend.putReference(repositoryId, r); + } + + @Override + public Reference getReference(@Nonnull String name) { + return backend.getReference(repositoryId, name); + } + + @Override + public void putNegative(@Nonnull String name) { + backend.putNegative(repositoryId, name); + } } diff --git a/versioned/storage/cache/src/test/java/org/projectnessie/versioned/storage/cache/TestCacheConfig.java b/versioned/storage/cache/src/test/java/org/projectnessie/versioned/storage/cache/TestCacheConfig.java new file mode 100644 index 00000000000..85b31aecef3 --- /dev/null +++ b/versioned/storage/cache/src/test/java/org/projectnessie/versioned/storage/cache/TestCacheConfig.java @@ -0,0 +1,78 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.versioned.storage.cache; + +import static org.projectnessie.versioned.storage.cache.CacheConfig.INVALID_REFERENCE_NEGATIVE_TTL; +import static org.projectnessie.versioned.storage.cache.CacheConfig.INVALID_REFERENCE_TTL; + +import java.time.Duration; +import org.assertj.core.api.SoftAssertions; +import org.assertj.core.api.junit.jupiter.InjectSoftAssertions; +import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +@ExtendWith(SoftAssertionsExtension.class) +public class TestCacheConfig { + @InjectSoftAssertions protected SoftAssertions soft; + + @Test + public void allDefaults() { + soft.assertThatCode(() -> defaultBuilder().build()).doesNotThrowAnyException(); + } + + @Test + public void referenceCaching() { + soft.assertThatCode(() -> defaultBuilder().referenceTtl(Duration.ofMinutes(1)).build()) + .doesNotThrowAnyException(); + soft.assertThatCode( + () -> + defaultBuilder() + .referenceTtl(Duration.ofMinutes(1)) + .referenceNegativeTtl(Duration.ofMinutes(1)) + .build()) + .doesNotThrowAnyException(); + soft.assertThatIllegalStateException() + .isThrownBy(() -> defaultBuilder().referenceTtl(Duration.ofMinutes(0)).build()) + .withMessage(INVALID_REFERENCE_TTL); + soft.assertThatIllegalStateException() + .isThrownBy(() -> defaultBuilder().referenceTtl(Duration.ofMinutes(-1)).build()) + .withMessage(INVALID_REFERENCE_TTL); + soft.assertThatIllegalStateException() + .isThrownBy(() -> defaultBuilder().referenceNegativeTtl(Duration.ofMinutes(1)).build()) + .withMessage(INVALID_REFERENCE_NEGATIVE_TTL); + soft.assertThatIllegalStateException() + .isThrownBy( + () -> + defaultBuilder() + .referenceTtl(Duration.ofMinutes(1)) + .referenceNegativeTtl(Duration.ofMinutes(-1)) + .build()) + .withMessage(INVALID_REFERENCE_NEGATIVE_TTL); + soft.assertThatIllegalStateException() + .isThrownBy( + () -> + defaultBuilder() + .referenceTtl(Duration.ofMinutes(1)) + .referenceNegativeTtl(Duration.ofMinutes(0)) + .build()) + .withMessage(INVALID_REFERENCE_NEGATIVE_TTL); + } + + private static CacheConfig.Builder defaultBuilder() { + return CacheConfig.builder().capacityMb(1); + } +} diff --git a/versioned/storage/cache/src/test/java/org/projectnessie/versioned/storage/cache/TestReferenceCaching.java b/versioned/storage/cache/src/test/java/org/projectnessie/versioned/storage/cache/TestReferenceCaching.java new file mode 100644 index 00000000000..33e21c7af1f --- /dev/null +++ b/versioned/storage/cache/src/test/java/org/projectnessie/versioned/storage/cache/TestReferenceCaching.java @@ -0,0 +1,258 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.versioned.storage.cache; + +import static org.projectnessie.versioned.storage.common.persist.ObjId.randomObjId; +import static org.projectnessie.versioned.storage.common.persist.Reference.reference; + +import java.time.Duration; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.LongSupplier; +import org.assertj.core.api.SoftAssertions; +import org.assertj.core.api.junit.jupiter.InjectSoftAssertions; +import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.projectnessie.versioned.storage.common.exceptions.RefAlreadyExistsException; +import org.projectnessie.versioned.storage.common.exceptions.RefConditionFailedException; +import org.projectnessie.versioned.storage.common.exceptions.RefNotFoundException; +import org.projectnessie.versioned.storage.common.persist.Persist; +import org.projectnessie.versioned.storage.common.persist.Reference; +import org.projectnessie.versioned.storage.testextension.NessiePersist; +import org.projectnessie.versioned.storage.testextension.PersistExtension; + +@ExtendWith({PersistExtension.class, SoftAssertionsExtension.class}) +public class TestReferenceCaching { + public static final String REF_NAME = "refs/heads/foo"; + @InjectSoftAssertions protected SoftAssertions soft; + + Persist wrapWithCache(Persist persist, LongSupplier clockNanos) { + return PersistCaches.newBackend( + CacheConfig.builder() + .capacityMb(32) + .clockNanos(clockNanos) + .referenceTtl(Duration.ofMinutes(1)) + .referenceNegativeTtl(Duration.ofSeconds(1)) + .build()) + .wrap(persist); + } + + // Two caching `Persist` instances, using _independent_ cache backends. + Persist withCache1; + Persist withCache2; + + AtomicLong nowNanos; + + @BeforeEach + void wrapCaches(@NessiePersist Persist persist1, @NessiePersist Persist persist2) { + nowNanos = new AtomicLong(); + withCache1 = wrapWithCache(persist1, nowNanos::get); + withCache2 = wrapWithCache(persist2, nowNanos::get); + } + + /** Explicit cache-expiry via {@link Persist#fetchReferenceForUpdate(String)}. */ + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void referenceCacheInconsistency(boolean bulk) throws Exception { + // Create ref via instance 1 + Reference ref = + reference(REF_NAME, randomObjId(), false, withCache1.config().currentTimeMicros(), null); + withCache1.addReference(ref); + + // Populate cache in instance 2 + soft.assertThat(fetchRef(withCache2, bulk, ref.name())).isEqualTo(ref); + + // Update ref via instance 1 + Reference refUpdated = withCache1.updateReferencePointer(ref, randomObjId()); + soft.assertThat(refUpdated).isNotEqualTo(ref); + + soft.assertThat(fetchRef(withCache1, bulk, ref.name())).isEqualTo(refUpdated); + // Other test instance did NOT update its cache + soft.assertThat(fetchRef(withCache2, bulk, ref.name())) + .extracting(Reference::pointer) + .describedAs("Previous: %s, updated: %s", ref.pointer(), refUpdated.pointer()) + .isEqualTo(ref.pointer()) + .isNotEqualTo(refUpdated.pointer()); + + // + + if (bulk) { + soft.assertThat(withCache2.fetchReferencesForUpdate(new String[] {ref.name()})) + .containsExactly(refUpdated); + } else { + soft.assertThat(withCache2.fetchReferenceForUpdate(ref.name())).isEqualTo(refUpdated); + } + } + + /** Reference cache TTL expiry. */ + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void referenceCacheExpiry(boolean bulk) throws Exception { + // Create ref via instance 1 + Reference ref = + reference(REF_NAME, randomObjId(), false, withCache1.config().currentTimeMicros(), null); + withCache1.addReference(ref); + + // Populate cache in instance 2 + soft.assertThat(fetchRef(withCache2, bulk, ref.name())).isEqualTo(ref); + + // Update ref via instance 1 + Reference refUpdated = withCache1.updateReferencePointer(ref, randomObjId()); + soft.assertThat(refUpdated).isNotEqualTo(ref); + + soft.assertThat(fetchRef(withCache1, bulk, ref.name())).isEqualTo(refUpdated); + // Other test instance did NOT update its cache + soft.assertThat(fetchRef(withCache2, bulk, ref.name())) + .extracting(Reference::pointer) + .describedAs("Previous: %s, updated: %s", ref.pointer(), refUpdated.pointer()) + .isEqualTo(ref.pointer()) + .isNotEqualTo(refUpdated.pointer()); + + // + + nowNanos.addAndGet(Duration.ofMinutes(2).toNanos()); + soft.assertThat(fetchRef(withCache2, bulk, ref.name())).isEqualTo(refUpdated); + } + + /** Tests negative-cache behavior (non-existence of a reference). */ + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void referenceCacheNegativeExpiry(boolean bulk) throws Exception { + // Populate both caches w/ negative entries + soft.assertThat(fetchRef(withCache1, bulk, REF_NAME)).isNull(); + soft.assertThat(fetchRef(withCache2, bulk, REF_NAME)).isNull(); + + // Create ref via instance 1 + Reference ref = + reference(REF_NAME, randomObjId(), false, withCache1.config().currentTimeMicros(), null); + withCache1.addReference(ref); + + // Cache 1 has "correct" entry + soft.assertThat(fetchRef(withCache1, bulk, ref.name())).isEqualTo(ref); + // Cache 2 has stale negative entry + soft.assertThat(fetchRef(withCache2, bulk, ref.name())).isNull(); + + // Expire negative cache entries + nowNanos.addAndGet(Duration.ofSeconds(2).toNanos()); + soft.assertThat(fetchRef(withCache2, bulk, ref.name())).isEqualTo(ref); + } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void addReference(boolean bulk) throws Exception { + // Create ref via instance 1 + Reference ref = + reference(REF_NAME, randomObjId(), false, withCache1.config().currentTimeMicros(), null); + withCache1.addReference(ref); + + // Try addReference via instance 2 + soft.assertThatThrownBy(() -> withCache2.addReference(ref)) + .isInstanceOf(RefAlreadyExistsException.class); + + // Update ref via instance 1 + Reference refUpdated = withCache1.updateReferencePointer(ref, randomObjId()); + soft.assertThat(refUpdated).isNotEqualTo(ref); + + soft.assertThat(fetchRef(withCache1, bulk, ref.name())).isEqualTo(refUpdated); + // Other test instance DID populate its cache + soft.assertThat(fetchRef(withCache2, bulk, ref.name())) + .extracting(Reference::pointer) + .describedAs("Previous: %s, updated: %s", ref.pointer(), refUpdated.pointer()) + .isEqualTo(refUpdated.pointer()); + } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void markReferenceAsDeletedAndPurge(boolean bulk) throws Exception { + // Create ref via instance 1 + Reference ref = + reference(REF_NAME, randomObjId(), false, withCache1.config().currentTimeMicros(), null); + withCache1.addReference(ref); + + // Try addReference via instance 2 + soft.assertThatThrownBy(() -> withCache2.addReference(ref)) + .isInstanceOf(RefAlreadyExistsException.class); + + soft.assertThat(fetchRef(withCache2, bulk, ref.name())).isEqualTo(ref); + + // Mark ref as deleted via instance 1 + Reference refDeleted = withCache1.markReferenceAsDeleted(ref); + soft.assertThat(refDeleted).isNotEqualTo(ref); + + // instance 2 still has the cached deleted instance + soft.assertThat(fetchRef(withCache2, bulk, ref.name())).isEqualTo(ref); + + // Try markReferenceAsDeleted via instance 2 + soft.assertThatThrownBy(() -> withCache2.markReferenceAsDeleted(ref)) + .isInstanceOf(RefConditionFailedException.class); + + soft.assertThat(fetchRef(withCache1, bulk, ref.name())).isEqualTo(refDeleted); + // Other test instance DID populate its cache + soft.assertThat(fetchRef(withCache2, bulk, ref.name())) + .extracting(Reference::pointer) + .describedAs("Previous: %s, updated: %s", ref.pointer(), refDeleted.pointer()) + .isEqualTo(refDeleted.pointer()); + + // + // purge + // + + // Mark ref as deleted via instance 1 + soft.assertThatCode(() -> withCache1.purgeReference(refDeleted)).doesNotThrowAnyException(); + + // instance 2 still has the cached deleted instance + soft.assertThat(fetchRef(withCache2, bulk, ref.name())).isEqualTo(refDeleted); + + // Try markReferenceAsDeleted via instance 2 + soft.assertThatThrownBy(() -> withCache2.purgeReference(ref)) + .isInstanceOf(RefNotFoundException.class); + + soft.assertThat(fetchRef(withCache1, bulk, ref.name())).isNull(); + // Other test instance DID populate its cache + soft.assertThat(fetchRef(withCache2, bulk, ref.name())).isNull(); + } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void purgeVsUpdate(boolean bulk) throws Exception { + // Create ref via instance 1 + Reference ref = + reference(REF_NAME, randomObjId(), false, withCache1.config().currentTimeMicros(), null); + withCache1.addReference(ref); + + soft.assertThat(fetchRef(withCache2, bulk, ref.name())).isEqualTo(ref); + + // Mark ref as deleted via instance 1 + Reference refDeleted = withCache1.markReferenceAsDeleted(ref); + soft.assertThat(refDeleted).isNotEqualTo(ref); + + // Mark ref as deleted via instance 1 + soft.assertThatCode(() -> withCache1.purgeReference(refDeleted)).doesNotThrowAnyException(); + + soft.assertThat(fetchRef(withCache1, bulk, ref.name())).isNull(); + + soft.assertThatThrownBy(() -> withCache2.updateReferencePointer(ref, randomObjId())) + .isInstanceOf(RefNotFoundException.class); + } + + static Reference fetchRef(Persist persist, boolean bulk, String refName) { + return bulk + ? persist.fetchReferences(new String[] {refName})[0] + : persist.fetchReference(refName); + } +} diff --git a/versioned/storage/common/src/main/java/org/projectnessie/versioned/storage/common/config/StoreConfig.java b/versioned/storage/common/src/main/java/org/projectnessie/versioned/storage/common/config/StoreConfig.java index 1238146e85f..7333ed6a3b5 100644 --- a/versioned/storage/common/src/main/java/org/projectnessie/versioned/storage/common/config/StoreConfig.java +++ b/versioned/storage/common/src/main/java/org/projectnessie/versioned/storage/common/config/StoreConfig.java @@ -19,7 +19,9 @@ import static java.util.concurrent.TimeUnit.SECONDS; import java.time.Clock; +import java.time.Duration; import java.time.Instant; +import java.util.Optional; import java.util.function.Function; import org.immutables.value.Value; @@ -67,6 +69,10 @@ public interface StoreConfig { String CONFIG_PREVIOUS_HEAD_TIME_SPAN_SECONDS = "ref-previous-head-time-span-seconds"; long DEFAULT_PREVIOUS_HEAD_TIME_SPAN_SECONDS = 5 * 60; + String CONFIG_REFERENCE_CACHE_TTL = "reference-cache-ttl"; + + String CONFIG_REFERENCE_NEGATIVE_CACHE_TTL = "reference-cache-negative-ttl"; + /** * Whether namespace validation is enabled, changing this to false will break the Nessie * specification! @@ -250,6 +256,27 @@ default long referencePreviousHeadTimeSpanSeconds() { return DEFAULT_PREVIOUS_HEAD_TIME_SPAN_SECONDS; } + /** + * Defines the duration how long references shall be kept in the cache. Enables reference-caching, + * if configured with a positive duration value, defaults to not cache references. If reference + * caching is enabled, it is highly recommended to also enable negative reference caching. + * + *

This is an experimental feature, currently only for single Nessie node deployments! If + * in doubt, leave this un-configured! + */ + Optional referenceCacheTtl(); + + /** + * Defines the duration how long sentinels for non-existing references shall be kept in the cache + * (negative reference caching). Enabled, if configured with a positive duration value, default is + * not enabled. If reference caching is enabled, it is highly recommended to also enable negative + * reference caching. + * + *

This is an experimental feature, currently only for single Nessie node deployments! If + * in doubt, leave this un-configured! + */ + Optional referenceCacheNegativeTtl(); + /** * Retrieves the current timestamp in microseconds since epoch, using the configured {@link * #clock()}. @@ -334,6 +361,14 @@ default Adjustable fromFunction(Function configFunction) { if (v != null) { a = a.withReferencePreviousHeadTimeSpanSeconds(Long.parseLong(v.trim())); } + v = configFunction.apply(CONFIG_REFERENCE_CACHE_TTL); + if (v != null) { + a = a.withReferenceCacheTtl(Duration.parse(v.trim())); + } + v = configFunction.apply(CONFIG_REFERENCE_NEGATIVE_CACHE_TTL); + if (v != null) { + a = a.withReferenceCacheNegativeTtl(Duration.parse(v.trim())); + } return a; } @@ -379,5 +414,11 @@ default Adjustable fromFunction(Function configFunction) { Adjustable withReferencePreviousHeadCount(int referencePreviousHeadCount); Adjustable withReferencePreviousHeadTimeSpanSeconds(long referencePreviousHeadTimeSpanSeconds); + + /** See {@link StoreConfig#referenceCacheTtl()}. */ + Adjustable withReferenceCacheTtl(Duration referenceCacheTtl); + + /** See {@link StoreConfig#referenceCacheNegativeTtl()}. */ + Adjustable withReferenceCacheNegativeTtl(Duration referencecacheNegativeTtl); } } diff --git a/versioned/storage/common/src/main/java/org/projectnessie/versioned/storage/common/logic/ReferenceLogic.java b/versioned/storage/common/src/main/java/org/projectnessie/versioned/storage/common/logic/ReferenceLogic.java index 547f344d116..3b07406bdb2 100644 --- a/versioned/storage/common/src/main/java/org/projectnessie/versioned/storage/common/logic/ReferenceLogic.java +++ b/versioned/storage/common/src/main/java/org/projectnessie/versioned/storage/common/logic/ReferenceLogic.java @@ -40,6 +40,9 @@ public interface ReferenceLogic { @Nonnull List getReferences(@Nonnull List references); + @Nonnull + List getReferencesForUpdate(@Nonnull List references); + @Nonnull default Reference getReference(@Nonnull String name) throws RefNotFoundException { List refs = getReferences(Collections.singletonList(name)); @@ -50,6 +53,16 @@ default Reference getReference(@Nonnull String name) throws RefNotFoundException return ref; } + @Nonnull + default Reference getReferenceForUpdate(@Nonnull String name) throws RefNotFoundException { + List refs = getReferencesForUpdate(Collections.singletonList(name)); + Reference ref = refs.get(0); + if (ref == null) { + throw new RefNotFoundException(name); + } + return ref; + } + /** * Performs the query against existing references according to the given {@link ReferencesQuery}, * which should really depend on the serialized result of the query result in a "public API". diff --git a/versioned/storage/common/src/main/java/org/projectnessie/versioned/storage/common/logic/ReferenceLogicImpl.java b/versioned/storage/common/src/main/java/org/projectnessie/versioned/storage/common/logic/ReferenceLogicImpl.java index 179cec5c9a4..071fabd3679 100644 --- a/versioned/storage/common/src/main/java/org/projectnessie/versioned/storage/common/logic/ReferenceLogicImpl.java +++ b/versioned/storage/common/src/main/java/org/projectnessie/versioned/storage/common/logic/ReferenceLogicImpl.java @@ -56,6 +56,7 @@ import java.util.List; import java.util.Objects; import java.util.Optional; +import java.util.function.Function; import java.util.function.Supplier; import org.projectnessie.nessie.relocated.protobuf.ByteString; import org.projectnessie.versioned.storage.common.exceptions.CommitConflictException; @@ -186,6 +187,17 @@ final class ReferenceLogicImpl implements ReferenceLogic { @Override @Nonnull public List getReferences(@Nonnull List references) { + return getReferences(references, persist::fetchReferences); + } + + @Override + @Nonnull + public List getReferencesForUpdate(@Nonnull List references) { + return getReferences(references, persist::fetchReferencesForUpdate); + } + + private List getReferences( + @Nonnull List references, Function fetchRefs) { int refCount = references.size(); String[] refsArray; int refRefsIndex = references.indexOf(REF_REFS.name()); @@ -196,7 +208,7 @@ public List getReferences(@Nonnull List references) { refRefsIndex = references.size(); refsArray[refRefsIndex] = REF_REFS.name(); } - Reference[] refs = persist.fetchReferences(refsArray); + Reference[] refs = fetchRefs.apply(refsArray); Supplier refsIndexSupplier = createRefsIndexSupplier(refs[refRefsIndex]); @@ -370,7 +382,7 @@ public void deleteReference(@Nonnull String name, @Nonnull ObjId expectedPointer throws RefNotFoundException, RefConditionFailedException, RetryTimeoutException { checkArgument(!isInternalReferenceName(name)); - Reference reference = persist.fetchReference(name); + Reference reference = persist.fetchReferenceForUpdate(name); Supplier indexSupplier = null; if (reference == null) { StoreKey nameKey = key(name); @@ -471,7 +483,7 @@ CommitReferenceResult commitCreateReference( return commitRetry( persist, (p, retryState) -> { - Reference refRefs = requireNonNull(p.fetchReference(REF_REFS.name())); + Reference refRefs = requireNonNull(p.fetchReferenceForUpdate(REF_REFS.name())); RefObj ref = ref(name, pointer, refCreatedTimestamp, extendedInfoObj); try { p.storeObj(ref); @@ -512,7 +524,7 @@ CommitReferenceResult commitCreateReference( StoreIndexElement el = indexSupplier.get().index().get(key(name)); checkNotNull(el, "Key %s missing in index", name); - Reference existing = persist.fetchReference(name); + Reference existing = persist.fetchReferenceForUpdate(name); if (existing != null) { return new CommitReferenceResult(reference, existing, REF_ROW_EXISTS); @@ -550,7 +562,7 @@ void commitDeleteReference(Reference reference, ObjId expectedRefRefsHead) commitRetry( persist, (p, retryState) -> { - Reference refRefs = requireNonNull(p.fetchReference(REF_REFS.name())); + Reference refRefs = requireNonNull(p.fetchReferenceForUpdate(REF_REFS.name())); if (expectedRefRefsHead != null && !refRefs.pointer().equals(expectedRefRefsHead)) { throw new RuntimeException(REF_REFS_ADVANCED); } @@ -774,7 +786,7 @@ private Reference maybeRecover( } private boolean refRefsOutOfDate(SuppliedCommitIndex index) { - Reference refRefs = persist.fetchReference(REF_REFS.name()); + Reference refRefs = persist.fetchReferenceForUpdate(REF_REFS.name()); return !index.pointer().equals(requireNonNull(refRefs).pointer()); } diff --git a/versioned/storage/common/src/main/java/org/projectnessie/versioned/storage/common/logic/RepositoryLogicImpl.java b/versioned/storage/common/src/main/java/org/projectnessie/versioned/storage/common/logic/RepositoryLogicImpl.java index e2fb9bc82fa..67994b4741d 100644 --- a/versioned/storage/common/src/main/java/org/projectnessie/versioned/storage/common/logic/RepositoryLogicImpl.java +++ b/versioned/storage/common/src/main/java/org/projectnessie/versioned/storage/common/logic/RepositoryLogicImpl.java @@ -96,7 +96,7 @@ public void initialize( @Override public boolean repositoryExists() { try { - Reference ref = persist.fetchReference(REF_REPO.name()); + Reference ref = persist.fetchReferenceForUpdate(REF_REPO.name()); if (ref == null) { return false; } @@ -207,7 +207,8 @@ public RepositoryDescription updateRepositoryDescription(RepositoryDescription n persist, (p, retryState) -> { try { - Reference reference = requireNonNull(persist.fetchReference(REF_REPO.name())); + Reference reference = + requireNonNull(persist.fetchReferenceForUpdate(REF_REPO.name())); return stringLogic(persist) .updateStringOnRef( reference, @@ -237,7 +238,7 @@ public RepositoryDescription updateRepositoryDescription(RepositoryDescription n @SuppressWarnings({"JavaTimeDefaultTimeZone"}) private void initializeInternalRef( InternalRef internalRef, Consumer commitEnhancer) { - Reference reference = persist.fetchReference(internalRef.name()); + Reference reference = persist.fetchReferenceForUpdate(internalRef.name()); if (reference == null) { CreateCommit.Builder c = diff --git a/versioned/storage/common/src/main/java/org/projectnessie/versioned/storage/common/persist/ObservingPersist.java b/versioned/storage/common/src/main/java/org/projectnessie/versioned/storage/common/persist/ObservingPersist.java index 14b6d32da19..4c3d14dbee7 100644 --- a/versioned/storage/common/src/main/java/org/projectnessie/versioned/storage/common/persist/ObservingPersist.java +++ b/versioned/storage/common/src/main/java/org/projectnessie/versioned/storage/common/persist/ObservingPersist.java @@ -120,6 +120,24 @@ public Reference[] fetchReferences(@Nonnull String[] names) { return delegate.fetchReferences(names); } + @WithSpan + @Override + @Counted(PREFIX) + @Timed(value = PREFIX, histogram = true) + @Nullable + public Reference fetchReferenceForUpdate(@Nonnull String name) { + return delegate.fetchReferenceForUpdate(name); + } + + @WithSpan + @Override + @Counted(PREFIX) + @Timed(value = PREFIX, histogram = true) + @Nonnull + public Reference[] fetchReferencesForUpdate(@Nonnull String[] names) { + return delegate.fetchReferencesForUpdate(names); + } + @WithSpan @Override @Counted(PREFIX) diff --git a/versioned/storage/common/src/main/java/org/projectnessie/versioned/storage/common/persist/Persist.java b/versioned/storage/common/src/main/java/org/projectnessie/versioned/storage/common/persist/Persist.java index b48ba6f672b..e4a275d8bba 100644 --- a/versioned/storage/common/src/main/java/org/projectnessie/versioned/storage/common/persist/Persist.java +++ b/versioned/storage/common/src/main/java/org/projectnessie/versioned/storage/common/persist/Persist.java @@ -120,23 +120,57 @@ Reference updateReferencePointer(@Nonnull Reference reference, @Nonnull ObjId ne *

Do not use this function from service implementations, use {@link ReferenceLogic} * instead! * + *

This function leverages the references-cache, if enabled, and must only be used for read + * operations. Updating operations must use {@link #fetchReferenceForUpdate(String)}. + * + *

Database specific implementations of {@link Persist} must implement this function, without + * caching. + * * @return the reference or {@code null}, if it does not exist */ @Nullable Reference fetchReference(@Nonnull String name); + /** + * This is similar to {@link #fetchReference(String)}, but does always fetch the reference from + * the backend, refreshing the references-cache. + * + *

Database specific implementations of {@link Persist} must not implement this function. + */ + @Nullable + default Reference fetchReferenceForUpdate(@Nonnull String name) { + return fetchReference(name); + } + /** * Like {@link #fetchReference(String)}, but finds multiple references by name at once, leveraging * bulk queries against databases. * *

Non-existing references are returned as {@code null} elements in the returned array. * + *

This function leverages the references-cache, if enabled, and must only be used for read + * operations. Updating operations must use {@link #fetchReferencesForUpdate(String[])}. + * + *

Database specific implementations of {@link Persist} must implement this function, without + * caching. + * *

Do not use this function from service implementations, use {@link ReferenceLogic} * instead! */ @Nonnull Reference[] fetchReferences(@Nonnull String[] names); + /** + * This is similar to #fetchReferences(String[]), but does always fetch the reference from the + * backend, refreshing the references-cache. + * + *

Database specific implementations of {@link Persist} must not implement this function. + */ + @Nonnull + default Reference[] fetchReferencesForUpdate(@Nonnull String[] names) { + return fetchReferences(names); + } + // Objects /** diff --git a/versioned/storage/store/build.gradle.kts b/versioned/storage/store/build.gradle.kts index 3c6b2e5001d..5b8a1018751 100644 --- a/versioned/storage/store/build.gradle.kts +++ b/versioned/storage/store/build.gradle.kts @@ -51,6 +51,7 @@ dependencies { compileOnly("com.fasterxml.jackson.core:jackson-annotations") testImplementation(project(":nessie-server-store")) + testImplementation(project(":nessie-versioned-storage-cache")) testImplementation(project(":nessie-versioned-storage-common-tests")) testImplementation(project(":nessie-versioned-storage-inmemory")) testImplementation(project(":nessie-versioned-storage-testextension")) diff --git a/versioned/storage/store/src/main/java/org/projectnessie/versioned/storage/versionstore/BaseCommitHelper.java b/versioned/storage/store/src/main/java/org/projectnessie/versioned/storage/versionstore/BaseCommitHelper.java index 0d2380496fb..a9b0c06f9cd 100644 --- a/versioned/storage/store/src/main/java/org/projectnessie/versioned/storage/versionstore/BaseCommitHelper.java +++ b/versioned/storage/store/src/main/java/org/projectnessie/versioned/storage/versionstore/BaseCommitHelper.java @@ -208,7 +208,7 @@ static R committingOperation( RefMapping refMapping = new RefMapping(p); Reference reference; try { - reference = refMapping.resolveNamedRef(branch); + reference = refMapping.resolveNamedRefForUpdate(branch); } catch (ReferenceNotFoundException e) { throw new CommitWrappedException(e); } diff --git a/versioned/storage/store/src/main/java/org/projectnessie/versioned/storage/versionstore/RefMapping.java b/versioned/storage/store/src/main/java/org/projectnessie/versioned/storage/versionstore/RefMapping.java index e95892535ec..ea47cc7073a 100644 --- a/versioned/storage/store/src/main/java/org/projectnessie/versioned/storage/versionstore/RefMapping.java +++ b/versioned/storage/store/src/main/java/org/projectnessie/versioned/storage/versionstore/RefMapping.java @@ -246,6 +246,10 @@ CommitObj resolveRefHead(@Nonnull Ref ref) throws ReferenceNotFoundException { throw new IllegalArgumentException("Unsupported ref type, got " + ref); } + CommitObj resolveRefHeadForUpdate(@Nonnull NamedRef namedRef) throws ReferenceNotFoundException { + return resolveNamedRefHead(resolveNamedRefForUpdate(namedRef)); + } + CommitObj resolveNamedRefHead(@Nonnull NamedRef namedRef) throws ReferenceNotFoundException { return resolveNamedRefHead(resolveNamedRef(namedRef)); } @@ -269,6 +273,18 @@ public Reference resolveNamedRef(@Nonnull NamedRef namedRef) throws ReferenceNot } } + @Nonnull + public Reference resolveNamedRefForUpdate(@Nonnull NamedRef namedRef) + throws ReferenceNotFoundException { + String refName = namedRefToRefName(namedRef); + ReferenceLogic referenceLogic = referenceLogic(persist); + try { + return referenceLogic.getReferenceForUpdate(refName); + } catch (RefNotFoundException e) { + throw referenceNotFound(namedRef); + } + } + public Reference resolveNamedRef(@Nonnull String refName) throws ReferenceNotFoundException { ReferenceLogic referenceLogic = referenceLogic(persist); List refs = diff --git a/versioned/storage/store/src/main/java/org/projectnessie/versioned/storage/versionstore/RepositoryConfigBackend.java b/versioned/storage/store/src/main/java/org/projectnessie/versioned/storage/versionstore/RepositoryConfigBackend.java index aa05e68adb7..8f1aac50769 100644 --- a/versioned/storage/store/src/main/java/org/projectnessie/versioned/storage/versionstore/RepositoryConfigBackend.java +++ b/versioned/storage/store/src/main/java/org/projectnessie/versioned/storage/versionstore/RepositoryConfigBackend.java @@ -74,7 +74,7 @@ class RepositoryConfigBackend { public List getConfigs(Set repositoryConfigTypes) { try { Persist p = persist; - Reference reference = configsRef(); + Reference reference = configsRef(false); IndexesLogic indexesLogic = indexesLogic(p); CommitObj head = commitLogic(p).headCommit(reference); StoreIndex index = indexesLogic.buildCompleteIndexOrEmpty(head); @@ -117,7 +117,7 @@ public RepositoryConfig updateConfig(RepositoryConfig repositoryConfig) (p, retryState) -> { Reference reference; try { - reference = configsRef(); + reference = configsRef(true); } catch (RetryTimeoutException ex) { throw new CommitWrappedException(new CommitRetry.RetryException(Optional.empty())); } @@ -175,11 +175,14 @@ private static RepositoryConfig deserialize(StringValue value) { } /** Retrieves the configs-reference, creates the reference, if it does not exist. */ - private Reference configsRef() throws RetryTimeoutException { + private Reference configsRef(boolean bypassCache) throws RetryTimeoutException { ReferenceLogic referenceLogic = referenceLogic(persist); Reference reference; try { - reference = referenceLogic.getReference(REPO_CONFIG_REF); + reference = + bypassCache + ? referenceLogic.getReferenceForUpdate(REPO_CONFIG_REF) + : referenceLogic.getReference(REPO_CONFIG_REF); } catch (RefNotFoundException e) { try { reference = referenceLogic.createReference(REPO_CONFIG_REF, ObjId.EMPTY_OBJ_ID, null); diff --git a/versioned/storage/store/src/main/java/org/projectnessie/versioned/storage/versionstore/VersionStoreImpl.java b/versioned/storage/store/src/main/java/org/projectnessie/versioned/storage/versionstore/VersionStoreImpl.java index 0480cb86794..92edc67a099 100644 --- a/versioned/storage/store/src/main/java/org/projectnessie/versioned/storage/versionstore/VersionStoreImpl.java +++ b/versioned/storage/store/src/main/java/org/projectnessie/versioned/storage/versionstore/VersionStoreImpl.java @@ -224,7 +224,7 @@ public ReferenceCreatedResult create(NamedRef namedRef, Optional targetHas ? asBranchName(namedRef.getName()) : asTagName(namedRef.getName()); try { - referenceLogic.getReference(mustNotExist); + referenceLogic.getReferenceForUpdate(mustNotExist); // A tag with the same name as the branch being created (or a branch with the same name // as the tag being created) already exists. throw referenceAlreadyExists(namedRef); @@ -252,7 +252,7 @@ public ReferenceAssignedResult assign(NamedRef namedRef, Hash expectedHash, Hash ReferenceLogic referenceLogic = referenceLogic(persist); Reference expected; try { - expected = referenceLogic.getReference(refName); + expected = referenceLogic.getReferenceForUpdate(refName); } catch (RefNotFoundException e) { throw referenceNotFound(namedRef); } @@ -323,7 +323,7 @@ public ReferenceDeletedResult delete(NamedRef namedRef, Hash hash) throw referenceNotFound(namedRef); } catch (RefConditionFailedException e) { RefMapping refMapping = new RefMapping(persist); - CommitObj headCommit = refMapping.resolveRefHead(namedRef); + CommitObj headCommit = refMapping.resolveRefHeadForUpdate(namedRef); throw referenceConflictException( namedRef, objIdToHash(expected), headCommit != null ? headCommit.id() : EMPTY_OBJ_ID); } catch (RetryTimeoutException e) { diff --git a/versioned/storage/store/src/test/java/org/projectnessie/versioned/storage/versionstore/PersistDelegate.java b/versioned/storage/store/src/test/java/org/projectnessie/versioned/storage/versionstore/PersistDelegate.java index b053a001d70..d7b471417ec 100644 --- a/versioned/storage/store/src/test/java/org/projectnessie/versioned/storage/versionstore/PersistDelegate.java +++ b/versioned/storage/store/src/test/java/org/projectnessie/versioned/storage/versionstore/PersistDelegate.java @@ -96,12 +96,23 @@ public Reference fetchReference(@Nonnull String name) { return delegate.fetchReference(name); } + @Override + public Reference fetchReferenceForUpdate(@Nonnull String name) { + return delegate.fetchReferenceForUpdate(name); + } + @Override @Nonnull public Reference[] fetchReferences(@Nonnull String[] names) { return delegate.fetchReferences(names); } + @Override + @Nonnull + public Reference[] fetchReferencesForUpdate(@Nonnull String[] names) { + return delegate.fetchReferencesForUpdate(names); + } + @Override @Nonnull public Obj fetchObj(@Nonnull ObjId id) throws ObjNotFoundException { diff --git a/versioned/storage/store/src/test/java/org/projectnessie/versioned/storage/versionstore/TestRefMapping.java b/versioned/storage/store/src/test/java/org/projectnessie/versioned/storage/versionstore/TestRefMapping.java index 91020d84e22..c44e97c9e15 100644 --- a/versioned/storage/store/src/test/java/org/projectnessie/versioned/storage/versionstore/TestRefMapping.java +++ b/versioned/storage/store/src/test/java/org/projectnessie/versioned/storage/versionstore/TestRefMapping.java @@ -398,13 +398,23 @@ public void resolveRefs() throws Exception { soft.assertThat(refMapping.resolveNamedRef("tag")).isEqualTo(tag); soft.assertThat(refMapping.resolveNamedRef(BranchName.of("branch"))).isEqualTo(branch); soft.assertThat(refMapping.resolveNamedRef(TagName.of("tag"))).isEqualTo(tag); + soft.assertThat(refMapping.resolveNamedRefForUpdate(BranchName.of("branch"))).isEqualTo(branch); + soft.assertThat(refMapping.resolveNamedRefForUpdate(TagName.of("tag"))).isEqualTo(tag); soft.assertThat(refMapping.resolveNamedRef("empty")).isEqualTo(emptyBranch); soft.assertThat(refMapping.resolveNamedRef(BranchName.of("empty"))).isEqualTo(emptyBranch); soft.assertThat(refMapping.resolveNamedRef(TagName.of("empty"))).isEqualTo(emptyTag); + soft.assertThat(refMapping.resolveNamedRefForUpdate(BranchName.of("empty"))) + .isEqualTo(emptyBranch); + soft.assertThat(refMapping.resolveNamedRefForUpdate(TagName.of("empty"))).isEqualTo(emptyTag); soft.assertThatThrownBy(() -> refMapping.resolveNamedRef("does-not-exist")) .isInstanceOf(ReferenceNotFoundException.class); + soft.assertThatThrownBy( + () -> refMapping.resolveNamedRefForUpdate(BranchName.of("does-not-exist"))) + .isInstanceOf(ReferenceNotFoundException.class); + soft.assertThatThrownBy(() -> refMapping.resolveNamedRefForUpdate(TagName.of("does-not-exist"))) + .isInstanceOf(ReferenceNotFoundException.class); soft.assertThat(refMapping.resolveNamedRefHead(branch)) .isEqualTo(persist.fetchTypedObj(commitId, COMMIT, CommitObj.class)); @@ -423,6 +433,11 @@ public void resolveRefs() throws Exception { soft.assertThat(refMapping.resolveRefHead(Hash.of(commitId.toString()))) .isEqualTo(persist.fetchTypedObj(commitId, COMMIT, CommitObj.class)); + soft.assertThat(refMapping.resolveRefHeadForUpdate(BranchName.of("branch"))) + .isEqualTo(persist.fetchTypedObj(commitId, COMMIT, CommitObj.class)); + soft.assertThat(refMapping.resolveRefHeadForUpdate(TagName.of("tag"))) + .isEqualTo(persist.fetchTypedObj(commitId, COMMIT, CommitObj.class)); + soft.assertThat(refMapping.resolveNamedRefHead(emptyBranch)).isNull(); soft.assertThat(refMapping.resolveNamedRefHead(emptyTag)).isNull(); soft.assertThatThrownBy(() -> refMapping.resolveNamedRefHead(notThere)) diff --git a/versioned/storage/store/src/test/java/org/projectnessie/versioned/storage/versionstore/TestVersionStoreReferenceCaching.java b/versioned/storage/store/src/test/java/org/projectnessie/versioned/storage/versionstore/TestVersionStoreReferenceCaching.java new file mode 100644 index 00000000000..ecb5e6ce01e --- /dev/null +++ b/versioned/storage/store/src/test/java/org/projectnessie/versioned/storage/versionstore/TestVersionStoreReferenceCaching.java @@ -0,0 +1,138 @@ +/* + * Copyright (C) 2022 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.versioned.storage.versionstore; + +import static java.util.Collections.singletonList; + +import java.time.Duration; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.LongSupplier; +import org.assertj.core.api.SoftAssertions; +import org.assertj.core.api.junit.jupiter.InjectSoftAssertions; +import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.projectnessie.model.CommitMeta; +import org.projectnessie.model.ContentKey; +import org.projectnessie.model.IcebergTable; +import org.projectnessie.versioned.BranchName; +import org.projectnessie.versioned.Commit; +import org.projectnessie.versioned.CommitResult; +import org.projectnessie.versioned.GetNamedRefsParams; +import org.projectnessie.versioned.Put; +import org.projectnessie.versioned.ReferenceInfo; +import org.projectnessie.versioned.VersionStore; +import org.projectnessie.versioned.storage.cache.CacheConfig; +import org.projectnessie.versioned.storage.cache.PersistCaches; +import org.projectnessie.versioned.storage.common.persist.Persist; +import org.projectnessie.versioned.storage.testextension.NessiePersist; +import org.projectnessie.versioned.storage.testextension.PersistExtension; + +@ExtendWith({PersistExtension.class, SoftAssertionsExtension.class}) +public class TestVersionStoreReferenceCaching { + @InjectSoftAssertions protected SoftAssertions soft; + + Persist wrapWithCache(Persist persist, LongSupplier clockNanos) { + return PersistCaches.newBackend( + CacheConfig.builder() + .capacityMb(32) + .clockNanos(clockNanos) + .referenceTtl(Duration.ofMinutes(1)) + .referenceNegativeTtl(Duration.ofSeconds(1)) + .build()) + .wrap(persist); + } + + // Two caching `Persist` instances, using _independent_ cache backends. + Persist withCache1; + Persist withCache2; + + AtomicLong nowNanos; + + protected VersionStore store1; + protected VersionStore store2; + + @BeforeEach + void wrapCaches(@NessiePersist Persist persist1, @NessiePersist Persist persist2) { + nowNanos = new AtomicLong(); + withCache1 = wrapWithCache(persist1, nowNanos::get); + withCache2 = wrapWithCache(persist2, nowNanos::get); + + store1 = ValidatingVersionStoreImpl.of(soft, withCache1); + store2 = ValidatingVersionStoreImpl.of(soft, withCache2); + } + + @Test + public void referenceCachingConcurrentCommit() throws Exception { + BranchName main = BranchName.of("main"); + + ReferenceInfo head1 = + store1.getNamedRef(main.getName(), GetNamedRefsParams.DEFAULT); + ReferenceInfo head2 = + store2.getNamedRef(main.getName(), GetNamedRefsParams.DEFAULT); + soft.assertThat(head2).isEqualTo(head1); + + CommitResult committed1 = + store1.commit( + main, + Optional.of(head1.getHash()), + CommitMeta.fromMessage("commit via 1"), + singletonList(Put.of(ContentKey.of("table1"), IcebergTable.of("/foo", 1, 2, 3, 43)))); + + soft.assertThat(committed1.getCommit().getParentHash()).isEqualTo(head1.getHash()); + + ReferenceInfo head1afterCommit1 = + store1.getNamedRef(main.getName(), GetNamedRefsParams.DEFAULT); + ReferenceInfo head2afterCommit1 = + store2.getNamedRef(main.getName(), GetNamedRefsParams.DEFAULT); + soft.assertThat(head1afterCommit1.getHash()) + .isEqualTo(committed1.getCommitHash()) + .isNotEqualTo(head1.getHash()); + soft.assertThat(head2afterCommit1).isEqualTo(head2); + + CommitResult committed2 = + store2.commit( + main, + Optional.of(head2afterCommit1.getHash()), + CommitMeta.fromMessage("commit via 2"), + singletonList(Put.of(ContentKey.of("table2"), IcebergTable.of("/foo", 1, 2, 3, 43)))); + + soft.assertThat(committed2.getCommit().getParentHash()) + .isEqualTo(committed1.getCommit().getHash()); + + ReferenceInfo head1afterCommit2 = + store1.getNamedRef(main.getName(), GetNamedRefsParams.DEFAULT); + ReferenceInfo head2afterCommit2 = + store2.getNamedRef(main.getName(), GetNamedRefsParams.DEFAULT); + soft.assertThat(head1afterCommit2).isEqualTo(head1afterCommit1); + soft.assertThat(head2afterCommit2.getHash()) + .isEqualTo(committed2.getCommitHash()) + .isNotEqualTo(head1afterCommit2.getHash()); + + // + + nowNanos.addAndGet(Duration.ofMinutes(5).toNanos()); + + ReferenceInfo head1afterExpiry = + store1.getNamedRef(main.getName(), GetNamedRefsParams.DEFAULT); + ReferenceInfo head2afterExpiry = + store2.getNamedRef(main.getName(), GetNamedRefsParams.DEFAULT); + + soft.assertThat(head1afterExpiry).isEqualTo(head2afterExpiry).isEqualTo(head2afterCommit2); + } +} diff --git a/versioned/storage/testextension/src/main/java/org/projectnessie/versioned/storage/testextension/ClassPersistInstances.java b/versioned/storage/testextension/src/main/java/org/projectnessie/versioned/storage/testextension/ClassPersistInstances.java index 25658f88dd0..ae2e349b289 100644 --- a/versioned/storage/testextension/src/main/java/org/projectnessie/versioned/storage/testextension/ClassPersistInstances.java +++ b/versioned/storage/testextension/src/main/java/org/projectnessie/versioned/storage/testextension/ClassPersistInstances.java @@ -19,6 +19,7 @@ import static org.projectnessie.versioned.storage.testextension.PersistExtension.KEY_REUSABLE_BACKEND; import static org.projectnessie.versioned.storage.testextension.PersistExtension.NAMESPACE; +import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.function.Supplier; @@ -55,7 +56,11 @@ final class ClassPersistInstances { cacheBackend = nessiePersistCache != null && nessiePersistCache.capacityMb() >= 0 ? PersistCaches.newBackend( - CacheConfig.builder().capacityMb(nessiePersistCache.capacityMb()).build()) + CacheConfig.builder() + .capacityMb(nessiePersistCache.capacityMb()) + .referenceTtl(Duration.ofMinutes(1)) + .referenceNegativeTtl(Duration.ofMinutes(1)) + .build()) : null; backendTestFactory = reusableTestBackend.backendTestFactory(context); diff --git a/versioned/transfer/src/testFixtures/java/org/projectnessie/versioned/transfer/AbstractExportImport.java b/versioned/transfer/src/testFixtures/java/org/projectnessie/versioned/transfer/AbstractExportImport.java index cd1b78ca5a0..1706044c7fe 100644 --- a/versioned/transfer/src/testFixtures/java/org/projectnessie/versioned/transfer/AbstractExportImport.java +++ b/versioned/transfer/src/testFixtures/java/org/projectnessie/versioned/transfer/AbstractExportImport.java @@ -308,12 +308,33 @@ public Reference fetchReference(@Nonnull String name) { return inmemory.fetchReference(name); } + @Nullable + @Override + public Reference fetchReferenceForUpdate(@Nonnull String name) { + if (name.startsWith("refs/heads/branch-")) { + int refNum = parseInt(name.substring("refs/heads/branch-".length())); + return reference( + name, + intToObjId(refNum), + false, + TimeUnit.NANOSECONDS.toMicros(System.nanoTime()), + null); + } + return inmemory.fetchReferenceForUpdate(name); + } + @Nonnull @Override public Reference[] fetchReferences(@Nonnull String[] names) { return inmemory.fetchReferences(names); } + @Nonnull + @Override + public Reference[] fetchReferencesForUpdate(@Nonnull String[] names) { + return fetchReferences(names); + } + @Nonnull @Override public Obj fetchObj(@Nonnull ObjId id) throws ObjNotFoundException {