Skip to content

Commit

Permalink
Add support in Async for extracting arguments from a Call.
Browse files Browse the repository at this point in the history
Also update ResponseHandler to no longer use weak references.
  • Loading branch information
neilcsmith-net committed Dec 4, 2024
1 parent 0d3c075 commit 3017786
Show file tree
Hide file tree
Showing 3 changed files with 245 additions and 55 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
*
* Copyright 2023 Neil C Smith.
* Copyright 2024 Neil C Smith.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License version 3 only, as
Expand All @@ -21,7 +21,6 @@
*/
package org.praxislive.code;

import java.lang.ref.WeakReference;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -35,9 +34,6 @@
import org.praxislive.core.services.LogLevel;
import org.praxislive.core.types.PError;

/**
*
*/
class ResponseHandler extends ControlDescriptor<ResponseHandler> implements Control {

static final String ID = "_reply";
Expand All @@ -64,20 +60,19 @@ public void attach(CodeContext<?> context, ResponseHandler previous) {
@Override
public void call(Call call, PacketRouter router) throws Exception {
if (call.isReply()) {
var asyncRef = resultMap.remove(call.matchID());
AsyncReference<?> asyncRef = resultMap.remove(call.matchID());
if (asyncRef != null) {
asyncRef.complete(call);
}
} else if (call.isError()) {
var error = extractError(call.args());
var asyncRef = resultMap.remove(call.matchID());
PError error = extractError(call.args());
AsyncReference<?> asyncRef = resultMap.remove(call.matchID());
if (asyncRef == null || !asyncRef.completeWithError(error)) {
context.getLog().log(LogLevel.ERROR, error);
}
} else if (call.isReplyRequired()) {
router.route(call.error(PError.of("Unexpected call")));
}
cleanResultMap();
}

@Override
Expand All @@ -93,65 +88,55 @@ public ControlInfo controlInfo() {
@Override
public void dispose() {
resultMap.forEach((id, ref) -> {
Async<?> async = ref.get();
if (async != null) {
async.fail(PError.of("Disposed"));
}
ref.completeWithError(PError.of("Disposed"));
});
resultMap.clear();
}

void register(Call call, Async<Call> async) {
register(call, async, Function.identity());
}

<T> void register(Call call, Async<T> async, Function<Call, T> converter) {
cleanResultMap();
resultMap.put(call.matchID(), new AsyncReference(async, converter));
}

private void cleanResultMap() {
resultMap.entrySet().removeIf(e -> e.getValue().get() == null);
<T> void register(Call call, Async<T> async, Function<Call, T> converter) {
resultMap.put(call.matchID(), new AsyncReference(call, async, converter));
}

private PError extractError(List<Value> args) {
if (args.isEmpty()) {
return UNKNOWN_ERROR;
} else {
return PError.from(args.get(0))
.orElse(PError.of(args.get(0).toString()));
.orElseGet(() -> PError.of(args.get(0).toString()));
}
}

private static class AsyncReference<T> extends WeakReference<Async<T>> {
private static class AsyncReference<T> {

private final Call call;
private final Async<T> async;
private final Function<Call, T> converter;

private AsyncReference(Async<T> referent, Function<Call, T> converter) {
super(referent);

private AsyncReference(Call call, Async<T> async, Function<Call, T> converter) {
this.call = call;
this.async = async;
this.converter = converter;
}


private Call call() {
return call;
}

private void complete(Call call) {
Async<T> async = get();
if (async != null) {
try {
T value = converter.apply(call);
async.complete(value);
} catch (Exception ex) {
async.fail(PError.of(ex));
}
try {
T value = converter.apply(call);
async.complete(value);
} catch (Exception ex) {
async.fail(PError.of(ex));
}

}

private boolean completeWithError(PError error) {
Async<T> async = get();
if (async != null) {
return async.fail(error);
} else {
return false;
}
return async.fail(error);
}

}
Expand Down
133 changes: 129 additions & 4 deletions praxiscore-code/src/main/java/org/praxislive/code/userapi/Async.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
*
* Copyright 2023 Neil C Smith.
* Copyright 2024 Neil C Smith.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License version 3 only, as
Expand All @@ -27,7 +27,11 @@
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;
import org.praxislive.core.Call;
import org.praxislive.core.Value;
import org.praxislive.core.ValueMapper;
import org.praxislive.core.types.PError;
import org.praxislive.core.types.PReference;

/**
* A lightweight holder for a future value, the result of an asynchronous
Expand Down Expand Up @@ -146,6 +150,91 @@ private void unlink(Link<T> link) {
}
}

/**
* Create an Async that will complete when the provided async call
* completes, by extracting the first call argument and attempting to map to
* the given type. The returned Async will complete with an error if the
* call completes with an error, the argument isn't available, or the
* argument cannot be mapped to the given type.
* <p>
* Any existing {@link Link} on the passed in async call will be removed.
*
* @param <T> type of created async
* @param asyncCall async call
* @param type class type of created async
* @return created async
*/
public static <T> Async<T> extractArg(Async<Call> asyncCall, Class<T> type) {
return extractArg(asyncCall, type, 0);
}

/**
* Create an Async that will complete when the provided async call
* completes, by extracting the indexed call argument and attempting to map
* to the given type. The returned Async will complete with an error if the
* call completes with an error, the argument isn't available, or the
* argument cannot be mapped to the given type.
* <p>
* Any existing {@link Link} on the passed in async call will be removed.
*
* @param <T> type of created async
* @param asyncCall async call
* @param type class type of created async
* @param argIdx index of argument to extract
* @return created async
*/
public static <T> Async<T> extractArg(Async<Call> asyncCall, Class<T> type, int argIdx) {
Objects.requireNonNull(type);
Async<T> asyncValue = new Async<>();
if (asyncCall.done()) {
completeExtract(asyncCall, asyncValue, type, argIdx);
} else {
asyncCall.link(new Handler<>(
async -> completeExtract(async, asyncValue, type, argIdx),
async -> asyncValue.fail(PError.of("unlinked"))
));
}
return asyncValue;
}

private static <T> void completeExtract(Async<Call> asyncCall, Async<T> asyncValue, Class<T> type, int argIdx) {
try {
if (asyncCall.failed()) {
asyncValue.fail(asyncCall.error());
} else {
Call call = asyncCall.result();
List<Value> args = call.args();
if (call.isError()) {
PError err;
if (args.isEmpty()) {
err = PError.of("Unknown error");
} else {
err = PError.from(args.get(0))
.orElseGet(() -> PError.of(args.get(0).toString()));
}
asyncValue.fail(err);
} else {
Value value = args.get(argIdx);
T result;
if (Value.class == type) {
result = type.cast(value);
} else if (value instanceof PReference ref) {
if (PReference.class == type) {
result = type.cast(ref);
} else {
result = ref.as(type).orElseThrow(ClassCastException::new);
}
} else {
result = ValueMapper.find(type).fromValue(value);
}
asyncValue.complete(result);
}
}
} catch (Exception ex) {
asyncValue.fail(PError.of(ex));
}
}

/**
* A task intended to be run asynchronously and outside of the main
* component context. All data required to complete the task should be
Expand All @@ -172,7 +261,19 @@ public static interface Task<T, R> {

}

private static abstract class Link<T> {
/**
* A Link is something attached to an Async to react to its completion. An
* Async may only have one Link at a time. Calling any function or passing
* an Async to a type that adds a Link to an Async will remove any other
* Link attached to it.
*
* @param <T> Async result type
*/
public static sealed abstract class Link<T> {

Link() {

}

abstract void processDone(Async<T> async);

Expand All @@ -187,8 +288,10 @@ void removeOnRelink(Async<T> async) {
* Async instances that have completed, or a handler can be attached to run
* on completion.
* <p>
* An Async may only be in one queue at a time. Adding to the queue will
* automatically remove from any previous queue.
* An Async may only have one {@link Link} at a time. Adding an Async to the
* queue will automatically remove any previous link. After adding to the
* queue, calling any function that replaces the link will remove it from
* the queue.
* <p>
* A queue cannot be constructed directly. Use the {@link Inject} annotation
* on a field. Queues will have handlers and limits automatically removed on
Expand Down Expand Up @@ -382,4 +485,26 @@ void removeOnRelink(Async<T> async) {

}

static final class Handler<T> extends Link<T> {

private final Consumer<Async<T>> onDoneHandler;
private final Consumer<Async<T>> onRelinkHandler;

Handler(Consumer<Async<T>> onDoneHandler, Consumer<Async<T>> onRelinkHandler) {
this.onDoneHandler = onDoneHandler;
this.onRelinkHandler = onRelinkHandler;
}

@Override
void processDone(Async<T> async) {
onDoneHandler.accept(async);
}

@Override
void removeOnRelink(Async<T> async) {
onRelinkHandler.accept(async);
}

}

}
Loading

0 comments on commit 3017786

Please sign in to comment.