Skip to content

Commit

Permalink
Merge pull request praxis-live#70 from codelerity/async-fn
Browse files Browse the repository at this point in the history
Support Async return from functions
  • Loading branch information
neilcsmith-net authored Dec 9, 2024
2 parents 0d3c075 + 540f996 commit fdca7bc
Show file tree
Hide file tree
Showing 10 changed files with 831 additions and 277 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,9 @@ public final Container getParent() {
public void parentNotify(Container parent) throws VetoException {
if (parent == null) {
if (this.parent != null) {
this.parent = null;
disconnectAll();
codeCtxt.handleDispose();
this.parent = null;
}
} else {
if (this.parent != null) {
Expand All @@ -104,9 +105,6 @@ public void hierarchyChanged() {
router = null;
logInfo = null;
codeCtxt.handleHierarchyChanged();
if (getAddress() == null) {
codeCtxt.handleDispose();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,15 +294,15 @@ protected final void resetAndInitialize() {
}

final void handleDispose() {
cmp = null;
handleHierarchyChanged();
refs.values().forEach(ReferenceDescriptor::dispose);
refs.clear();
controls.values().forEach(ControlDescriptor::dispose);
controls.clear();
ports.values().forEach(PortDescriptor::dispose);
ports.clear();
dispose();
cmp = null;
handleHierarchyChanged();
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
*
* Copyright 2023 Neil C Smith.
* Copyright 2024 Neil C Smith.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License version 3 only, as
Expand All @@ -24,43 +24,71 @@

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.praxislive.code.userapi.Async;
import org.praxislive.code.userapi.FN;
import org.praxislive.core.ArgumentInfo;
import org.praxislive.core.Call;
import org.praxislive.core.Control;
import org.praxislive.core.ControlInfo;
import org.praxislive.core.Info;
import org.praxislive.core.PacketRouter;
import org.praxislive.core.Value;
import org.praxislive.core.ValueMapper;
import org.praxislive.core.services.LogLevel;
import org.praxislive.core.types.PError;
import org.praxislive.core.types.PMap;

/**
*
*/
class FunctionDescriptor extends ControlDescriptor<FunctionDescriptor> {

private final Method method;
private final ControlInfo info;
private final FunctionControl control;
private final boolean async;
private final List<ValueMapper<?>> parameterMappers;
private final ValueMapper<?> returnMapper;

private FunctionControl control;

private FunctionDescriptor(String id,
int index,
Method method,
ControlInfo info,
List<ValueMapper<?>> parameterMappers,
ValueMapper<?> returnMapper) {
ValueMapper<?> returnMapper,
boolean async) {
super(FunctionDescriptor.class, id, Category.Function, index);
this.method = method;
this.info = info;
this.control = new DirectFunctionControl(method, parameterMappers, returnMapper);
this.parameterMappers = parameterMappers;
this.returnMapper = returnMapper;
this.async = async;
}

@Override
public void attach(CodeContext<?> context, FunctionDescriptor previous) {
if (previous != null) {
previous.control.detach();
if (isCompatible(previous)) {
control = previous.control;
} else {
previous.dispose();
}
}
control.attach(context);
if (control == null) {
if (async) {
control = new AsyncFunctionControl(parameterMappers, returnMapper);
} else {
control = new DirectFunctionControl(parameterMappers, returnMapper);
}
}
control.attach(context, method);
}

@Override
Expand All @@ -73,13 +101,33 @@ public ControlInfo controlInfo() {
return info;
}

@Override
public void dispose() {
if (control != null) {
control.dispose();
}
}

@Override
public void onStop() {
if (control != null) {
control.onStop();
}
}

private boolean isCompatible(FunctionDescriptor other) {
return method.getGenericReturnType().equals(other.method.getGenericReturnType())
&& Arrays.equals(method.getGenericParameterTypes(),
other.method.getGenericParameterTypes());
}

static FunctionDescriptor create(CodeConnector<?> connector, FN ann, Method method) {
method.setAccessible(true);
var parameterTypes = method.getParameterTypes();
var parameterMappers = new ValueMapper<?>[parameterTypes.length];
Class<?>[] parameterTypes = method.getParameterTypes();
ValueMapper<?>[] parameterMappers = new ValueMapper<?>[parameterTypes.length];
for (int i = 0; i < parameterMappers.length; i++) {
var type = parameterTypes[i];
var mapper = ValueMapper.find(type);
Class<?> type = parameterTypes[i];
ValueMapper<?> mapper = ValueMapper.find(type);
if (mapper == null) {
connector.getLog().log(LogLevel.ERROR,
"Unsupported parameter type " + type.getSimpleName()
Expand All @@ -88,10 +136,25 @@ static FunctionDescriptor create(CodeConnector<?> connector, FN ann, Method meth
}
parameterMappers[i] = mapper;
}
var returnType = method.getReturnType();

boolean async = false;
Class<?> returnType = method.getReturnType();
ValueMapper<?> returnMapper;
if (returnType == Void.TYPE) {
returnMapper = null;
} else if (returnType == Async.class) {
async = true;
Class<?> asyncReturnType = TypeUtils.extractRawType(
TypeUtils.extractTypeParameter(method.getGenericReturnType())
);
returnMapper = asyncReturnType == null ? null
: ValueMapper.find(asyncReturnType);
if (returnMapper == null) {
connector.getLog().log(LogLevel.ERROR,
"Unsupported Async type " + method.getGenericReturnType()
+ " in method " + method.getName());
return null;
}
} else {
returnMapper = ValueMapper.find(returnType);
if (returnMapper == null) {
Expand All @@ -101,21 +164,21 @@ static FunctionDescriptor create(CodeConnector<?> connector, FN ann, Method meth
return null;
}
}
var id = connector.findID(method);
String id = connector.findID(method);

ArgumentInfo[] inputArgInfo = new ArgumentInfo[parameterMappers.length];
for (int i = 0; i < inputArgInfo.length; i++) {
var type = parameterMappers[i].valueType();
var properties = type.emptyValue()
Value.Type<?> type = parameterMappers[i].valueType();
PMap properties = type.emptyValue()
.map(empty -> PMap.EMPTY)
.orElse(PMap.of(ArgumentInfo.KEY_ALLOW_EMPTY, true));
inputArgInfo[i] = ArgumentInfo.of(type.asClass(), properties);
}

ArgumentInfo[] outputArgInfo;
if (returnMapper != null) {
var type = returnMapper.valueType();
var properties = type.emptyValue()
Value.Type<?> type = returnMapper.valueType();
PMap properties = type.emptyValue()
.map(empty -> PMap.EMPTY)
.orElse(PMap.of(ArgumentInfo.KEY_ALLOW_EMPTY, true));
outputArgInfo = new ArgumentInfo[]{
Expand All @@ -131,37 +194,40 @@ static FunctionDescriptor create(CodeConnector<?> connector, FN ann, Method meth
.build();

return new FunctionDescriptor(id, ann.value(), method, controlInfo,
List.of(parameterMappers), returnMapper);
List.of(parameterMappers), returnMapper, async);
}

private static abstract class FunctionControl implements Control {

abstract void attach(CodeContext<?> context);
abstract void attach(CodeContext<?> context, Method method);

abstract void dispose();

abstract void detach();
void onStop() {

}

}

private static class DirectFunctionControl extends FunctionControl {

private final Method method;
private final List<ValueMapper<?>> parameterMappers;
private final ValueMapper<?> returnMapper;
private final ValueMapper<Object> returnMapper;

private CodeContext<?> context;
private Method method;

private DirectFunctionControl(Method method,
List<ValueMapper<?>> parameterMappers,
@SuppressWarnings("unchecked")
private DirectFunctionControl(List<ValueMapper<?>> parameterMappers,
ValueMapper<?> returnMapper) {
this.method = method;
this.parameterMappers = parameterMappers;
this.returnMapper = returnMapper;
this.returnMapper = (ValueMapper<Object>) returnMapper;
}

@Override
public void call(Call call, PacketRouter router) throws Exception {
if (call.isRequest()) {
var args = call.args();
List<Value> args = call.args();
int reqCount = parameterMappers.size();
if (args.size() < reqCount) {
throw new IllegalArgumentException("Not enough arguments in call");
Expand All @@ -175,17 +241,14 @@ public void call(Call call, PacketRouter router) throws Exception {
() -> method.invoke(context.getDelegate(), parameters));
if (call.isReplyRequired()) {
if (returnMapper != null) {
@SuppressWarnings("unchecked")
var mapper = (ValueMapper<Object>) returnMapper;
router.route(call.reply(mapper.toValue(response)));
router.route(call.reply(returnMapper.toValue(response)));
} else {
router.route(call.reply());
}
}
} catch (InvocationTargetException ex) {
var cause = ex.getCause();
if (cause instanceof Exception) {
throw (Exception) cause;
if (ex.getCause() instanceof Exception exc) {
throw exc;
} else {
throw ex;
}
Expand All @@ -194,14 +257,106 @@ public void call(Call call, PacketRouter router) throws Exception {
}

@Override
void attach(CodeContext<?> context) {
void attach(CodeContext<?> context, Method method) {
this.context = context;
this.method = method;
}

@Override
void detach() {
void dispose() {
this.context = null;
this.method = null;
}

}

private static class AsyncFunctionControl extends FunctionControl {

private final List<ValueMapper<?>> parameterMappers;
private final ValueMapper<Object> returnMapper;
private final Map<Call, CompletableFuture<?>> pending;

private CodeContext<?> context;
private Method method;

@SuppressWarnings("unchecked")
private AsyncFunctionControl(List<ValueMapper<?>> parameterMappers,
ValueMapper<?> returnMapper) {
this.parameterMappers = parameterMappers;
this.returnMapper = (ValueMapper<Object>) returnMapper;
pending = new LinkedHashMap<>();
}

@Override
@SuppressWarnings("unchecked")
public void call(Call call, PacketRouter router) throws Exception {
if (call.isRequest()) {
List<Value> args = call.args();
int reqCount = parameterMappers.size();
if (args.size() < reqCount) {
throw new IllegalArgumentException("Not enough arguments in call");
}
Object[] parameters = new Object[reqCount];
for (int i = 0; i < parameters.length; i++) {
parameters[i] = parameterMappers.get(i).fromValue(args.get(i));
}
try {
Object response = context.invokeCallable(call.time(),
() -> method.invoke(context.getDelegate(), parameters));
if (call.isReplyRequired()) {
Async<Object> async = (Async<Object>) response;
CompletableFuture<Object> future = Async.toCompletableFuture(async);
pending.put(call, future);
future.whenComplete((result, error) -> {
if (pending.remove(call) != null) {
PacketRouter rtr = context.getComponent().getPacketRouter();
if (result != null) {
try {
rtr.route(call.reply(returnMapper.toValue(result)));
} catch (Exception ex) {
rtr.route(call.error(PError.of(ex)));
}
} else {
PError pe = PError.of(error instanceof Exception e
? e : new Exception(error));
rtr.route(call.error(pe));
}
}

});
}
} catch (InvocationTargetException ex) {
if (ex.getCause() instanceof Exception exc) {
throw exc;
} else {
throw ex;
}
}
}
}

@Override
void attach(CodeContext<?> context, Method method) {
this.context = context;
this.method = method;
}

@Override
void onStop() {
if (!pending.isEmpty() && context != null) {
List<CompletableFuture<?>> futures = new ArrayList<>(pending.values());
futures.forEach(f -> f.cancel(false));
}
pending.clear();
}

@Override
void dispose() {
onStop();
this.context = null;
this.method = null;
}

}

}
Loading

0 comments on commit fdca7bc

Please sign in to comment.