Skip to content

Commit

Permalink
Use the new Vertx local context storage
Browse files Browse the repository at this point in the history
  • Loading branch information
franz1981 committed May 24, 2024
1 parent 5d72d9b commit 5979b09
Show file tree
Hide file tree
Showing 7 changed files with 177 additions and 47 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package io.quarkus.vertx.core.runtime;

import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.function.Supplier;

import io.vertx.core.Context;
import io.vertx.core.spi.context.storage.AccessMode;

final class QuarkusAccessModes {

/**
* Beware this {@link AccessMode#getOrCreate(AtomicReferenceArray, int, Supplier)} because, differently from
* {@link io.vertx.core.spi.context.storage.ContextLocal#get(Context, Supplier)},
* is not suitable to be used with {@link io.vertx.core.spi.context.storage.ContextLocal#get(Context, AccessMode, Supplier)}
* with the same guarantees of atomicity i.e. the supplier can get called more than once by different racing threads!
*/
public static final AccessMode ACQUIRE_RELEASE = new AccessMode() {
@Override
public Object get(AtomicReferenceArray<Object> locals, int idx) {
return locals.getAcquire(idx);
}

@Override
public void put(AtomicReferenceArray<Object> locals, int idx, Object value) {
// This is still ensuring visibility across threads and happens-before,
// but won't impose setVolatile total ordering i.e. StoreLoad barriers after write
// to make it faster
locals.setRelease(idx, value);
}

@Override
public Object getOrCreate(AtomicReferenceArray<Object> locals, int idx, Supplier<Object> initialValueSupplier) {
Object value = locals.getAcquire(idx);
if (value == null) {
value = initialValueSupplier.get();
locals.setRelease(idx, value);
}
return value;
}
};

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,10 @@
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
Expand Down Expand Up @@ -46,7 +44,6 @@
import io.quarkus.vertx.core.runtime.config.ClusterConfiguration;
import io.quarkus.vertx.core.runtime.config.EventBusConfiguration;
import io.quarkus.vertx.core.runtime.config.VertxConfiguration;
import io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle;
import io.quarkus.vertx.mdc.provider.LateBoundMDCProvider;
import io.quarkus.vertx.runtime.VertxCurrentContextFactory;
import io.vertx.core.AsyncResult;
Expand Down Expand Up @@ -584,15 +581,7 @@ public void runWith(Runnable task, Object context) {
// The CDI contexts must not be propagated
// First test if VertxCurrentContextFactory is actually used
if (currentContextFactory != null) {
List<String> keys = currentContextFactory.keys();
ConcurrentMap<Object, Object> local = vertxContext.localContextData();
if (containsScopeKey(keys, local)) {
// Duplicate the context, copy the data, remove the request context
vertxContext = vertxContext.duplicate();
vertxContext.localContextData().putAll(local);
keys.forEach(vertxContext.localContextData()::remove);
VertxContextSafetyToggle.setContextSafe(vertxContext, true);
}
vertxContext = currentContextFactory.duplicateContextIfContainsAnyCreatedScopeKeys(vertxContext);
}
vertxContext.beginDispatch();
try {
Expand All @@ -604,23 +593,6 @@ public void runWith(Runnable task, Object context) {
task.run();
}
}

private boolean containsScopeKey(List<String> keys, Map<Object, Object> localContextData) {
if (keys.isEmpty()) {
return false;
}
if (keys.size() == 1) {
// Very often there will be only one key used
return localContextData.containsKey(keys.get(0));
} else {
for (String key : keys) {
if (localContextData.containsKey(key)) {
return true;
}
}
}
return false;
}
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import io.smallrye.common.vertx.VertxContext;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.ContextLocalImpl;
import io.vertx.core.spi.context.storage.ContextLocal;

public class VertxLocalsHelper {

Expand All @@ -22,6 +24,11 @@ public static void throwOnRootContextAccess() {
public static <T> T getLocal(ContextInternal context, Object key) {
if (VertxContext.isDuplicatedContext(context)) {
// We are on a duplicated context, allow accessing the locals
if (key instanceof ContextLocalImpl<?>) {
var localKey = (ContextLocal<T>) key;
return (T) context.getLocal(localKey, QuarkusAccessModes.ACQUIRE_RELEASE);
}
assert !(key instanceof ContextLocal<?>);
return (T) context.localContextData().get(key);
} else {
throw new UnsupportedOperationException(ILLEGAL_ACCESS_TO_LOCAL_CONTEXT);
Expand All @@ -31,7 +38,13 @@ public static <T> T getLocal(ContextInternal context, Object key) {
public static void putLocal(ContextInternal context, Object key, Object value) {
if (VertxContext.isDuplicatedContext(context)) {
// We are on a duplicated context, allow accessing the locals
context.localContextData().put(key, value);
if (key instanceof ContextLocalImpl<?>) {
var localKey = (ContextLocal<Object>) key;
context.putLocal(localKey, QuarkusAccessModes.ACQUIRE_RELEASE, value);
} else {
assert !(key instanceof ContextLocal<?>);
context.localContextData().put(key, value);
}
} else {
throw new UnsupportedOperationException(ILLEGAL_ACCESS_TO_LOCAL_CONTEXT);
}
Expand All @@ -40,6 +53,15 @@ public static void putLocal(ContextInternal context, Object key, Object value) {
public static boolean removeLocal(ContextInternal context, Object key) {
if (VertxContext.isDuplicatedContext(context)) {
// We are on a duplicated context, allow accessing the locals
if (key instanceof ContextLocalImpl<?>) {
var localKey = (ContextLocal<Object>) key;
if (localKey == null) {
return false;
}
context.removeLocal(localKey, QuarkusAccessModes.ACQUIRE_RELEASE);
return true;
}
assert !(key instanceof ContextLocal<?>);
return context.localContextData().remove(key) != null;
} else {
throw new UnsupportedOperationException(ILLEGAL_ACCESS_TO_LOCAL_CONTEXT);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package io.quarkus.vertx.core.runtime.context;

import static io.quarkus.vertx.runtime.storage.QuarkusLocalStorageKeyVertxServiceProvider.ACCESS_TOGGLE_KEY;

import io.smallrye.common.vertx.VertxContext;
import io.vertx.core.Context;
import io.vertx.core.Vertx;
import io.vertx.core.spi.context.storage.ContextLocal;

/**
* This is meant for other extensions to integrate with, to help
Expand Down Expand Up @@ -39,7 +42,6 @@
*/
public final class VertxContextSafetyToggle {

private static final Object ACCESS_TOGGLE_KEY = new Object();
public static final String UNRESTRICTED_BY_DEFAULT_PROPERTY = "io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle.UNRESTRICTED_BY_DEFAULT";

/**
Expand All @@ -50,6 +52,12 @@ public final class VertxContextSafetyToggle {
private static final boolean UNRESTRICTED_BY_DEFAULT = Boolean.getBoolean(UNRESTRICTED_BY_DEFAULT_PROPERTY);
private static final boolean FULLY_DISABLED = Boolean.getBoolean(FULLY_DISABLE_PROPERTY);

public static ContextLocal<Boolean> registerAccessToggleKey() {
if (FULLY_DISABLED)
return null;
return ContextLocal.registerLocal(Boolean.class);
}

/**
* Verifies if the current Vert.x context was flagged as safe
* to be accessed by components which expect non-concurrent
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
package io.quarkus.vertx.runtime;

import static io.quarkus.vertx.runtime.storage.QuarkusLocalStorageKeyVertxServiceProvider.REQUEST_SCOPED_LOCAL_KEY;

import java.lang.annotation.Annotation;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;

import jakarta.enterprise.context.RequestScoped;

import io.netty.util.concurrent.FastThreadLocal;
import io.quarkus.arc.CurrentContext;
Expand All @@ -14,23 +19,31 @@
import io.smallrye.common.vertx.VertxContext;
import io.vertx.core.Context;
import io.vertx.core.Vertx;
import io.vertx.core.impl.ContextInternal;

public class VertxCurrentContextFactory implements CurrentContextFactory {

private static final String LOCAL_KEY_PREFIX = "io.quarkus.vertx.cdi-current-context";

private final List<String> keys;
private final List<String> unmodifiableKeys;
private final AtomicBoolean requestScopedKeyCreated;

public VertxCurrentContextFactory() {
// There will be only a few mutative operations max
this.keys = new CopyOnWriteArrayList<>();
// We do not want to allocate a new object for each VertxCurrentContextFactory#keys() invocation
this.unmodifiableKeys = Collections.unmodifiableList(keys);
this.requestScopedKeyCreated = new AtomicBoolean();
}

@Override
public <T extends InjectableContext.ContextState> CurrentContext<T> create(Class<? extends Annotation> scope) {
if (scope == RequestScoped.class) {
if (!requestScopedKeyCreated.compareAndSet(false, true)) {
throw new IllegalStateException(
"Multiple current contexts for the same scope are not supported. Current context for "
+ scope + " already exists!");
}
return new VertxCurrentContext<T>(REQUEST_SCOPED_LOCAL_KEY);
}
String key = LOCAL_KEY_PREFIX + scope.getName();
if (keys.contains(key)) {
throw new IllegalStateException(
Expand All @@ -41,20 +54,52 @@ public <T extends InjectableContext.ContextState> CurrentContext<T> create(Class
return new VertxCurrentContext<>(key);
}

/**
*
* @return an unmodifiable list of used keys
*/
public List<String> keys() {
return unmodifiableKeys;
public ContextInternal duplicateContextIfContainsAnyCreatedScopeKeys(ContextInternal vertxContext) {
if (!containsAnyCreatedScopeKeys(vertxContext)) {
return vertxContext;
}
// Duplicate the context, copy the data, remove the request context
var duplicateCtx = vertxContext.duplicate();
// TODO this is not copying any ContextLocal<?> from the original context to the new one!
var duplicateCtxData = duplicateCtx.localContextData();
duplicateCtxData.putAll(vertxContext.localContextData());
keys.forEach(duplicateCtxData::remove);
if (requestScopedKeyCreated.get()) {
duplicateCtx.removeLocal(REQUEST_SCOPED_LOCAL_KEY);
}
VertxContextSafetyToggle.setContextSafe(duplicateCtx, true);
return duplicateCtx;
}

private boolean containsAnyCreatedScopeKeys(ContextInternal vertxContext) {
boolean requestScopedKeyCreated = this.requestScopedKeyCreated.get();
if (requestScopedKeyCreated && vertxContext.getLocal(REQUEST_SCOPED_LOCAL_KEY) != null) {
return true;
}
if (keys.isEmpty()) {
return false;
}
ConcurrentMap<Object, Object> local = vertxContext.localContextData();
if (keys.size() == 1) {
// Very often there will be only one key used
return local.containsKey(keys.get(0));
} else {
for (String key : keys) {
if (local.containsKey(key)) {
return true;
}
}
}
return false;
}

private static final class VertxCurrentContext<T extends ContextState> implements CurrentContext<T> {

private final String key;
private final FastThreadLocal<T> fallback = new FastThreadLocal<>();
// It allows to use both ContextLocalImpl and String keys
private final Object key;
private volatile FastThreadLocal<T> fallback;

private VertxCurrentContext(String key) {
private VertxCurrentContext(Object key) {
this.key = key;
}

Expand All @@ -64,7 +109,24 @@ public T get() {
if (context != null && VertxContext.isDuplicatedContext(context)) {
return context.getLocal(key);
}
return fallback.get();
return fallback().get();
}

private FastThreadLocal<T> fallback() {
var fallback = this.fallback;
if (fallback == null) {
fallback = getOrCreateFallback();
}
return fallback;
}

private synchronized FastThreadLocal<T> getOrCreateFallback() {
var fallback = this.fallback;
if (fallback == null) {
fallback = new FastThreadLocal<>();
this.fallback = fallback;
}
return fallback;
}

@Override
Expand All @@ -80,7 +142,7 @@ public void set(T state) {
}

} else {
fallback.set(state);
fallback().set(state);
}
}

Expand All @@ -91,7 +153,7 @@ public void remove() {
// NOOP - the DC should not be shared.
// context.removeLocal(key);
} else {
fallback.remove();
fallback().remove();
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package io.quarkus.vertx.runtime.storage;

import io.quarkus.arc.InjectableContext;
import io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle;
import io.vertx.core.impl.VertxBuilder;
import io.vertx.core.spi.VertxServiceProvider;
import io.vertx.core.spi.context.storage.ContextLocal;

/**
* This provider exists with the sole purpose of reliably get the optimized local keys created before
* the Vertx instance is created.
*/
public class QuarkusLocalStorageKeyVertxServiceProvider implements VertxServiceProvider {

public static final ContextLocal<Boolean> ACCESS_TOGGLE_KEY = VertxContextSafetyToggle.registerAccessToggleKey();
public static final ContextLocal<InjectableContext.ContextState> REQUEST_SCOPED_LOCAL_KEY = ContextLocal
.registerLocal(InjectableContext.ContextState.class);

@Override
public void init(VertxBuilder builder) {

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
io.quarkus.vertx.runtime.storage.QuarkusLocalStorageKeyVertxServiceProvider

0 comments on commit 5979b09

Please sign in to comment.