Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use the new Vertx local context storage #40725

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package io.quarkus.vertx.core.runtime;

import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.function.Supplier;

import io.vertx.core.Context;
import io.vertx.core.spi.context.storage.AccessMode;

final class QuarkusAccessModes {

/**
* Beware this {@link AccessMode#getOrCreate(AtomicReferenceArray, int, Supplier)} because, differently from
* {@link io.vertx.core.spi.context.storage.ContextLocal#get(Context, Supplier)},
* is not suitable to be used with {@link io.vertx.core.spi.context.storage.ContextLocal#get(Context, AccessMode, Supplier)}
* with the same guarantees of atomicity i.e. the supplier can get called more than once by different racing threads!
*/
public static final AccessMode ACQUIRE_RELEASE = new AccessMode() {
@Override
public Object get(AtomicReferenceArray<Object> locals, int idx) {
return locals.getAcquire(idx);
}

@Override
public void put(AtomicReferenceArray<Object> locals, int idx, Object value) {
// This is still ensuring visibility across threads and happens-before,
// but won't impose setVolatile total ordering i.e. StoreLoad barriers after write
// to make it faster
locals.setRelease(idx, value);
}

@Override
public Object getOrCreate(AtomicReferenceArray<Object> locals, int idx, Supplier<Object> initialValueSupplier) {
Object value = locals.getAcquire(idx);
if (value == null) {
value = initialValueSupplier.get();
locals.setRelease(idx, value);
}
return value;
}
};

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,10 @@
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
Expand Down Expand Up @@ -46,7 +44,6 @@
import io.quarkus.vertx.core.runtime.config.ClusterConfiguration;
import io.quarkus.vertx.core.runtime.config.EventBusConfiguration;
import io.quarkus.vertx.core.runtime.config.VertxConfiguration;
import io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle;
import io.quarkus.vertx.mdc.provider.LateBoundMDCProvider;
import io.quarkus.vertx.runtime.VertxCurrentContextFactory;
import io.vertx.core.AsyncResult;
Expand Down Expand Up @@ -584,15 +581,7 @@ public void runWith(Runnable task, Object context) {
// The CDI contexts must not be propagated
// First test if VertxCurrentContextFactory is actually used
if (currentContextFactory != null) {
List<String> keys = currentContextFactory.keys();
ConcurrentMap<Object, Object> local = vertxContext.localContextData();
if (containsScopeKey(keys, local)) {
// Duplicate the context, copy the data, remove the request context
vertxContext = vertxContext.duplicate();
vertxContext.localContextData().putAll(local);
keys.forEach(vertxContext.localContextData()::remove);
VertxContextSafetyToggle.setContextSafe(vertxContext, true);
}
vertxContext = currentContextFactory.duplicateContextIfContainsAnyCreatedScopeKeys(vertxContext);
}
vertxContext.beginDispatch();
try {
Expand All @@ -604,23 +593,6 @@ public void runWith(Runnable task, Object context) {
task.run();
}
}

private boolean containsScopeKey(List<String> keys, Map<Object, Object> localContextData) {
if (keys.isEmpty()) {
return false;
}
if (keys.size() == 1) {
// Very often there will be only one key used
return localContextData.containsKey(keys.get(0));
} else {
for (String key : keys) {
if (localContextData.containsKey(key)) {
return true;
}
}
}
return false;
}
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import io.smallrye.common.vertx.VertxContext;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.ContextLocalImpl;
import io.vertx.core.spi.context.storage.ContextLocal;

public class VertxLocalsHelper {

Expand All @@ -22,6 +24,11 @@ public static void throwOnRootContextAccess() {
public static <T> T getLocal(ContextInternal context, Object key) {
if (VertxContext.isDuplicatedContext(context)) {
// We are on a duplicated context, allow accessing the locals
if (key instanceof ContextLocalImpl<?>) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cescoffier I will probably add the new ContextLocal-based methods to avoid this terrible hack .-.
I made it in order to have the existing code in the toggle to just use the now-deprecated methods and still get the benefit of the new API, but given that these methods are going to disappear to favour the new API, I'll probably tackle this on the current PR already, wdyt?

var localKey = (ContextLocal<T>) key;
return (T) context.getLocal(localKey, QuarkusAccessModes.ACQUIRE_RELEASE);
}
assert !(key instanceof ContextLocal<?>);
return (T) context.localContextData().get(key);
} else {
throw new UnsupportedOperationException(ILLEGAL_ACCESS_TO_LOCAL_CONTEXT);
Expand All @@ -31,7 +38,13 @@ public static <T> T getLocal(ContextInternal context, Object key) {
public static void putLocal(ContextInternal context, Object key, Object value) {
if (VertxContext.isDuplicatedContext(context)) {
// We are on a duplicated context, allow accessing the locals
context.localContextData().put(key, value);
franz1981 marked this conversation as resolved.
Show resolved Hide resolved
if (key instanceof ContextLocalImpl<?>) {
var localKey = (ContextLocal<Object>) key;
context.putLocal(localKey, QuarkusAccessModes.ACQUIRE_RELEASE, value);
} else {
assert !(key instanceof ContextLocal<?>);
context.localContextData().put(key, value);
}
} else {
throw new UnsupportedOperationException(ILLEGAL_ACCESS_TO_LOCAL_CONTEXT);
}
Expand All @@ -40,6 +53,15 @@ public static void putLocal(ContextInternal context, Object key, Object value) {
public static boolean removeLocal(ContextInternal context, Object key) {
if (VertxContext.isDuplicatedContext(context)) {
// We are on a duplicated context, allow accessing the locals
if (key instanceof ContextLocalImpl<?>) {
var localKey = (ContextLocal<Object>) key;
if (localKey == null) {
return false;
}
context.removeLocal(localKey, QuarkusAccessModes.ACQUIRE_RELEASE);
return true;
}
assert !(key instanceof ContextLocal<?>);
return context.localContextData().remove(key) != null;
} else {
throw new UnsupportedOperationException(ILLEGAL_ACCESS_TO_LOCAL_CONTEXT);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package io.quarkus.vertx.core.runtime.context;

import static io.quarkus.vertx.runtime.storage.QuarkusLocalStorageKeyVertxServiceProvider.ACCESS_TOGGLE_KEY;

import io.smallrye.common.vertx.VertxContext;
import io.vertx.core.Context;
import io.vertx.core.Vertx;
import io.vertx.core.spi.context.storage.ContextLocal;

/**
* This is meant for other extensions to integrate with, to help
Expand Down Expand Up @@ -39,7 +42,6 @@
*/
public final class VertxContextSafetyToggle {

private static final Object ACCESS_TOGGLE_KEY = new Object();
public static final String UNRESTRICTED_BY_DEFAULT_PROPERTY = "io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle.UNRESTRICTED_BY_DEFAULT";

/**
Expand All @@ -50,6 +52,12 @@ public final class VertxContextSafetyToggle {
private static final boolean UNRESTRICTED_BY_DEFAULT = Boolean.getBoolean(UNRESTRICTED_BY_DEFAULT_PROPERTY);
private static final boolean FULLY_DISABLED = Boolean.getBoolean(FULLY_DISABLE_PROPERTY);

public static ContextLocal<Boolean> registerAccessToggleKey() {
if (FULLY_DISABLED)
return null;
return ContextLocal.registerLocal(Boolean.class);
Copy link
Contributor Author

@franz1981 franz1981 Jun 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vietj is this ok?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is not how it should be used imho

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what do you mean?
In our context (quarkus) we can disable specific keys, in a stable way (via the sys property), meaning that we don't have to register any local key there.
Do you have another way to provide a similar behaviour?

}

/**
* Verifies if the current Vert.x context was flagged as safe
* to be accessed by components which expect non-concurrent
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
package io.quarkus.vertx.runtime;

import static io.quarkus.vertx.runtime.storage.QuarkusLocalStorageKeyVertxServiceProvider.REQUEST_SCOPED_LOCAL_KEY;

import java.lang.annotation.Annotation;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;

import jakarta.enterprise.context.RequestScoped;

import io.netty.util.concurrent.FastThreadLocal;
import io.quarkus.arc.CurrentContext;
Expand All @@ -14,23 +19,31 @@
import io.smallrye.common.vertx.VertxContext;
import io.vertx.core.Context;
import io.vertx.core.Vertx;
import io.vertx.core.impl.ContextInternal;

public class VertxCurrentContextFactory implements CurrentContextFactory {

private static final String LOCAL_KEY_PREFIX = "io.quarkus.vertx.cdi-current-context";

private final List<String> keys;
private final List<String> unmodifiableKeys;
private final AtomicBoolean requestScopedKeyCreated;

public VertxCurrentContextFactory() {
// There will be only a few mutative operations max
this.keys = new CopyOnWriteArrayList<>();
// We do not want to allocate a new object for each VertxCurrentContextFactory#keys() invocation
this.unmodifiableKeys = Collections.unmodifiableList(keys);
this.requestScopedKeyCreated = new AtomicBoolean();
}

@Override
public <T extends InjectableContext.ContextState> CurrentContext<T> create(Class<? extends Annotation> scope) {
if (scope == RequestScoped.class) {
if (!requestScopedKeyCreated.compareAndSet(false, true)) {
throw new IllegalStateException(
"Multiple current contexts for the same scope are not supported. Current context for "
+ scope + " already exists!");
}
return new VertxCurrentContext<T>(REQUEST_SCOPED_LOCAL_KEY);
}
String key = LOCAL_KEY_PREFIX + scope.getName();
if (keys.contains(key)) {
throw new IllegalStateException(
Expand All @@ -41,20 +54,52 @@ public <T extends InjectableContext.ContextState> CurrentContext<T> create(Class
return new VertxCurrentContext<>(key);
}

/**
*
* @return an unmodifiable list of used keys
*/
public List<String> keys() {
return unmodifiableKeys;
public ContextInternal duplicateContextIfContainsAnyCreatedScopeKeys(ContextInternal vertxContext) {
if (!containsAnyCreatedScopeKeys(vertxContext)) {
return vertxContext;
}
// Duplicate the context, copy the data, remove the request context
var duplicateCtx = vertxContext.duplicate();
// TODO this is not copying any ContextLocal<?> from the original context to the new one!
franz1981 marked this conversation as resolved.
Show resolved Hide resolved
var duplicateCtxData = duplicateCtx.localContextData();
duplicateCtxData.putAll(vertxContext.localContextData());
keys.forEach(duplicateCtxData::remove);
if (requestScopedKeyCreated.get()) {
duplicateCtx.removeLocal(REQUEST_SCOPED_LOCAL_KEY);
}
VertxContextSafetyToggle.setContextSafe(duplicateCtx, true);
return duplicateCtx;
}

private boolean containsAnyCreatedScopeKeys(ContextInternal vertxContext) {
boolean requestScopedKeyCreated = this.requestScopedKeyCreated.get();
if (requestScopedKeyCreated && vertxContext.getLocal(REQUEST_SCOPED_LOCAL_KEY) != null) {
return true;
}
if (keys.isEmpty()) {
franz1981 marked this conversation as resolved.
Show resolved Hide resolved
return false;
}
ConcurrentMap<Object, Object> local = vertxContext.localContextData();
if (keys.size() == 1) {
// Very often there will be only one key used
return local.containsKey(keys.get(0));
} else {
for (String key : keys) {
if (local.containsKey(key)) {
return true;
}
}
}
return false;
}

private static final class VertxCurrentContext<T extends ContextState> implements CurrentContext<T> {

private final String key;
private final FastThreadLocal<T> fallback = new FastThreadLocal<>();
// It allows to use both ContextLocalImpl and String keys
private final Object key;
private volatile FastThreadLocal<T> fallback;

private VertxCurrentContext(String key) {
private VertxCurrentContext(Object key) {
this.key = key;
}

Expand All @@ -64,7 +109,24 @@ public T get() {
if (context != null && VertxContext.isDuplicatedContext(context)) {
return context.getLocal(key);
}
return fallback.get();
return fallback().get();
}

private FastThreadLocal<T> fallback() {
var fallback = this.fallback;
if (fallback == null) {
fallback = getOrCreateFallback();
}
return fallback;
}

private synchronized FastThreadLocal<T> getOrCreateFallback() {
var fallback = this.fallback;
if (fallback == null) {
fallback = new FastThreadLocal<>();
this.fallback = fallback;
}
return fallback;
}

@Override
Expand All @@ -80,7 +142,7 @@ public void set(T state) {
}

} else {
fallback.set(state);
fallback().set(state);
}
}

Expand All @@ -91,7 +153,7 @@ public void remove() {
// NOOP - the DC should not be shared.
// context.removeLocal(key);
} else {
fallback.remove();
fallback().remove();
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package io.quarkus.vertx.runtime.storage;

import io.quarkus.arc.InjectableContext;
import io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle;
import io.vertx.core.impl.VertxBuilder;
import io.vertx.core.spi.VertxServiceProvider;
import io.vertx.core.spi.context.storage.ContextLocal;

/**
* This provider exists with the sole purpose of reliably get the optimized local keys created before
* the Vertx instance is created.
*/
public class QuarkusLocalStorageKeyVertxServiceProvider implements VertxServiceProvider {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@geoand @cescoffier I don't like to make spi loading to happen just for this TBH and I could, maybe, hack the Vertx recorder to just perform the init at the right time, but IDK what's the cost of this, usually. Any suggestion?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The question is: can we preload this at build time (static init). It would be interesting to do that and just use the result at runtime. I don't believe there is a good reason to change the SPI at runtime.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can skip the service loading in this case and just use this class, no?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I don't believe we should allow any other implementation.

Copy link
Contributor Author

@franz1981 franz1981 Jun 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks both, yeah, I'm not very happy about it either: so i can just have a not vertx service provider, but an holder class which is going to be initialized in the vertx recorder before vertx instances are allocated, wdyt?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds fine to me


public static final ContextLocal<Boolean> ACCESS_TOGGLE_KEY = VertxContextSafetyToggle.registerAccessToggleKey();
public static final ContextLocal<InjectableContext.ContextState> REQUEST_SCOPED_LOCAL_KEY = ContextLocal
.registerLocal(InjectableContext.ContextState.class);

@Override
public void init(VertxBuilder builder) {

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
io.quarkus.vertx.runtime.storage.QuarkusLocalStorageKeyVertxServiceProvider
franz1981 marked this conversation as resolved.
Show resolved Hide resolved