Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add streamline resource framework #1253

Merged
merged 26 commits into from
Jan 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
package gov.nasa.jpl.aerie.contrib.streamline.core;

import gov.nasa.jpl.aerie.contrib.streamline.core.monads.ErrorCatchingMonad;
import gov.nasa.jpl.aerie.merlin.framework.CellRef;
import gov.nasa.jpl.aerie.merlin.protocol.model.CellType;
import gov.nasa.jpl.aerie.merlin.protocol.model.EffectTrait;
import gov.nasa.jpl.aerie.merlin.protocol.types.Duration;

import java.util.function.BinaryOperator;
import java.util.function.Predicate;

import static gov.nasa.jpl.aerie.contrib.streamline.core.ErrorCatching.failure;
import static gov.nasa.jpl.aerie.contrib.streamline.core.Expiring.expiring;
import static gov.nasa.jpl.aerie.contrib.streamline.debugging.Naming.*;
import static gov.nasa.jpl.aerie.merlin.protocol.types.Duration.ZERO;

/**
* Utility class for a simplified allocate method.
*/
public final class CellRefV2 {
private CellRefV2() {}

/**
* Allocate a new resource with an explicitly given effect type and effect trait.
*/
public static <D extends Dynamics<?, D>, E extends DynamicsEffect<D>> CellRef<E, Cell<D>> allocate(ErrorCatching<Expiring<D>> initialDynamics, EffectTrait<E> effectTrait) {
return CellRef.allocate(new Cell<>(initialDynamics), new CellType<>() {
@Override
public EffectTrait<E> getEffectType() {
return effectTrait;
}

@Override
public Cell<D> duplicate(Cell<D> cell) {
return new Cell<>(cell.initialDynamics, cell.dynamics, cell.elapsedTime);
}

@Override
public void apply(Cell<D> cell, E effect) {
cell.initialDynamics = effect.apply(cell.dynamics).match(
ErrorCatching::success,
error -> failure(new RuntimeException(
"Applying '%s' failed.".formatted(getEffectName(effect)), error)));
cell.dynamics = cell.initialDynamics;
cell.elapsedTime = ZERO;
}

@Override
public void step(Cell<D> cell, Duration duration) {
// Avoid accumulated round-off error in imperfect stepping
// by always stepping up from the initial dynamics
cell.elapsedTime = cell.elapsedTime.plus(duration);
cell.dynamics = ErrorCatchingMonad.map(cell.initialDynamics, d ->
expiring(d.data().step(cell.elapsedTime), d.expiry().minus(cell.elapsedTime)));
}
});
}

public static <D extends Dynamics<?, D>> EffectTrait<DynamicsEffect<D>> noncommutingEffects() {
return resolvingConcurrencyBy((left, right) -> x -> {
throw new UnsupportedOperationException(
"Concurrent effects are not supported on this resource.");
});
}

public static <D extends Dynamics<?, D>> EffectTrait<DynamicsEffect<D>> commutingEffects() {
return resolvingConcurrencyBy((left, right) -> x -> right.apply(left.apply(x)));
}

public static <D extends Dynamics<?, D>> EffectTrait<DynamicsEffect<D>> autoEffects() {
return autoEffects(testing((CommutativityTestInput<D> input) -> input.leftResult.equals(input.rightResult)));
}

public static <D extends Dynamics<?, D>> EffectTrait<DynamicsEffect<D>> autoEffects(
Predicate<CommutativityTestInput<ErrorCatching<Expiring<D>>>> commutativityTest) {
return resolvingConcurrencyBy((left, right) -> x -> {
final var lrx = left.apply(right.apply(x));
final var rlx = right.apply(left.apply(x));
if (commutativityTest.test(new CommutativityTestInput<>(x, lrx, rlx))) {
return lrx;
} else {
throw new UnsupportedOperationException(
"Detected non-commuting concurrent effects!");
}
});
}


/**
* Lift a commutativity test from data to dynamics,
* correctly comparing expiry and error information in the process.
*/
public static <D> Predicate<CommutativityTestInput<ErrorCatching<Expiring<D>>>> testing(Predicate<CommutativityTestInput<D>> test) {
// If both expiring, compare expiry and data
// If both error, compare error contents
// If one is expiring and the other is error, return false
return input -> input.leftResult.match(
leftExpiring -> input.rightResult.match(
rightExpiring -> leftExpiring.expiry().equals(rightExpiring.expiry()) && test.test(new CommutativityTestInput<>(
input.original.match(Expiring::data, $ -> leftExpiring.data()),
leftExpiring.data(),
rightExpiring.data())),
rightError -> false),
leftError -> input.rightResult.match(
rightExpiring -> false,
rightError -> Resources.equivalentExceptions(leftError, rightError)));
}

public record CommutativityTestInput<D>(D original, D leftResult, D rightResult) {}

public static <D extends Dynamics<?, D>> EffectTrait<DynamicsEffect<D>> resolvingConcurrencyBy(BinaryOperator<DynamicsEffect<D>> combineConcurrent) {
return new EffectTrait<>() {
@Override
public DynamicsEffect<D> empty() {
final DynamicsEffect<D> result = x -> x;
name(result, "No-op");
return result;
}

@Override
public DynamicsEffect<D> sequentially(final DynamicsEffect<D> prefix, final DynamicsEffect<D> suffix) {
final DynamicsEffect<D> result = x -> suffix.apply(prefix.apply(x));
name(result, "(%s) then (%s)".formatted(getEffectName(prefix), getEffectName(suffix)));
return result;
}

@Override
public DynamicsEffect<D> concurrently(final DynamicsEffect<D> left, final DynamicsEffect<D> right) {
try {
final DynamicsEffect<D> combined = combineConcurrent.apply(left, right);
final DynamicsEffect<D> result = x -> {
try {
return combined.apply(x);
} catch (Exception e) {
return failure(e);
}
};
name(result, "(%s) and (%s)".formatted(getEffectName(left), getEffectName(right)));
return result;
} catch (Throwable e) {
final DynamicsEffect<D> result = $ -> failure(e);
name(result, "Failed to combine concurrent effects: (%s) and (%s)".formatted(
getEffectName(left), getEffectName(right)));
return result;
}
}
};
}

private static <D extends Dynamics<?, D>, E extends DynamicsEffect<D>> String getEffectName(E effect) {
return getName(effect).orElse("anonymous effect");
}

public static class Cell<D> {
public ErrorCatching<Expiring<D>> initialDynamics;
public ErrorCatching<Expiring<D>> dynamics;
public Duration elapsedTime;

public Cell(ErrorCatching<Expiring<D>> dynamics) {
this(dynamics, dynamics, ZERO);
}

public Cell(ErrorCatching<Expiring<D>> initialDynamics, ErrorCatching<Expiring<D>> dynamics, Duration elapsedTime) {
this.initialDynamics = initialDynamics;
this.dynamics = dynamics;
this.elapsedTime = elapsedTime;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package gov.nasa.jpl.aerie.contrib.streamline.core;

import gov.nasa.jpl.aerie.merlin.protocol.types.Duration;

/**
* A single segment of a resource profile;
* a value which evolves as time passes.
*/
public interface Dynamics<V, D extends Dynamics<V, D>> {
/**
* Get the current value.
*/
V extract();

/**
* Evolve for the given time.
*
* @apiNote This method should always return the same value when called on the same object with the same duration
*/
D step(Duration t);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package gov.nasa.jpl.aerie.contrib.streamline.core;

/**
* General interface for an effect applied to a {@link MutableResource}
*/
public interface DynamicsEffect<D extends Dynamics<?, D>> {
ErrorCatching<Expiring<D>> apply(ErrorCatching<Expiring<D>> dynamics);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package gov.nasa.jpl.aerie.contrib.streamline.core;

import gov.nasa.jpl.aerie.contrib.streamline.core.monads.ErrorCatchingMonad;

import java.util.function.Function;

/**
* Sum type representing a value or a failure to produce a value.
*/
public sealed interface ErrorCatching<T> {
<R> R match(Function<T, R> onSuccess, Function<Throwable, R> onError);

static <T> ErrorCatching<T> success(T result) {
return new Success<>(result);
}

static <T> ErrorCatching<T> failure(Throwable exception) {
return new Failure<>(exception);
}

default <R> ErrorCatching<R> map(Function<T, R> f) {
return ErrorCatchingMonad.map(this, f);
}

default T getOrThrow() {
return match(
Function.identity(),
e -> {
throw new RuntimeException(e);
});
}

record Success<T>(T result) implements ErrorCatching<T> {
@Override
public <R> R match(final Function<T, R> onSuccess, final Function<Throwable, R> onError) {
return onSuccess.apply(result);
}
}

record Failure<T>(Throwable exception) implements ErrorCatching<T> {
@Override
public <R> R match(final Function<T, R> onSuccess, final Function<Throwable, R> onError) {
return onError.apply(exception);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package gov.nasa.jpl.aerie.contrib.streamline.core;

import gov.nasa.jpl.aerie.merlin.protocol.types.Duration;

import static gov.nasa.jpl.aerie.contrib.streamline.core.Expiry.NEVER;

/**
* A value which may be valid for a limited time.
*/
public record Expiring<D>(D data, Expiry expiry) {
public static <D> Expiring<D> expiring(D data, Expiry expiry) {
return new Expiring<>(data, expiry);
}

public static <D> Expiring<D> neverExpiring(D data) {
return expiring(data, NEVER);
}

public static <D> Expiring<D> expiring(D data, Duration expiry) {
return expiring(data, Expiry.at(expiry));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package gov.nasa.jpl.aerie.contrib.streamline.core;

import gov.nasa.jpl.aerie.merlin.protocol.types.Duration;

import java.util.Optional;
import java.util.stream.Stream;

/**
* The time at which a value expires.
*/
public record Expiry(Optional<Duration> value) {
public static Expiry NEVER = expiry(Optional.empty());

public static Expiry at(Duration t) {
return expiry(Optional.of(t));
}

public static Expiry expiry(Optional<Duration> value) {
return new Expiry(value);
}

public Expiry or(Expiry other) {
return expiry(
Stream.concat(value().stream(), other.value().stream()).reduce(Duration::min));
}

public Expiry minus(Duration t) {
return expiry(value().map(v -> v.minus(t)));
}

public boolean isNever() {
return value().isEmpty();
}

public int compareTo(Expiry other) {
if (this.isNever()) {
if (other.isNever()) {
return 0;
} else {
return 1;
}
} else {
if (other.isNever()) {
return -1;
} else {
return this.value().get().compareTo(other.value().get());
}
}
}

public boolean shorterThan(Expiry other) {
return this.compareTo(other) < 0;
}

public boolean noShorterThan(Expiry other) {
return this.compareTo(other) >= 0;
}

public boolean longerThan(Expiry other) {
return this.compareTo(other) > 0;
}

public boolean noLongerThan(Expiry other) {
return this.compareTo(other) <= 0;
}

@Override
public String toString() {
return value.map(Duration::toString).orElse("NEVER");
}
}
Loading
Loading