Skip to content

Commit

Permalink
HTTPCLIENT-2291: fixed inconsistency in behavior between the class an…
Browse files Browse the repository at this point in the history
…d async implementation of the request re-execution. The async request retry exec will now re-start request execution from itself instead of form the very beginning of the execution chain
  • Loading branch information
ok2c committed Aug 31, 2023
1 parent 9748d1b commit 8a54e70
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.hc.client5.http.HttpRoute;
import org.apache.hc.client5.http.protocol.HttpClientContext;
import org.apache.hc.core5.annotation.Contract;
import org.apache.hc.core5.annotation.Internal;
import org.apache.hc.core5.annotation.ThreadingBehavior;
import org.apache.hc.core5.concurrent.CancellableDependency;
import org.apache.hc.core5.http.HttpException;
Expand Down Expand Up @@ -116,7 +117,7 @@ interface Scheduler {
* @param request the actual request.
* @param entityProducer the request entity producer or {@code null} if the request
* does not enclose an entity.
* @param scope the execution scope .
* @param scope the execution scope.
* @param asyncExecCallback the execution callback.
* @param delay re-execution delay. Can be {@code null} if the request is to be
* re-executed immediately.
Expand All @@ -128,6 +129,31 @@ void scheduleExecution(
AsyncExecCallback asyncExecCallback,
TimeValue delay);

/**
* Schedules request re-execution of the given execution chain immediately or
* after a delay.
* @param request the actual request.
* @param entityProducer the request entity producer or {@code null} if the request
* does not enclose an entity.
* @param scope the execution scope.
* @param chain the execution chain.
* @param asyncExecCallback the execution callback.
* @param delay re-execution delay. Can be {@code null} if the request is to be
* re-executed immediately.
*
* @since 5.3
*/
@Internal
default void scheduleExecution(
HttpRequest request,
AsyncEntityProducer entityProducer,
AsyncExecChain.Scope scope,
AsyncExecChain chain,
AsyncExecCallback asyncExecCallback,
TimeValue delay) {
scheduleExecution(request, entityProducer, scope, asyncExecCallback, delay);
}

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,13 @@ public void completed() {
if (entityProducer != null) {
entityProducer.releaseResources();
}
scope.scheduler.scheduleExecution(request, entityProducer, scope, asyncExecCallback, state.delay);
scope.scheduler.scheduleExecution(
request,
entityProducer,
scope,
(r, e, s, c) -> execute(r, e, s, chain, c),
asyncExecCallback,
state.delay);
} else {
asyncExecCallback.completed();
}
Expand Down Expand Up @@ -161,7 +167,13 @@ public void failed(final Exception cause) {
state.retrying = true;
final int execCount = scope.execCount.incrementAndGet();
state.delay = retryStrategy.getRetryInterval(request, (IOException) cause, execCount - 1, clientContext);
scope.scheduler.scheduleExecution(request, entityProducer, scope, asyncExecCallback, state.delay);
scope.scheduler.scheduleExecution(
request,
entityProducer,
scope,
(r, e, s, c) -> execute(r, e, s, chain, c),
asyncExecCallback,
state.delay);
return;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ abstract class InternalAbstractHttpAsyncClient extends AbstractHttpAsyncClientBa
private final RequestConfig defaultConfig;
private final ConcurrentLinkedQueue<Closeable> closeables;
private final ScheduledExecutorService scheduledExecutorService;
private final AsyncExecChain.Scheduler scheduler;

InternalAbstractHttpAsyncClient(
final DefaultConnectingIOReactor ioReactor,
Expand All @@ -113,6 +114,30 @@ abstract class InternalAbstractHttpAsyncClient extends AbstractHttpAsyncClientBa
this.defaultConfig = defaultConfig;
this.closeables = closeables != null ? new ConcurrentLinkedQueue<>(closeables) : null;
this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(SCHEDULER_THREAD_FACTORY);
this.scheduler = new AsyncExecChain.Scheduler() {

@Override
public void scheduleExecution(
final HttpRequest request,
final AsyncEntityProducer entityProducer,
final AsyncExecChain.Scope scope,
final AsyncExecCallback asyncExecCallback,
final TimeValue delay) {
executeScheduled(request, entityProducer, scope, execChain::execute, asyncExecCallback, delay);
}

@Override
public void scheduleExecution(
final HttpRequest request,
final AsyncEntityProducer entityProducer,
final AsyncExecChain.Scope scope,
final AsyncExecChain chain,
final AsyncExecCallback asyncExecCallback,
final TimeValue delay) {
executeScheduled(request, entityProducer, scope, chain, asyncExecCallback, delay);
}
};

}

@Override
Expand Down Expand Up @@ -197,8 +222,6 @@ protected <T> Future<T> doExecute(
}
final AsyncExecRuntime execRuntime = createAsyncExecRuntime(pushHandlerFactory);

final AsyncExecChain.Scheduler scheduler = this::executeScheduled;

final AsyncExecChain.Scope scope = new AsyncExecChain.Scope(exchangeId, route, request, future,
clientContext, execRuntime, scheduler, new AtomicInteger(1));
final AtomicBoolean outputTerminated = new AtomicBoolean(false);
Expand Down Expand Up @@ -262,6 +285,7 @@ public void produce(final DataStreamChannel channel) throws IOException {

} : null,
scope,
execChain::execute,
new AsyncExecCallback() {

@Override
Expand Down Expand Up @@ -343,18 +367,20 @@ void executeImmediate(
final HttpRequest request,
final AsyncEntityProducer entityProducer,
final AsyncExecChain.Scope scope,
final AsyncExecChain chain,
final AsyncExecCallback asyncExecCallback) throws HttpException, IOException {
execChain.execute(request, entityProducer, scope, asyncExecCallback);
chain.proceed(request, entityProducer, scope, asyncExecCallback);
}

void executeScheduled(
final HttpRequest request,
final AsyncEntityProducer entityProducer,
final AsyncExecChain.Scope scope,
final AsyncExecChain chain,
final AsyncExecCallback asyncExecCallback,
final TimeValue delay) {
final ScheduledRequestExecution scheduledTask = new ScheduledRequestExecution(
request, entityProducer, scope, asyncExecCallback, delay);
request, entityProducer, scope, chain, asyncExecCallback, delay);
if (TimeValue.isPositive(delay)) {
scheduledExecutorService.schedule(scheduledTask, delay.getDuration(), delay.getTimeUnit());
} else {
Expand All @@ -367,25 +393,28 @@ class ScheduledRequestExecution implements Runnable, Cancellable {
final HttpRequest request;
final AsyncEntityProducer entityProducer;
final AsyncExecChain.Scope scope;
final AsyncExecChain chain;
final AsyncExecCallback asyncExecCallback;
final TimeValue delay;

ScheduledRequestExecution(final HttpRequest request,
final AsyncEntityProducer entityProducer,
final AsyncExecChain.Scope scope,
final AsyncExecChain chain,
final AsyncExecCallback asyncExecCallback,
final TimeValue delay) {
this.request = request;
this.entityProducer = entityProducer;
this.scope = scope;
this.chain = chain;
this.asyncExecCallback = asyncExecCallback;
this.delay = delay;
}

@Override
public void run() {
try {
execChain.execute(request, entityProducer, scope, asyncExecCallback);
chain.proceed(request, entityProducer, scope, asyncExecCallback);
} catch (final Exception ex) {
asyncExecCallback.failed(ex);
}
Expand Down

0 comments on commit 8a54e70

Please sign in to comment.