diff --git a/praxiscore-code/src/main/java/org/praxislive/code/CodeComponent.java b/praxiscore-code/src/main/java/org/praxislive/code/CodeComponent.java index 25550c65..79e61212 100644 --- a/praxiscore-code/src/main/java/org/praxislive/code/CodeComponent.java +++ b/praxiscore-code/src/main/java/org/praxislive/code/CodeComponent.java @@ -83,8 +83,9 @@ public final Container getParent() { public void parentNotify(Container parent) throws VetoException { if (parent == null) { if (this.parent != null) { - this.parent = null; disconnectAll(); + codeCtxt.handleDispose(); + this.parent = null; } } else { if (this.parent != null) { @@ -104,9 +105,6 @@ public void hierarchyChanged() { router = null; logInfo = null; codeCtxt.handleHierarchyChanged(); - if (getAddress() == null) { - codeCtxt.handleDispose(); - } } @Override diff --git a/praxiscore-code/src/main/java/org/praxislive/code/CodeContext.java b/praxiscore-code/src/main/java/org/praxislive/code/CodeContext.java index 4cd3c12f..767f49ad 100644 --- a/praxiscore-code/src/main/java/org/praxislive/code/CodeContext.java +++ b/praxiscore-code/src/main/java/org/praxislive/code/CodeContext.java @@ -294,8 +294,6 @@ protected final void resetAndInitialize() { } final void handleDispose() { - cmp = null; - handleHierarchyChanged(); refs.values().forEach(ReferenceDescriptor::dispose); refs.clear(); controls.values().forEach(ControlDescriptor::dispose); @@ -303,6 +301,8 @@ final void handleDispose() { ports.values().forEach(PortDescriptor::dispose); ports.clear(); dispose(); + cmp = null; + handleHierarchyChanged(); } /** diff --git a/praxiscore-code/src/main/java/org/praxislive/code/FunctionDescriptor.java b/praxiscore-code/src/main/java/org/praxislive/code/FunctionDescriptor.java index 7bb17afc..6d89f9db 100644 --- a/praxiscore-code/src/main/java/org/praxislive/code/FunctionDescriptor.java +++ b/praxiscore-code/src/main/java/org/praxislive/code/FunctionDescriptor.java @@ -1,7 +1,7 @@ /* * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER. * - * Copyright 2023 Neil C Smith. + * Copyright 2024 Neil C Smith. * * This code is free software; you can redistribute it and/or modify it * under the terms of the GNU Lesser General Public License version 3 only, as @@ -24,7 +24,13 @@ import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import org.praxislive.code.userapi.Async; import org.praxislive.code.userapi.FN; import org.praxislive.core.ArgumentInfo; import org.praxislive.core.Call; @@ -32,8 +38,10 @@ import org.praxislive.core.ControlInfo; import org.praxislive.core.Info; import org.praxislive.core.PacketRouter; +import org.praxislive.core.Value; import org.praxislive.core.ValueMapper; import org.praxislive.core.services.LogLevel; +import org.praxislive.core.types.PError; import org.praxislive.core.types.PMap; /** @@ -41,26 +49,46 @@ */ class FunctionDescriptor extends ControlDescriptor { + private final Method method; private final ControlInfo info; - private final FunctionControl control; + private final boolean async; + private final List> parameterMappers; + private final ValueMapper returnMapper; + + private FunctionControl control; private FunctionDescriptor(String id, int index, Method method, ControlInfo info, List> parameterMappers, - ValueMapper returnMapper) { + ValueMapper returnMapper, + boolean async) { super(FunctionDescriptor.class, id, Category.Function, index); + this.method = method; this.info = info; - this.control = new DirectFunctionControl(method, parameterMappers, returnMapper); + this.parameterMappers = parameterMappers; + this.returnMapper = returnMapper; + this.async = async; } @Override public void attach(CodeContext context, FunctionDescriptor previous) { if (previous != null) { - previous.control.detach(); + if (isCompatible(previous)) { + control = previous.control; + } else { + previous.dispose(); + } } - control.attach(context); + if (control == null) { + if (async) { + control = new AsyncFunctionControl(parameterMappers, returnMapper); + } else { + control = new DirectFunctionControl(parameterMappers, returnMapper); + } + } + control.attach(context, method); } @Override @@ -73,13 +101,33 @@ public ControlInfo controlInfo() { return info; } + @Override + public void dispose() { + if (control != null) { + control.dispose(); + } + } + + @Override + public void onStop() { + if (control != null) { + control.onStop(); + } + } + + private boolean isCompatible(FunctionDescriptor other) { + return method.getGenericReturnType().equals(other.method.getGenericReturnType()) + && Arrays.equals(method.getGenericParameterTypes(), + other.method.getGenericParameterTypes()); + } + static FunctionDescriptor create(CodeConnector connector, FN ann, Method method) { method.setAccessible(true); - var parameterTypes = method.getParameterTypes(); - var parameterMappers = new ValueMapper[parameterTypes.length]; + Class[] parameterTypes = method.getParameterTypes(); + ValueMapper[] parameterMappers = new ValueMapper[parameterTypes.length]; for (int i = 0; i < parameterMappers.length; i++) { - var type = parameterTypes[i]; - var mapper = ValueMapper.find(type); + Class type = parameterTypes[i]; + ValueMapper mapper = ValueMapper.find(type); if (mapper == null) { connector.getLog().log(LogLevel.ERROR, "Unsupported parameter type " + type.getSimpleName() @@ -88,10 +136,25 @@ static FunctionDescriptor create(CodeConnector connector, FN ann, Method meth } parameterMappers[i] = mapper; } - var returnType = method.getReturnType(); + + boolean async = false; + Class returnType = method.getReturnType(); ValueMapper returnMapper; if (returnType == Void.TYPE) { returnMapper = null; + } else if (returnType == Async.class) { + async = true; + Class asyncReturnType = TypeUtils.extractRawType( + TypeUtils.extractTypeParameter(method.getGenericReturnType()) + ); + returnMapper = asyncReturnType == null ? null + : ValueMapper.find(asyncReturnType); + if (returnMapper == null) { + connector.getLog().log(LogLevel.ERROR, + "Unsupported Async type " + method.getGenericReturnType() + + " in method " + method.getName()); + return null; + } } else { returnMapper = ValueMapper.find(returnType); if (returnMapper == null) { @@ -101,12 +164,12 @@ static FunctionDescriptor create(CodeConnector connector, FN ann, Method meth return null; } } - var id = connector.findID(method); + String id = connector.findID(method); ArgumentInfo[] inputArgInfo = new ArgumentInfo[parameterMappers.length]; for (int i = 0; i < inputArgInfo.length; i++) { - var type = parameterMappers[i].valueType(); - var properties = type.emptyValue() + Value.Type type = parameterMappers[i].valueType(); + PMap properties = type.emptyValue() .map(empty -> PMap.EMPTY) .orElse(PMap.of(ArgumentInfo.KEY_ALLOW_EMPTY, true)); inputArgInfo[i] = ArgumentInfo.of(type.asClass(), properties); @@ -114,8 +177,8 @@ static FunctionDescriptor create(CodeConnector connector, FN ann, Method meth ArgumentInfo[] outputArgInfo; if (returnMapper != null) { - var type = returnMapper.valueType(); - var properties = type.emptyValue() + Value.Type type = returnMapper.valueType(); + PMap properties = type.emptyValue() .map(empty -> PMap.EMPTY) .orElse(PMap.of(ArgumentInfo.KEY_ALLOW_EMPTY, true)); outputArgInfo = new ArgumentInfo[]{ @@ -131,37 +194,40 @@ static FunctionDescriptor create(CodeConnector connector, FN ann, Method meth .build(); return new FunctionDescriptor(id, ann.value(), method, controlInfo, - List.of(parameterMappers), returnMapper); + List.of(parameterMappers), returnMapper, async); } private static abstract class FunctionControl implements Control { - abstract void attach(CodeContext context); + abstract void attach(CodeContext context, Method method); + + abstract void dispose(); - abstract void detach(); + void onStop() { + + } } private static class DirectFunctionControl extends FunctionControl { - private final Method method; private final List> parameterMappers; - private final ValueMapper returnMapper; + private final ValueMapper returnMapper; private CodeContext context; + private Method method; - private DirectFunctionControl(Method method, - List> parameterMappers, + @SuppressWarnings("unchecked") + private DirectFunctionControl(List> parameterMappers, ValueMapper returnMapper) { - this.method = method; this.parameterMappers = parameterMappers; - this.returnMapper = returnMapper; + this.returnMapper = (ValueMapper) returnMapper; } @Override public void call(Call call, PacketRouter router) throws Exception { if (call.isRequest()) { - var args = call.args(); + List args = call.args(); int reqCount = parameterMappers.size(); if (args.size() < reqCount) { throw new IllegalArgumentException("Not enough arguments in call"); @@ -175,17 +241,14 @@ public void call(Call call, PacketRouter router) throws Exception { () -> method.invoke(context.getDelegate(), parameters)); if (call.isReplyRequired()) { if (returnMapper != null) { - @SuppressWarnings("unchecked") - var mapper = (ValueMapper) returnMapper; - router.route(call.reply(mapper.toValue(response))); + router.route(call.reply(returnMapper.toValue(response))); } else { router.route(call.reply()); } } } catch (InvocationTargetException ex) { - var cause = ex.getCause(); - if (cause instanceof Exception) { - throw (Exception) cause; + if (ex.getCause() instanceof Exception exc) { + throw exc; } else { throw ex; } @@ -194,14 +257,106 @@ public void call(Call call, PacketRouter router) throws Exception { } @Override - void attach(CodeContext context) { + void attach(CodeContext context, Method method) { this.context = context; + this.method = method; } @Override - void detach() { + void dispose() { this.context = null; + this.method = null; } + + } + + private static class AsyncFunctionControl extends FunctionControl { + + private final List> parameterMappers; + private final ValueMapper returnMapper; + private final Map> pending; + + private CodeContext context; + private Method method; + + @SuppressWarnings("unchecked") + private AsyncFunctionControl(List> parameterMappers, + ValueMapper returnMapper) { + this.parameterMappers = parameterMappers; + this.returnMapper = (ValueMapper) returnMapper; + pending = new LinkedHashMap<>(); + } + + @Override + @SuppressWarnings("unchecked") + public void call(Call call, PacketRouter router) throws Exception { + if (call.isRequest()) { + List args = call.args(); + int reqCount = parameterMappers.size(); + if (args.size() < reqCount) { + throw new IllegalArgumentException("Not enough arguments in call"); + } + Object[] parameters = new Object[reqCount]; + for (int i = 0; i < parameters.length; i++) { + parameters[i] = parameterMappers.get(i).fromValue(args.get(i)); + } + try { + Object response = context.invokeCallable(call.time(), + () -> method.invoke(context.getDelegate(), parameters)); + if (call.isReplyRequired()) { + Async async = (Async) response; + CompletableFuture future = Async.toCompletableFuture(async); + pending.put(call, future); + future.whenComplete((result, error) -> { + if (pending.remove(call) != null) { + PacketRouter rtr = context.getComponent().getPacketRouter(); + if (result != null) { + try { + rtr.route(call.reply(returnMapper.toValue(result))); + } catch (Exception ex) { + rtr.route(call.error(PError.of(ex))); + } + } else { + PError pe = PError.of(error instanceof Exception e + ? e : new Exception(error)); + rtr.route(call.error(pe)); + } + } + + }); + } + } catch (InvocationTargetException ex) { + if (ex.getCause() instanceof Exception exc) { + throw exc; + } else { + throw ex; + } + } + } + } + + @Override + void attach(CodeContext context, Method method) { + this.context = context; + this.method = method; + } + + @Override + void onStop() { + if (!pending.isEmpty() && context != null) { + List> futures = new ArrayList<>(pending.values()); + futures.forEach(f -> f.cancel(false)); + } + pending.clear(); + } + + @Override + void dispose() { + onStop(); + this.context = null; + this.method = null; + } + } } diff --git a/praxiscore-code/src/main/java/org/praxislive/code/RefImpl.java b/praxiscore-code/src/main/java/org/praxislive/code/RefImpl.java index 54bd3054..404971c9 100644 --- a/praxiscore-code/src/main/java/org/praxislive/code/RefImpl.java +++ b/praxiscore-code/src/main/java/org/praxislive/code/RefImpl.java @@ -23,7 +23,6 @@ import java.lang.reflect.Field; import java.lang.reflect.Type; -import java.util.function.Function; import org.praxislive.code.userapi.AuxOut; import org.praxislive.code.userapi.Out; import org.praxislive.code.userapi.Ref; diff --git a/praxiscore-code/src/main/java/org/praxislive/code/ResponseHandler.java b/praxiscore-code/src/main/java/org/praxislive/code/ResponseHandler.java index 57d62639..ff26df25 100644 --- a/praxiscore-code/src/main/java/org/praxislive/code/ResponseHandler.java +++ b/praxiscore-code/src/main/java/org/praxislive/code/ResponseHandler.java @@ -1,7 +1,7 @@ /* * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER. * - * Copyright 2023 Neil C Smith. + * Copyright 2024 Neil C Smith. * * This code is free software; you can redistribute it and/or modify it * under the terms of the GNU Lesser General Public License version 3 only, as @@ -21,7 +21,6 @@ */ package org.praxislive.code; -import java.lang.ref.WeakReference; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -35,9 +34,6 @@ import org.praxislive.core.services.LogLevel; import org.praxislive.core.types.PError; -/** - * - */ class ResponseHandler extends ControlDescriptor implements Control { static final String ID = "_reply"; @@ -64,20 +60,19 @@ public void attach(CodeContext context, ResponseHandler previous) { @Override public void call(Call call, PacketRouter router) throws Exception { if (call.isReply()) { - var asyncRef = resultMap.remove(call.matchID()); + AsyncReference asyncRef = resultMap.remove(call.matchID()); if (asyncRef != null) { asyncRef.complete(call); } } else if (call.isError()) { - var error = extractError(call.args()); - var asyncRef = resultMap.remove(call.matchID()); + PError error = extractError(call.args()); + AsyncReference asyncRef = resultMap.remove(call.matchID()); if (asyncRef == null || !asyncRef.completeWithError(error)) { context.getLog().log(LogLevel.ERROR, error); } } else if (call.isReplyRequired()) { router.route(call.error(PError.of("Unexpected call"))); } - cleanResultMap(); } @Override @@ -93,10 +88,7 @@ public ControlInfo controlInfo() { @Override public void dispose() { resultMap.forEach((id, ref) -> { - Async async = ref.get(); - if (async != null) { - async.fail(PError.of("Disposed")); - } + ref.completeWithError(PError.of("Disposed")); }); resultMap.clear(); } @@ -104,14 +96,9 @@ public void dispose() { void register(Call call, Async async) { register(call, async, Function.identity()); } - - void register(Call call, Async async, Function converter) { - cleanResultMap(); - resultMap.put(call.matchID(), new AsyncReference(async, converter)); - } - private void cleanResultMap() { - resultMap.entrySet().removeIf(e -> e.getValue().get() == null); + void register(Call call, Async async, Function converter) { + resultMap.put(call.matchID(), new AsyncReference(call, async, converter)); } private PError extractError(List args) { @@ -119,39 +106,37 @@ private PError extractError(List args) { return UNKNOWN_ERROR; } else { return PError.from(args.get(0)) - .orElse(PError.of(args.get(0).toString())); + .orElseGet(() -> PError.of(args.get(0).toString())); } } - private static class AsyncReference extends WeakReference> { + private static class AsyncReference { + private final Call call; + private final Async async; private final Function converter; - - private AsyncReference(Async referent, Function converter) { - super(referent); + + private AsyncReference(Call call, Async async, Function converter) { + this.call = call; + this.async = async; this.converter = converter; } - + + private Call call() { + return call; + } + private void complete(Call call) { - Async async = get(); - if (async != null) { - try { - T value = converter.apply(call); - async.complete(value); - } catch (Exception ex) { - async.fail(PError.of(ex)); - } + try { + T value = converter.apply(call); + async.complete(value); + } catch (Exception ex) { + async.fail(PError.of(ex)); } - } - + private boolean completeWithError(PError error) { - Async async = get(); - if (async != null) { - return async.fail(error); - } else { - return false; - } + return async.fail(error); } } diff --git a/praxiscore-code/src/main/java/org/praxislive/code/TypeUtils.java b/praxiscore-code/src/main/java/org/praxislive/code/TypeUtils.java index 6c015bd2..19346a10 100644 --- a/praxiscore-code/src/main/java/org/praxislive/code/TypeUtils.java +++ b/praxiscore-code/src/main/java/org/praxislive/code/TypeUtils.java @@ -1,7 +1,7 @@ /* * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER. * - * Copyright 2023 Neil C Smith. + * Copyright 2024 Neil C Smith. * * This code is free software; you can redistribute it and/or modify it * under the terms of the GNU Lesser General Public License version 3 only, as @@ -60,13 +60,47 @@ static boolean equivalent(Type type1, Type type2) { */ static Type extractTypeParameter(Field field, Class baseType) { if (field.getType().equals(baseType)) { - var fieldType = field.getGenericType(); - if (fieldType instanceof ParameterizedType) { - var paramType = (ParameterizedType) fieldType; - var types = paramType.getActualTypeArguments(); - if (types.length == 1) { - return types[0]; - } + return extractTypeParameter(field.getGenericType()); + } + return null; + } + + /** + * Extract the type parameter from a type if it is a parameterized type with + * single type parameter. eg. for a type of {@code Ref>} return + * the type of {@code List}. + * + * @param type generic type + * @return extracted parameter type or null + */ + static Type extractTypeParameter(Type type) { + if (type instanceof ParameterizedType paramType) { + Type[] types = paramType.getActualTypeArguments(); + if (types.length == 1) { + return types[0]; + } + } + return null; + } + + /** + * Extract the raw Class type from a type, if one exists. Supports Class or + * ParameterizedType. Supports null input for chaining with other methods. + * + * @param type type or null + * @return class or null + */ + static Class extractRawType(Type type) { + if (type == null) { + return null; + } + if (type instanceof Class cls) { + return cls; + } + if (type instanceof ParameterizedType paramType) { + Type raw = paramType.getRawType(); + if (raw instanceof Class cls) { + return cls; } } return null; diff --git a/praxiscore-code/src/main/java/org/praxislive/code/userapi/Async.java b/praxiscore-code/src/main/java/org/praxislive/code/userapi/Async.java index 8a89d8fc..6439a596 100644 --- a/praxiscore-code/src/main/java/org/praxislive/code/userapi/Async.java +++ b/praxiscore-code/src/main/java/org/praxislive/code/userapi/Async.java @@ -1,7 +1,7 @@ /* * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER. * - * Copyright 2023 Neil C Smith. + * Copyright 2024 Neil C Smith. * * This code is free software; you can redistribute it and/or modify it * under the terms of the GNU Lesser General Public License version 3 only, as @@ -26,8 +26,15 @@ import java.util.Deque; import java.util.List; import java.util.Objects; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CopyOnWriteArraySet; import java.util.function.Consumer; +import org.praxislive.core.Call; +import org.praxislive.core.Value; +import org.praxislive.core.ValueMapper; import org.praxislive.core.types.PError; +import org.praxislive.core.types.PReference; /** * A lightweight holder for a future value, the result of an asynchronous @@ -36,8 +43,8 @@ * An Async can be explicitly completed, with a value or error. Completion can * only happen once. *

- * Async is not thread safe and is not designed for concurrent operation. - * Use from a single thread, or protect appropriately. + * Async is not thread safe and is not designed for concurrent + * operation. Use from a single thread, or protect appropriately. * * @param result type */ @@ -134,18 +141,177 @@ private void processLinkOnDone() { } private void link(Link link) { - if (this.link != null && this.link != link) { - this.link.removeOnRelink(this); + if (this.link == null) { + this.link = link; + } else if (this.link instanceof CompoundLink compound) { + compound.addLink(link); + } else { + this.link = new CompoundLink<>(this.link, link); } - this.link = link; + } private void unlink(Link link) { if (this.link == link) { this.link = null; + } else if (this.link instanceof CompoundLink compound) { + compound.removeLink(link); + } + } + + /** + * Bind a target Async to complete when a source Async completes. When the + * source Async completes, the target will be completed with the same result + * or error. + * + * @param result type + * @param source source async + * @param target target async + */ + public static void bind(Async source, Async target) { + Objects.requireNonNull(target); + if (source.done()) { + complete(source, target); + } else { + source.link(handler(async -> complete(async, target))); + } + } + + /** + * Create an Async that will complete when the provided async call + * completes, by extracting the first call argument and attempting to map to + * the given type. The returned Async will complete with an error if the + * call completes with an error, the argument isn't available, or the + * argument cannot be mapped to the given type. + * + * @param type of created async + * @param asyncCall async call + * @param type class type of created async + * @return created async + */ + public static Async extractArg(Async asyncCall, Class type) { + return extractArg(asyncCall, type, 0); + } + + /** + * Create an Async that will complete when the provided async call + * completes, by extracting the indexed call argument and attempting to map + * to the given type. The returned Async will complete with an error if the + * call completes with an error, the argument isn't available, or the + * argument cannot be mapped to the given type. + * + * @param type of created async + * @param asyncCall async call + * @param type class type of created async + * @param argIdx index of argument to extract + * @return created async + */ + public static Async extractArg(Async asyncCall, Class type, int argIdx) { + Objects.requireNonNull(type); + Async asyncValue = new Async<>(); + if (asyncCall.done()) { + completeExtract(asyncCall, asyncValue, type, argIdx); + } else { + asyncCall.link(handler( + async -> completeExtract(async, asyncValue, type, argIdx) + )); + } + return asyncValue; + } + + /** + * A utility method for linking an Async with a {@link CompletableFuture} + * for passing to external APIs. + *

+ * IMPORTANT : do not use completable futures returned from this + * method inside component code. To react to Async completion from + * within component code, use an {@link Async.Queue}. + *

+ * The completable future will automatically complete with the result or + * failure of the Async. The link is one way - the Async will not respond to + * any changes to the future. + * + * @param async and future type + * @param async async to link to created future + * @return created future + */ + public static CompletableFuture toCompletableFuture(Async async) { + if (async.done()) { + if (async.failed()) { + return CompletableFuture.failedFuture(extractError(async)); + } else { + return CompletableFuture.completedFuture(async.result()); + } + } else { + CompletableFuture future = new CompletableFuture<>(); + async.link(handler( + a -> { + if (a.failed()) { + future.completeExceptionally(extractError(a)); + } else { + future.complete(a.result()); + } + } + )); + return future; + } + } + + private static void complete(Async source, Async target) { + if (source.failed()) { + target.fail(source.error()); + } else { + target.complete(source.result()); + } + } + + private static void completeExtract(Async asyncCall, Async asyncValue, Class type, int argIdx) { + try { + if (asyncCall.failed()) { + asyncValue.fail(asyncCall.error()); + } else { + Call call = asyncCall.result(); + List args = call.args(); + if (call.isError()) { + PError err; + if (args.isEmpty()) { + err = PError.of("Unknown error"); + } else { + err = PError.from(args.get(0)) + .orElseGet(() -> PError.of(args.get(0).toString())); + } + asyncValue.fail(err); + } else { + Value value = args.get(argIdx); + T result; + if (Value.class == type) { + result = type.cast(value); + } else if (value instanceof PReference ref) { + if (PReference.class == type) { + result = type.cast(ref); + } else { + result = ref.as(type).orElseThrow(ClassCastException::new); + } + } else { + result = ValueMapper.find(type).fromValue(value); + } + asyncValue.complete(result); + } + } + } catch (Exception ex) { + asyncValue.fail(PError.of(ex)); } } + private static Throwable extractError(Async failed) { + return failed.error().exception() + .orElseGet(() -> new Exception(failed.error().toString())); + } + + private static Handler handler(Consumer> consumer) { + return new Handler<>(consumer); + } + /** * A task intended to be run asynchronously and outside of the main * component context. All data required to complete the task should be @@ -172,14 +338,19 @@ public static interface Task { } - private static abstract class Link { - - abstract void processDone(Async async); + /** + * A Link is something attached to an Async to react to its completion. + * + * @param Async result type + */ + static sealed abstract class Link { - void removeOnRelink(Async async) { + Link() { } + abstract void processDone(Async async); + } /** @@ -187,9 +358,6 @@ void removeOnRelink(Async async) { * Async instances that have completed, or a handler can be attached to run * on completion. *

- * An Async may only be in one queue at a time. Adding to the queue will - * automatically remove from any previous queue. - *

* A queue cannot be constructed directly. Use the {@link Inject} annotation * on a field. Queues will have handlers and limits automatically removed on * reset, and will be cleared on disposal. @@ -375,11 +543,43 @@ void processDone(Async async) { } } + } + + static final class Handler extends Link { + + private final Consumer> onDoneHandler; + + Handler(Consumer> onDoneHandler) { + this.onDoneHandler = onDoneHandler; + } + @Override - void removeOnRelink(Async async) { - deque.remove(async); + void processDone(Async async) { + onDoneHandler.accept(async); } } + static final class CompoundLink extends Link { + + private final Set> links; + + CompoundLink(Link link1, Link link2) { + links = new CopyOnWriteArraySet<>(List.of(link1, link2)); + } + + @Override + void processDone(Async async) { + links.forEach(link -> link.processDone(async)); + } + + void addLink(Link link) { + links.add(link); + } + + void removeLink(Link link) { + links.remove(link); + } + + } } diff --git a/praxiscore-code/src/test/java/org/praxislive/code/userapi/AsyncTest.java b/praxiscore-code/src/test/java/org/praxislive/code/userapi/AsyncTest.java index b025496e..89c49ba2 100644 --- a/praxiscore-code/src/test/java/org/praxislive/code/userapi/AsyncTest.java +++ b/praxiscore-code/src/test/java/org/praxislive/code/userapi/AsyncTest.java @@ -2,10 +2,17 @@ import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CompletableFuture; import org.junit.jupiter.api.Test; import org.praxislive.core.types.PError; import static org.junit.jupiter.api.Assertions.*; +import org.praxislive.core.Call; +import org.praxislive.core.ControlAddress; +import org.praxislive.core.Value; +import org.praxislive.core.types.PBoolean; +import org.praxislive.core.types.PReference; +import org.praxislive.core.types.PString; /** * @@ -42,6 +49,147 @@ public void testError() { assertNull(async.result()); } + @Test + public void testBind() { + Async source = new Async<>(); + Async target = new Async<>(); + Async.bind(source, target); + source.complete("FOO"); + assertTrue(target.done()); + assertEquals("FOO", target.result()); + + source = new Async<>(); + target = new Async<>(); + source.complete("BAR"); + Async.bind(source, target); + assertTrue(target.done()); + assertEquals("BAR", target.result()); + + source = new Async<>(); + target = new Async<>(); + Async.bind(source, target); + PError error = PError.of("ERROR"); + source.fail(error); + assertTrue(target.done()); + assertSame(error, target.error()); + } + + @Test + public void testExtractArg() { + // Test Value extract + Async asyncCall = new Async<>(); + Async asyncValue = Async.extractArg(asyncCall, Value.class); + asyncCall.complete(createCall().reply(Value.ofObject("TEST"))); + assertTrue(asyncValue.done()); + assertEquals(PString.of("TEST"), asyncValue.result()); + + // Test coerced Value extract + asyncCall = new Async<>(); + Async asyncBoolean = Async.extractArg(asyncCall, PBoolean.class); + asyncCall.complete(createCall().reply(Value.ofObject("true"))); + assertTrue(asyncBoolean.done()); + assertEquals(PBoolean.TRUE, asyncBoolean.result()); + + // Test String extract + asyncCall = new Async<>(); + Async asyncMessage = Async.extractArg(asyncCall, String.class); + asyncCall.complete(createCall().reply(Value.ofObject("TEST"))); + assertTrue(asyncMessage.done()); + assertEquals("TEST", asyncMessage.result()); + + // Test String extract after precompletion + asyncCall = new Async<>(); + asyncCall.complete(createCall().reply(Value.ofObject("TEST"))); + asyncMessage = Async.extractArg(asyncCall, String.class); + assertTrue(asyncMessage.done()); + assertEquals("TEST", asyncMessage.result()); + + // Test failed int extract + asyncCall = new Async<>(); + Async asyncInt = Async.extractArg(asyncCall, Integer.class); + asyncCall.complete(createCall().reply(Value.ofObject("NOT A NUMBER"))); + assertTrue(asyncInt.done()); + assertTrue(asyncInt.failed()); + assertNotNull(asyncInt.error()); + + // Test int extract from second arg + asyncCall = new Async<>(); + asyncInt = Async.extractArg(asyncCall, Integer.class, 1); + asyncCall.complete(createCall().reply(List.of( + Value.ofObject("TEST"), Value.ofObject(42)))); + assertTrue(asyncInt.done()); + assertEquals(42, asyncInt.result()); + + // Test Call error passed through + asyncCall = new Async<>(); + PError err = PError.of("ERROR"); + asyncMessage = Async.extractArg(asyncCall, String.class); + asyncCall.complete(createCall().error(err)); + assertTrue(asyncMessage.done()); + assertTrue(asyncMessage.failed()); + assertEquals(err, asyncMessage.error()); + + // Test extract of custom reference + record Foo(int value) { + + } + asyncCall = new Async<>(); + Async asyncFoo = Async.extractArg(asyncCall, Foo.class); + asyncCall.complete(createCall().reply(PReference.of(new Foo(42)))); + assertTrue(asyncFoo.done()); + assertInstanceOf(Foo.class, asyncFoo.result()); + assertEquals(42, asyncFoo.result().value()); + + } + + @Test + public void testToCompletableFuture() { + // completed after link + Async asyncString = new Async<>(); + CompletableFuture futureString + = Async.toCompletableFuture(asyncString) + .thenApply(s -> "Hello " + s); + asyncString.complete("World"); + assertTrue(futureString.isDone()); + assertEquals("Hello World", futureString.resultNow()); + + // completed before link + asyncString = new Async<>(); + asyncString.complete("Universe"); + futureString + = Async.toCompletableFuture(asyncString) + .thenApply(s -> "Hello " + s); + assertTrue(futureString.isDone()); + assertEquals("Hello Universe", futureString.resultNow()); + + // error after link + asyncString = new Async<>(); + Exception ex = new Exception("FOO"); + futureString = Async.toCompletableFuture(asyncString); + asyncString.fail(PError.of(ex)); + assertTrue(futureString.isCompletedExceptionally()); + assertSame(ex, futureString.exceptionNow()); + + // error before link + asyncString = new Async<>(); + ex = new Exception("FOO"); + asyncString.fail(PError.of(ex)); + futureString = Async.toCompletableFuture(asyncString); + assertTrue(futureString.isCompletedExceptionally()); + assertSame(ex, futureString.exceptionNow()); + + // test queued and future + asyncString = new Async<>(); + futureString = Async.toCompletableFuture(asyncString); + Async.Queue queue = new Async.Queue<>(); + queue.add(asyncString); + assertNull(queue.poll()); + asyncString.complete("QUEUED"); + assertSame(asyncString, queue.poll()); + assertTrue(futureString.isDone()); + assertEquals("QUEUED", futureString.resultNow()); + } + @Test public void testQueuePoll() { Async async1 = new Async<>(); @@ -102,42 +250,48 @@ public void testQueuePollSized() { assertNull(queue.poll()); } - + @Test public void testQueueHandler() { Async async1 = new Async<>(); Async async2 = new Async<>(); Async async3 = new Async<>(); Async async4 = new Async<>(); - + Async.Queue queue = new Async.Queue<>(); - + List results = new ArrayList<>(); List errors = new ArrayList<>(); - + queue.onDone(results::add, errors::add); - + async3.complete(3); queue.add(async1); queue.add(async2); queue.add(async3); queue.add(async4); - + assertEquals(1, results.size()); assertEquals(3, results.get(0)); - + async1.complete(1); async2.fail(PError.of("TWO")); assertEquals(2, results.size()); assertEquals(1, results.get(1)); assertEquals(1, errors.size()); assertEquals("TWO", errors.get(0).message()); - + queue.clear(); async4.complete(4); assertEquals(2, results.size()); assertEquals(1, errors.size()); - + + } + + private static Call createCall() { + return Call.create(ControlAddress.of("/root/component.control"), + ControlAddress.of("/test/component.control"), + System.nanoTime()); } } diff --git a/testsuite/tests/core/async-functions/data1.pxr b/testsuite/tests/core/async-functions/data1.pxr index 9d24c036..b4c8624b 100644 --- a/testsuite/tests/core/async-functions/data1.pxr +++ b/testsuite/tests/core/async-functions/data1.pxr @@ -1,37 +1,34 @@ @ /data1 root:data { - #%praxis.version 5.7.0-SNAPSHOT + .meta [map praxis.version 5.7.0-SNAPSHOT] @ ./exit core:custom { - #%graph.x 680 - #%graph.y 659 - .code "import org.praxislive.core.services.Services; + .meta [map graph.x 589 graph.y 685] + .code {import org.praxislive.core.services.Services; import org.praxislive.core.services.SystemManagerService; @T(1) - void exitOK() \{ + void exitOK() { exit(0); - \} + } @T(2) - void exitFail() \{ + void exitFail() { exit(1); - \} + } - private void exit(int exitValue) \{ + private void exit(int exitValue) { find(Services.class) .flatMap(s -> s.locate(SystemManagerService.class)) - .ifPresent(s -> tell(ControlAddress.of(s, \"system-exit\"), exitValue)); - \} + .ifPresent(s -> tell(ControlAddress.of(s, "system-exit"), exitValue)); + } -" +} } @ ./test1 core:custom { - #%graph.x 124 - #%graph.y 74 - #%graph.comment Call uppercase function - .code " + .meta [map graph.x 124 graph.y 74 graph.comment "Call uppercase function"] + .code { - String lowercase = \"hello world\"; + String lowercase = "hello world"; String uppercase = lowercase.toUpperCase(Locale.ROOT); @Out(1) Output ok; @@ -40,42 +37,40 @@ import org.praxislive.core.services.SystemManagerService; @Persist Async response; @Override - public void update() \{ - if (response != null && response.done()) \{ - if (response.failed()) \{ - log(ERROR, \" \" + response.error()); + public void update() { + if (response != null && response.done()) { + if (response.failed()) { + log(ERROR, " " + response.error()); error.send(); - \} else \{ + } else { String result = response.result().args().get(0).toString(); - if (!result.equals(uppercase)) \{ - log(ERROR, \" expected \" + uppercase + \" but got \" + result); + if (!result.equals(uppercase)) { + log(ERROR, " expected " + uppercase + " but got " + result); error.send(); - \} else \{ - log(INFO, \"Test 1 : OK\"); + } else { + log(INFO, "Test 1 : OK"); ok.send(); - \} - \} + } + } response = null; - \} - \} + } + } @T(1) - void test() \{ - log(INFO, \"Test 1 : Calling /data2/functions.uppercase\"); - response = ask(ControlAddress.of(\"/data2/functions.uppercase\"), + void test() { + log(INFO, "Test 1 : Calling /data2/functions.uppercase"); + response = ask(ControlAddress.of("/data2/functions.uppercase"), List.of(V(lowercase))); - \} + } -" +} } @ ./test2 core:custom { - #%graph.x 124 - #%graph.y 231 - #%graph.comment Call uppercase function with extra args - .code " + .meta [map graph.x 124 graph.y 231 graph.comment "Call uppercase function with extra args"] + .code { - String lowercase = \"hello world\"; + String lowercase = "hello world"; String uppercase = lowercase.toUpperCase(Locale.ROOT); @Out(1) Output ok; @@ -84,42 +79,40 @@ import org.praxislive.core.services.SystemManagerService; @Persist Async response; @Override - public void update() \{ - if (response != null && response.done()) \{ - if (response.failed()) \{ - log(ERROR, \" \" + response.error()); + public void update() { + if (response != null && response.done()) { + if (response.failed()) { + log(ERROR, " " + response.error()); error.send(); - \} else \{ + } else { String result = response.result().args().get(0).toString(); - if (!result.equals(uppercase)) \{ - log(ERROR, \" expected \" + uppercase + \" but got \" + result); + if (!result.equals(uppercase)) { + log(ERROR, " expected " + uppercase + " but got " + result); error.send(); - \} else \{ - log(INFO, \"Test 2 : OK\"); + } else { + log(INFO, "Test 2 : OK"); ok.send(); - \} - \} + } + } response = null; - \} - \} + } + } @T(1) - void test() \{ - log(INFO, \"Test 2 : Calling /data2/functions.uppercase with extra argument\"); - response = ask(ControlAddress.of(\"/data2/functions.uppercase\"), - lowercase, \"extra argument\"); - \} + void test() { + log(INFO, "Test 2 : Calling /data2/functions.uppercase with extra argument"); + response = ask(ControlAddress.of("/data2/functions.uppercase"), + lowercase, "extra argument"); + } -" +} } @ ./test3 core:custom { - #%graph.x 124 - #%graph.y 391 - #%graph.comment Call no-op function - .code " + .meta [map graph.x 124 graph.y 391 graph.comment "Call no-op function"] + .code { - String lowercase = \"hello world\"; + String lowercase = "hello world"; String uppercase = lowercase.toUpperCase(Locale.ROOT); @Out(1) Output ok; @@ -128,39 +121,37 @@ import org.praxislive.core.services.SystemManagerService; @Persist Async response; @Override - public void update() \{ - if (response != null && response.done()) \{ - if (response.failed()) \{ - log(ERROR, \" \" + response.error()); + public void update() { + if (response != null && response.done()) { + if (response.failed()) { + log(ERROR, " " + response.error()); error.send(); - \} else \{ - if (!response.result().args().isEmpty()) \{ - log(ERROR, \" expected no response args but received \" + } else { + if (!response.result().args().isEmpty()) { + log(ERROR, " expected no response args but received " + response.result().args()); error.send(); - \} else \{ - log(INFO, \"Test 3 : OK\"); + } else { + log(INFO, "Test 3 : OK"); ok.send(); - \} - \} + } + } response = null; - \} - \} + } + } @T(1) - void test() \{ - log(INFO, \"Test 3 : Calling /data2/functions.no-op\"); - response = ask(ControlAddress.of(\"/data2/functions.no-op\")); - \} + void test() { + log(INFO, "Test 3 : Calling /data2/functions.no-op"); + response = ask(ControlAddress.of("/data2/functions.no-op")); + } -" +} } @ ./test4 core:custom { - #%graph.x 124 - #%graph.y 555 - #%graph.comment Call error function - .code " + .meta [map graph.x 124 graph.y 555 graph.comment "Call error function"] + .code { @Out(1) Output ok; @Out(2) Output error; @@ -168,42 +159,40 @@ import org.praxislive.core.services.SystemManagerService; @Persist Async response; @Override - public void update() \{ - if (response != null && response.done()) \{ - if (!response.failed()) \{ + public void update() { + if (response != null && response.done()) { + if (!response.failed()) { log(ERROR, - \" exception expected but call completed successfully with args \" + " exception expected but call completed successfully with args " + response.result().args() ); error.send(); - \} else if (!\"UnsupportedOperationException\".equals(response.error().errorType())) \{ - log(ERROR, \" wrong exception received : \" + } else if (!"UnsupportedOperationException".equals(response.error().errorType())) { + log(ERROR, " wrong exception received : " + response.error().errorType()); error.send(); - \} else \{ - log(INFO, \"Test 4 : OK\"); + } else { + log(INFO, "Test 4 : OK"); ok.send(); - \} + } response = null; - \} - \} + } + } @T(1) - void test() \{ - log(INFO, \"Test 4 : Calling /data2/functions.error\"); - response = ask(ControlAddress.of(\"/data2/functions.error\"), \"FOO\"); - \} + void test() { + log(INFO, "Test 4 : Calling /data2/functions.error"); + response = ask(ControlAddress.of("/data2/functions.error"), "FOO"); + } -" +} } @ ./test5 core:custom { - #%graph.x 433 - #%graph.y 74 - #%graph.comment Run async task - .code " + .meta [map graph.x 433 graph.y 74 graph.comment "Run async task"] + .code { - String lowercase = \"hello world\"; + String lowercase = "hello world"; String uppercase = lowercase.toUpperCase(Locale.ROOT); @Out(1) Output ok; @@ -212,39 +201,37 @@ import org.praxislive.core.services.SystemManagerService; @Persist Async response; @Override - public void update() \{ - if (response != null && response.done()) \{ - if (response.failed()) \{ - log(ERROR, \" \" + response.error()); + public void update() { + if (response != null && response.done()) { + if (response.failed()) { + log(ERROR, " " + response.error()); error.send(); - \} else \{ + } else { String result = response.result(); - if (!result.equals(uppercase)) \{ - log(ERROR, \" expected \" + uppercase + \" but got \" + result); + if (!result.equals(uppercase)) { + log(ERROR, " expected " + uppercase + " but got " + result); error.send(); - \} else \{ - log(INFO, \"Test 5 : OK\"); + } else { + log(INFO, "Test 5 : OK"); ok.send(); - \} - \} + } + } response = null; - \} - \} + } + } @T(1) - void test() \{ - log(INFO, \"Test 5 : Running async task\"); + void test() { + log(INFO, "Test 5 : Running async task"); response = async(lowercase, s -> s.toUpperCase(Locale.ROOT)); - \} + } -" +} } @ ./test6 core:custom { - #%graph.x 433 - #%graph.y 231 - #%graph.comment Run async task w/ Async.Queue - .code " - String lowercase = \"hello world\"; + .meta [map graph.x 433 graph.y 231 graph.comment "Run async task w/ Async.Queue"] + .code { + String lowercase = "hello world"; String uppercase = lowercase.toUpperCase(Locale.ROOT); @Out(1) Output ok; @@ -253,28 +240,70 @@ import org.praxislive.core.services.SystemManagerService; @Inject Async.Queue queue; @Override - public void init() \{ - queue.onDone(res -> \{ - if (!res.equals(uppercase)) \{ - log(ERROR, \" expected \" + uppercase + \" but got \" + res); + public void init() { + queue.onDone(res -> { + if (!res.equals(uppercase)) { + log(ERROR, " expected " + uppercase + " but got " + res); error.send(); - \} else \{ - log(INFO, \"Test 6 : OK\"); + } else { + log(INFO, "Test 6 : OK"); ok.send(); - \} - \}, err -> \{ - log(ERROR, \" \" + err); + } + }, err -> { + log(ERROR, " " + err); error.send(); - \}); - \} + }); + } @T(1) - void test() \{ - log(INFO, \"Test 6 : Running async task w/ Async.Queue\"); + void test() { + log(INFO, "Test 6 : Running async task w/ Async.Queue"); queue.add(async(lowercase, s -> s.toUpperCase(Locale.ROOT))); - \} + } + +} + } + @ ./test7 core:custom { + .meta [map graph.x 433 graph.y 391 graph.comment "Call uppercase-async function"] + .code { + + String lowercase = "hello world"; + String uppercase = lowercase.toUpperCase(Locale.ROOT); + + @Out(1) Output ok; + @Out(2) Output error; -" + @Persist Async response; + + @Override + public void update() { + if (response != null && response.done()) { + if (response.failed()) { + log(ERROR, " " + response.error()); + error.send(); + } else { + String result = response.result().args().get(0).toString(); + if (!result.equals(uppercase)) { + log(ERROR, " expected " + uppercase + " but got " + result); + error.send(); + } else { + log(INFO, "Test 1 : OK"); + ok.send(); + } + } + response = null; + } + } + + @T(1) + void test() { + log(INFO, "Test 7 : Calling /data2/functions.uppercase-async"); + response = ask(ControlAddress.of("/data2/functions.uppercase-async"), + List.of(V(lowercase))); + } + + +} } ~ ./test1!error ./exit!exit-fail ~ ./test2!error ./exit!exit-fail @@ -286,6 +315,8 @@ import org.praxislive.core.services.SystemManagerService; ~ ./test4!ok ./test5!test ~ ./test5!error ./exit!exit-fail ~ ./test5!ok ./test6!test - ~ ./test6!ok ./exit!exit-ok ~ ./test6!error ./exit!exit-fail + ~ ./test6!ok ./test7!test + ~ ./test7!ok ./exit!exit-ok + ~ ./test7!error ./exit!exit-fail } diff --git a/testsuite/tests/core/async-functions/data2.pxr b/testsuite/tests/core/async-functions/data2.pxr index f3d7a5fa..20b061fb 100644 --- a/testsuite/tests/core/async-functions/data2.pxr +++ b/testsuite/tests/core/async-functions/data2.pxr @@ -1,31 +1,29 @@ @ /data2 root:data { - #%praxis.version 5.7.0-SNAPSHOT + .meta [map praxis.version 5.7.0-SNAPSHOT] @ ./functions core:custom { - #%graph.x 264 - #%graph.y 184 - .code " - - - - @FN String uppercase(String input) \{ + .meta [map graph.x 265 graph.y 184] + .code { + @FN String uppercase(String input) { return input.toUpperCase(Locale.ROOT); - \} + } + + @FN Async uppercaseAsync(String input) { + return async(input, s -> s.toUpperCase(Locale.ROOT)); + } - @FN void noOp() \{\} + @FN void noOp() {} - @FN String error() \{ + @FN String error() { throw new UnsupportedOperationException(); - \} + } -" +} } @ ./start-trigger core:start-trigger { - #%graph.x 70 - #%graph.y 59 + .meta [map graph.x 70 graph.y 59] } @ ./send core:routing:send { - #%graph.x 455 - #%graph.y 76 + .meta [map graph.x 455 graph.y 76] .address /data1/test1.test } ~ ./start-trigger!out ./send!in