Skip to content

Commit

Permalink
Duplicating a duplicated context is supported but the semantic of the…
Browse files Browse the repository at this point in the history
… actual locals is not defined.

This update the duplicated context duplication by doing a copy of each local in the duplicated duplicate. This introduce a duplicator for each local that is responsible for copying the object when it is not null.
  • Loading branch information
vietj committed May 30, 2024
1 parent d262280 commit dc2f1fb
Show file tree
Hide file tree
Showing 8 changed files with 105 additions and 19 deletions.
2 changes: 2 additions & 0 deletions src/main/java/io/vertx/core/impl/ContextImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
import io.vertx.core.json.JsonObject;
import io.vertx.core.spi.context.storage.ContextLocal;
import io.vertx.core.spi.metrics.PoolMetrics;
import io.vertx.core.spi.tracing.VertxTracer;

import java.util.concurrent.*;
import java.util.function.UnaryOperator;

/**
* A base class for {@link Context} implementations.
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/io/vertx/core/impl/ContextInternal.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Objects;
import java.util.concurrent.*;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;

/**
* This interface provides an api for vert.x core internal use only
Expand All @@ -35,7 +36,7 @@
*/
public interface ContextInternal extends Context {

ContextLocal<ConcurrentMap<Object, Object>> LOCAL_MAP = new ContextLocalImpl<>(0);
ContextLocal<ConcurrentMap<Object, Object>> LOCAL_MAP = (ContextLocal) new ContextLocalImpl<>(0, ConcurrentMap.class, t -> new ConcurrentHashMap<>(t));

/**
* @return the current context
Expand Down
23 changes: 18 additions & 5 deletions src/main/java/io/vertx/core/impl/ContextLocalImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,31 @@

import io.vertx.core.spi.context.storage.ContextLocal;

import java.util.function.Function;
import java.util.function.UnaryOperator;

/**
* @author <a href="mailto:[email protected]">Julien Viet</a>
*/
public class ContextLocalImpl<T> implements ContextLocal<T> {

final int index;
public static <T> ContextLocal<T> registerLocal(Class<T> type) {
return registerLocal(type, (UnaryOperator<T>) ContextLocalImpl.IDENTITY);
}

public ContextLocalImpl(int index) {
this.index = index;
public static <T> ContextLocal<T> registerLocal(Class<T> type, Function<T, ? extends T> duplicator) {
return LocalSeq.add(idx -> new ContextLocalImpl<>(idx, type, duplicator));
}

public ContextLocalImpl() {
this.index = LocalSeq.next();
public static final UnaryOperator<?> IDENTITY = UnaryOperator.identity();

final int index;
final Class<T> type;
final Function<T, ? extends T> duplicator;

ContextLocalImpl(int index, Class<T> type, Function<T, ? extends T> duplicator) {
this.index = index;
this.type = type;
this.duplicator = duplicator;
}
}
25 changes: 23 additions & 2 deletions src/main/java/io/vertx/core/impl/DuplicatedContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@
import io.vertx.core.Handler;
import io.vertx.core.ThreadingModel;
import io.vertx.core.json.JsonObject;
import io.vertx.core.spi.context.storage.ContextLocal;
import io.vertx.core.spi.tracing.VertxTracer;

import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;

Expand Down Expand Up @@ -164,7 +164,28 @@ public boolean isWorkerContext() {

@Override
public ContextInternal duplicate() {
return new DuplicatedContext(delegate, locals.length == 0 ? VertxImpl.EMPTY_CONTEXT_LOCALS : new Object[locals.length]);
Object[] localsDuplicate = locals.length == 0 ? VertxImpl.EMPTY_CONTEXT_LOCALS : locals.clone();
ContextLocal<?>[] contextLocals = ((VertxImpl) delegate.owner()).contextLocals;
for (int i = 0;i < localsDuplicate.length;i++) {
ContextLocalImpl<?> contextLocal = (ContextLocalImpl<?>) contextLocals[i];
Object local = locals[i];
if (local != null) {
if (local != ContextLocalImpl.IDENTITY) {
localsDuplicate[i] = duplicate(local, contextLocal);
} else {
localsDuplicate[i] = local;
}
}
}
return new DuplicatedContext(delegate, localsDuplicate);
}

private static <T> T duplicate(Object o, ContextLocalImpl<T> contextLocal) {
if (contextLocal.type.isInstance(o)) {
return contextLocal.duplicator.apply(contextLocal.type.cast(o));
} else {
throw new ClassCastException();
}
}

@Override
Expand Down
32 changes: 25 additions & 7 deletions src/main/java/io/vertx/core/impl/LocalSeq.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,28 +10,46 @@
*/
package io.vertx.core.impl;

import java.util.concurrent.atomic.AtomicInteger;
import io.vertx.core.spi.context.storage.ContextLocal;

import java.util.ArrayList;
import java.util.List;
import java.util.function.IntFunction;

/**
* @author <a href="mailto:[email protected]">Julien Viet</a>
*/
class LocalSeq {

// 0 : reserved slot for local context map
private static final AtomicInteger seq = new AtomicInteger(1);
private static final List<ContextLocal<?>> locals = new ArrayList<>();

static {
reset();
}

/**
* Hook for testing purposes
*/
static void reset() {
seq.set((1));
synchronized (locals) {
locals.clear();
locals.add(ContextInternal.LOCAL_MAP);
}
}

static int get() {
return seq.get();
static ContextLocal<?>[] get() {
synchronized (locals) {
return locals.toArray(new ContextLocal[0]);
}
}

static int next() {
return seq.getAndIncrement();
static <T> ContextLocal<T> add(IntFunction<ContextLocal<T>> provider) {
synchronized (locals) {
int idx = locals.size();
ContextLocal<T> local = provider.apply(idx);
locals.add(local);
return local;
}
}
}
9 changes: 6 additions & 3 deletions src/main/java/io/vertx/core/impl/VertxImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import io.vertx.core.net.*;
import io.vertx.core.net.impl.*;
import io.vertx.core.impl.transports.JDKTransport;
import io.vertx.core.spi.context.storage.ContextLocal;
import io.vertx.core.spi.file.FileResolver;
import io.vertx.core.file.impl.FileSystemImpl;
import io.vertx.core.file.impl.WindowsFileSystem;
Expand Down Expand Up @@ -73,6 +74,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;

/**
* @author <a href="http://tfox.org">Tim Fox</a>
Expand Down Expand Up @@ -136,7 +138,7 @@ private static ThreadFactory virtualThreadFactory() {
private final VerticleManager verticleManager;
private final FileResolver fileResolver;
private final Map<ServerID, NetServerInternal> sharedNetServers = new HashMap<>();
private final int contextLocals;
final ContextLocal<?>[] contextLocals;
final WorkerPool workerPool;
final WorkerPool internalWorkerPool;
final WorkerPool virtualThreaWorkerPool;
Expand Down Expand Up @@ -503,10 +505,11 @@ public boolean cancelTimer(long id) {
}

private Object[] createContextLocals() {
if (contextLocals == 0) {
int len = contextLocals.length;
if (len == 0) {
return EMPTY_CONTEXT_LOCALS;
} else {
return new Object[contextLocals];
return new Object[len];
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import io.vertx.core.impl.ContextLocalImpl;

import java.util.function.Supplier;
import java.util.function.UnaryOperator;

/**
* A local storage for arbitrary data attached to a duplicated {@link Context}.
Expand All @@ -35,7 +36,16 @@ public interface ContextLocal<T> {
* @return the context local storage
*/
static <T> ContextLocal<T> registerLocal(Class<T> type) {
return new ContextLocalImpl<>();
return ContextLocalImpl.registerLocal(type);
}

/**
* Registers a context local storage.
*
* @return the context local storage
*/
static <T> ContextLocal<T> registerLocal(Class<T> type, UnaryOperator<T> duplicator) {
return ContextLocalImpl.registerLocal(type, duplicator);
}

/**
Expand All @@ -58,6 +68,10 @@ default T get(Context context, Supplier<? extends T> initialValueSupplier) {
return get(context, AccessMode.CONCURRENT, initialValueSupplier);
}

default T duplicate(T value) {
return value;
}

/**
* Put local data in the {@code context}.
*
Expand Down
14 changes: 14 additions & 0 deletions src/test/java/io/vertx/core/ContextTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -1092,4 +1092,18 @@ public void testConcurrentLocalAccess() throws Exception {
}
}

@Test
public void testNestedDuplicate() {
ContextInternal ctx = ((ContextInternal) vertx.getOrCreateContext()).duplicate();
ctx.putLocal("foo", "bar");
Object expected = new Object();
ctx.putLocal(contextLocal, AccessMode.CONCURRENT, expected);
ContextInternal duplicate = ctx.duplicate();
assertEquals("bar", duplicate.getLocal("foo"));
assertEquals(expected, duplicate.getLocal(contextLocal));
ctx.removeLocal("foo");
ctx.removeLocal(contextLocal, AccessMode.CONCURRENT);
assertEquals("bar", duplicate.getLocal("foo"));
assertEquals(expected, duplicate.getLocal(contextLocal));
}
}

0 comments on commit dc2f1fb

Please sign in to comment.