diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/async/AsyncExecChain.java b/httpclient5/src/main/java/org/apache/hc/client5/http/async/AsyncExecChain.java index 081946029..2ba4c192d 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/async/AsyncExecChain.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/async/AsyncExecChain.java @@ -32,6 +32,7 @@ import org.apache.hc.client5.http.HttpRoute; import org.apache.hc.client5.http.protocol.HttpClientContext; import org.apache.hc.core5.annotation.Contract; +import org.apache.hc.core5.annotation.Internal; import org.apache.hc.core5.annotation.ThreadingBehavior; import org.apache.hc.core5.concurrent.CancellableDependency; import org.apache.hc.core5.http.HttpException; @@ -116,7 +117,7 @@ interface Scheduler { * @param request the actual request. * @param entityProducer the request entity producer or {@code null} if the request * does not enclose an entity. - * @param scope the execution scope . + * @param scope the execution scope. * @param asyncExecCallback the execution callback. * @param delay re-execution delay. Can be {@code null} if the request is to be * re-executed immediately. @@ -128,6 +129,31 @@ void scheduleExecution( AsyncExecCallback asyncExecCallback, TimeValue delay); + /** + * Schedules request re-execution of the given execution chain immediately or + * after a delay. + * @param request the actual request. + * @param entityProducer the request entity producer or {@code null} if the request + * does not enclose an entity. + * @param scope the execution scope. + * @param chain the execution chain. + * @param asyncExecCallback the execution callback. + * @param delay re-execution delay. Can be {@code null} if the request is to be + * re-executed immediately. + * + * @since 5.3 + */ + @Internal + default void scheduleExecution( + HttpRequest request, + AsyncEntityProducer entityProducer, + AsyncExecChain.Scope scope, + AsyncExecChain chain, + AsyncExecCallback asyncExecCallback, + TimeValue delay) { + scheduleExecution(request, entityProducer, scope, asyncExecCallback, delay); + } + } /** diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncHttpRequestRetryExec.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncHttpRequestRetryExec.java index a38bfd42f..9fc7c3296 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncHttpRequestRetryExec.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncHttpRequestRetryExec.java @@ -131,7 +131,13 @@ public void completed() { if (entityProducer != null) { entityProducer.releaseResources(); } - scope.scheduler.scheduleExecution(request, entityProducer, scope, asyncExecCallback, state.delay); + scope.scheduler.scheduleExecution( + request, + entityProducer, + scope, + (r, e, s, c) -> execute(r, e, s, chain, c), + asyncExecCallback, + state.delay); } else { asyncExecCallback.completed(); } @@ -161,7 +167,13 @@ public void failed(final Exception cause) { state.retrying = true; final int execCount = scope.execCount.incrementAndGet(); state.delay = retryStrategy.getRetryInterval(request, (IOException) cause, execCount - 1, clientContext); - scope.scheduler.scheduleExecution(request, entityProducer, scope, asyncExecCallback, state.delay); + scope.scheduler.scheduleExecution( + request, + entityProducer, + scope, + (r, e, s, c) -> execute(r, e, s, chain, c), + asyncExecCallback, + state.delay); return; } } diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalAbstractHttpAsyncClient.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalAbstractHttpAsyncClient.java index 4e9a7379b..277437062 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalAbstractHttpAsyncClient.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalAbstractHttpAsyncClient.java @@ -92,6 +92,7 @@ abstract class InternalAbstractHttpAsyncClient extends AbstractHttpAsyncClientBa private final RequestConfig defaultConfig; private final ConcurrentLinkedQueue closeables; private final ScheduledExecutorService scheduledExecutorService; + private final AsyncExecChain.Scheduler scheduler; InternalAbstractHttpAsyncClient( final DefaultConnectingIOReactor ioReactor, @@ -113,6 +114,30 @@ abstract class InternalAbstractHttpAsyncClient extends AbstractHttpAsyncClientBa this.defaultConfig = defaultConfig; this.closeables = closeables != null ? new ConcurrentLinkedQueue<>(closeables) : null; this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(SCHEDULER_THREAD_FACTORY); + this.scheduler = new AsyncExecChain.Scheduler() { + + @Override + public void scheduleExecution( + final HttpRequest request, + final AsyncEntityProducer entityProducer, + final AsyncExecChain.Scope scope, + final AsyncExecCallback asyncExecCallback, + final TimeValue delay) { + executeScheduled(request, entityProducer, scope, execChain::execute, asyncExecCallback, delay); + } + + @Override + public void scheduleExecution( + final HttpRequest request, + final AsyncEntityProducer entityProducer, + final AsyncExecChain.Scope scope, + final AsyncExecChain chain, + final AsyncExecCallback asyncExecCallback, + final TimeValue delay) { + executeScheduled(request, entityProducer, scope, chain, asyncExecCallback, delay); + } + }; + } @Override @@ -197,8 +222,6 @@ protected Future doExecute( } final AsyncExecRuntime execRuntime = createAsyncExecRuntime(pushHandlerFactory); - final AsyncExecChain.Scheduler scheduler = this::executeScheduled; - final AsyncExecChain.Scope scope = new AsyncExecChain.Scope(exchangeId, route, request, future, clientContext, execRuntime, scheduler, new AtomicInteger(1)); final AtomicBoolean outputTerminated = new AtomicBoolean(false); @@ -262,6 +285,7 @@ public void produce(final DataStreamChannel channel) throws IOException { } : null, scope, + execChain::execute, new AsyncExecCallback() { @Override @@ -343,18 +367,20 @@ void executeImmediate( final HttpRequest request, final AsyncEntityProducer entityProducer, final AsyncExecChain.Scope scope, + final AsyncExecChain chain, final AsyncExecCallback asyncExecCallback) throws HttpException, IOException { - execChain.execute(request, entityProducer, scope, asyncExecCallback); + chain.proceed(request, entityProducer, scope, asyncExecCallback); } void executeScheduled( final HttpRequest request, final AsyncEntityProducer entityProducer, final AsyncExecChain.Scope scope, + final AsyncExecChain chain, final AsyncExecCallback asyncExecCallback, final TimeValue delay) { final ScheduledRequestExecution scheduledTask = new ScheduledRequestExecution( - request, entityProducer, scope, asyncExecCallback, delay); + request, entityProducer, scope, chain, asyncExecCallback, delay); if (TimeValue.isPositive(delay)) { scheduledExecutorService.schedule(scheduledTask, delay.getDuration(), delay.getTimeUnit()); } else { @@ -367,17 +393,20 @@ class ScheduledRequestExecution implements Runnable, Cancellable { final HttpRequest request; final AsyncEntityProducer entityProducer; final AsyncExecChain.Scope scope; + final AsyncExecChain chain; final AsyncExecCallback asyncExecCallback; final TimeValue delay; ScheduledRequestExecution(final HttpRequest request, final AsyncEntityProducer entityProducer, final AsyncExecChain.Scope scope, + final AsyncExecChain chain, final AsyncExecCallback asyncExecCallback, final TimeValue delay) { this.request = request; this.entityProducer = entityProducer; this.scope = scope; + this.chain = chain; this.asyncExecCallback = asyncExecCallback; this.delay = delay; } @@ -385,7 +414,7 @@ class ScheduledRequestExecution implements Runnable, Cancellable { @Override public void run() { try { - execChain.execute(request, entityProducer, scope, asyncExecCallback); + chain.proceed(request, entityProducer, scope, asyncExecCallback); } catch (final Exception ex) { asyncExecCallback.failed(ex); }