Skip to content

Commit

Permalink
Change thenComposeAsync to thenCompose (Azure#15819)
Browse files Browse the repository at this point in the history
* Change ComposeAsync to Compose in low client

* Change ComposeAsync in low client test

* Change ComposeAsync to Compose in EPH
  • Loading branch information
JamesBirdsall authored Oct 26, 2020
1 parent 32f8f7c commit 3403248
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,11 @@ CompletableFuture<Void> cachePartitionIds() {
// Stage 0B: set up a way to close the EventHubClient when we're done
.thenApplyAsync((ehClient) -> {
final EventHubClient saveForCleanupClient = ehClient;
cleanupFuture.thenComposeAsync((empty) -> saveForCleanupClient.close(), this.hostContext.getExecutor());
cleanupFuture.thenCompose((empty) -> saveForCleanupClient.close());
return ehClient;
}, this.hostContext.getExecutor())
// Stage 1: use the client to get runtime info for the event hub
.thenComposeAsync((ehClient) -> ehClient.getRuntimeInformation(), this.hostContext.getExecutor())
.thenCompose((ehClient) -> ehClient.getRuntimeInformation())
// Stage 2: extract the partition ids from the runtime info or throw on null (timeout)
.thenAcceptAsync((EventHubRuntimeInformation ehInfo) -> {
if (ehInfo != null) {
Expand Down Expand Up @@ -147,7 +147,7 @@ public CompletableFuture<Void> initialize() {
// Stage 0: get partition ids and cache
return cachePartitionIds()
// Stage 1: initialize stores, if stage 0 succeeded
.thenComposeAsync((unused) -> initializeStores(), this.hostContext.getExecutor())
.thenCompose((unused) -> initializeStores())
// Stage 2: RUN REGARDLESS OF EXCEPTIONS -- trace errors
.whenCompleteAsync((empty, e) -> {
if (e != null) {
Expand Down Expand Up @@ -215,15 +215,15 @@ private CompletableFuture<?> initializeStores() {
private CompletableFuture<?> buildRetries(CompletableFuture<?> buildOnto, Callable<CompletableFuture<?>> lambda, String retryMessage,
String finalFailureMessage, String action, int maxRetries) {
// Stage 0: first attempt
CompletableFuture<?> retryChain = buildOnto.thenComposeAsync((unused) -> {
CompletableFuture<?> retryChain = buildOnto.thenCompose((unused) -> {
CompletableFuture<?> newresult = CompletableFuture.completedFuture(null);
try {
newresult = lambda.call();
} catch (Exception e1) {
throw new CompletionException(e1);
}
return newresult;
}, this.hostContext.getExecutor());
});

for (int i = 1; i < maxRetries; i++) {
retryChain = retryChain
Expand All @@ -249,7 +249,7 @@ private CompletableFuture<?> buildRetries(CompletableFuture<?> buildOnto, Callab
}, this.hostContext.getExecutor())
// Stages 2, 4, 6, etc: if we already have a valid result, pass it along. Otherwise, make another attempt.
// Once we have a valid result there will be no more attempts or exceptions.
.thenComposeAsync((oldresult) -> {
.thenCompose((oldresult) -> {
CompletableFuture<?> newresult = CompletableFuture.completedFuture(oldresult);
if (oldresult == null) {
try {
Expand All @@ -259,7 +259,7 @@ private CompletableFuture<?> buildRetries(CompletableFuture<?> buildOnto, Callab
}
}
return newresult;
}, this.hostContext.getExecutor());
});
}
// Stage final: trace the exception with the final message, or pass along the valid result.
retryChain = retryChain.handleAsync((r, e) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ class PartitionPump extends Closable implements PartitionReceiveHandler {
this.pumpManagerCallback.accept(this.lease.getPartitionId());
return cancelPendingOperations();
}, this.hostContext.getExecutor())
.thenComposeAsync((empty) -> cleanUpAll(this.shutdownReason), this.hostContext.getExecutor())
.thenComposeAsync((empty) -> releaseLeaseOnShutdown(), this.hostContext.getExecutor())
.thenCompose((empty) -> cleanUpAll(this.shutdownReason))
.thenCompose((empty) -> releaseLeaseOnShutdown())
.whenCompleteAsync((empty, e) -> {
setClosed();
}, this.hostContext.getExecutor());
Expand All @@ -70,7 +70,7 @@ CompletableFuture<Void> startPump() {
// Do the slow startup stuff asynchronously.
// Use whenComplete to trigger cleanup on exception.
CompletableFuture.runAsync(() -> openProcessor(), this.hostContext.getExecutor())
.thenComposeAsync((empty) -> openClientsRetryWrapper(), this.hostContext.getExecutor())
.thenCompose((empty) -> openClientsRetryWrapper())
.thenRunAsync(() -> scheduleLeaseRenewer(), this.hostContext.getExecutor())
.whenCompleteAsync((r, e) -> {
if (e != null) {
Expand Down Expand Up @@ -127,9 +127,9 @@ private CompletableFuture<Void> openClientsRetryWrapper() {
return (e == null) ? r : false;
}, this.hostContext.getExecutor())
// Stages 2, 4, 6, etc: make another attempt if needed.
.thenComposeAsync((done) -> {
.thenCompose((done) -> {
return done ? CompletableFuture.completedFuture(done) : openClients();
}, this.hostContext.getExecutor());
});
}
// Stage final: on success, hook up the user's event handler to start receiving events. On error,
// trace exceptions from the final attempt, or ReceiverDisconnectedException.
Expand Down Expand Up @@ -197,9 +197,9 @@ private CompletableFuture<Boolean> openClients() {
this.internalOperationFuture = null;
}, this.hostContext.getExecutor())
// Stage 2: get initial offset for receiver
.thenComposeAsync((empty) -> this.partitionContext.getInitialOffset(), this.hostContext.getExecutor())
.thenCompose((empty) -> this.partitionContext.getInitialOffset())
// Stage 3: set up other receiver options, create receiver if initial offset is valid
.thenComposeAsync((startAt) -> {
.thenCompose((startAt) -> {
long epoch = this.lease.getEpoch();

TRACE_LOGGER.info(this.hostContext.withHostAndPartition(this.partitionContext,
Expand All @@ -222,7 +222,7 @@ private CompletableFuture<Boolean> openClients() {
}

return receiverFuture;
}, this.hostContext.getExecutor())
})
// Stage 4: save PartitionReceiver on success, trace on error
.whenCompleteAsync((receiver, e) -> {
if ((receiver != null) && (e == null)) {
Expand Down Expand Up @@ -299,9 +299,9 @@ private CompletableFuture<Void> cleanUpClients() { // swallows all exceptions
this.partitionReceiver = null;
return partitionReceiverTemp;
}, this.hostContext.getExecutor())
.thenComposeAsync((partitionReceiverTemp) -> {
.thenCompose((partitionReceiverTemp) -> {
return (partitionReceiverTemp != null) ? partitionReceiverTemp.close() : CompletableFuture.completedFuture(null);
}, this.hostContext.getExecutor())
})
.handleAsync((empty, e) -> {
if (e != null) {
TRACE_LOGGER.warn(this.hostContext.withHostAndPartition(this.partitionContext,
Expand All @@ -319,9 +319,9 @@ private CompletableFuture<Void> cleanUpClients() { // swallows all exceptions
}
return eventHubClientTemp;
}, this.hostContext.getExecutor())
.thenComposeAsync((eventHubClientTemp) -> {
.thenCompose((eventHubClientTemp) -> {
return (eventHubClientTemp != null) ? eventHubClientTemp.close() : CompletableFuture.completedFuture(null);
}, this.hostContext.getExecutor())
})
.handleAsync((empty, e) -> {
if (e != null) {
TRACE_LOGGER.warn(this.hostContext.withHostAndPartition(this.partitionContext, "Closing EH client failed."),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@ class PartitionScanner extends Closable {

public CompletableFuture<Boolean> scan(boolean isFirst) {
return getAllLeaseStates()
.thenComposeAsync((unused) -> {
.thenCompose((unused) -> {
throwIfClosingOrClosed("PartitionScanner is shutting down");
int ourLeasesCount = sortLeasesAndCalculateDesiredCount(isFirst);
return acquireExpiredInChunksParallel(0, this.desiredCount - ourLeasesCount);
}, this.hostContext.getExecutor())
})
.thenApplyAsync((remainingNeeded) -> {
throwIfClosingOrClosed("PartitionScanner is shutting down");
ArrayList<BaseLease> stealThese = new ArrayList<BaseLease>();
Expand All @@ -59,10 +59,10 @@ public CompletableFuture<Boolean> scan(boolean isFirst) {
}
return stealThese;
}, this.hostContext.getExecutor())
.thenComposeAsync((stealThese) -> {
.thenCompose((stealThese) -> {
throwIfClosingOrClosed("PartitionScanner is shutting down");
return stealLeases(stealThese);
}, this.hostContext.getExecutor())
})
.handleAsync((didSteal, e) -> {
if ((e != null) && !(e instanceof ClosingException)) {
StringBuilder outAction = new StringBuilder();
Expand Down Expand Up @@ -181,7 +181,7 @@ private CompletableFuture<Integer> acquireExpiredInChunksParallel(int startAt, i
final int endAt = Math.min(startAt + needed, this.allLeaseStates.size());

resultFuture = findExpiredLeases(startAt, endAt)
.thenComposeAsync((getThese) -> {
.thenCompose((getThese) -> {
throwIfClosingOrClosed("PartitionScanner is shutting down");
CompletableFuture<Void> acquireFuture = CompletableFuture.completedFuture(null);
if (getThese.size() > 0) {
Expand All @@ -190,11 +190,11 @@ private CompletableFuture<Integer> acquireExpiredInChunksParallel(int startAt, i
throwIfClosingOrClosed("PartitionScanner is shutting down");
final AcquisitionHolder holder = new AcquisitionHolder();
CompletableFuture<Void> getOneFuture = this.hostContext.getLeaseManager().getLease(info.getPartitionId())
.thenComposeAsync((lease) -> {
.thenCompose((lease) -> {
throwIfClosingOrClosed("PartitionScanner is shutting down");
holder.setAcquiredLease(lease);
return this.hostContext.getLeaseManager().acquireLease(lease);
}, this.hostContext.getExecutor())
})
.thenAcceptAsync((acquired) -> {
throwIfClosingOrClosed("PartitionScanner is shutting down");
if (acquired) {
Expand All @@ -215,7 +215,7 @@ private CompletableFuture<Integer> acquireExpiredInChunksParallel(int startAt, i
acquireFuture = CompletableFuture.allOf(getFutures.toArray(dummy));
}
return acquireFuture;
}, this.hostContext.getExecutor())
})
.handleAsync((empty, e) -> {
// log/notify if exception occurred, then swallow exception and continue with next chunk
if ((e != null) && !(e instanceof ClosingException)) {
Expand All @@ -226,7 +226,7 @@ private CompletableFuture<Integer> acquireExpiredInChunksParallel(int startAt, i
}
return null;
}, this.hostContext.getExecutor())
.thenComposeAsync((unused) -> acquireExpiredInChunksParallel(endAt, runningNeeded.get()), this.hostContext.getExecutor());
.thenCompose((unused) -> acquireExpiredInChunksParallel(endAt, runningNeeded.get()));
} else {
TRACE_LOGGER.debug(this.hostContext.withHost("Short circuit: needed is 0, unowned is 0, or off end"));
}
Expand Down Expand Up @@ -291,11 +291,11 @@ private CompletableFuture<Boolean> stealLeases(List<BaseLease> stealThese) {

final AcquisitionHolder holder = new AcquisitionHolder();
CompletableFuture<Void> oneSteal = this.hostContext.getLeaseManager().getLease(info.getPartitionId())
.thenComposeAsync((lease) -> {
.thenCompose((lease) -> {
throwIfClosingOrClosed("PartitionScanner is shutting down");
holder.setAcquiredLease(lease);
return this.hostContext.getLeaseManager().acquireLease(lease);
}, this.hostContext.getExecutor())
})
.thenAcceptAsync((acquired) -> {
throwIfClosingOrClosed("PartitionScanner is shutting down");
if (acquired) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,12 +174,12 @@ public CompletableFuture<Void> send(final EventData data) {
throw new IllegalArgumentException("EventData cannot be empty.");
}

return this.createInternalSender().thenComposeAsync(new Function<Void, CompletableFuture<Void>>() {
return this.createInternalSender().thenCompose(new Function<Void, CompletableFuture<Void>>() {
@Override
public CompletableFuture<Void> apply(Void voidArg) {
return EventHubClientImpl.this.sender.send(((EventDataImpl) data).toAmqpMessage());
}
}, this.executor);
});
}

@Override
Expand All @@ -188,12 +188,12 @@ public CompletableFuture<Void> send(final Iterable<EventData> eventDatas) {
throw new IllegalArgumentException("Empty batch of EventData cannot be sent.");
}

return this.createInternalSender().thenComposeAsync(new Function<Void, CompletableFuture<Void>>() {
return this.createInternalSender().thenCompose(new Function<Void, CompletableFuture<Void>>() {
@Override
public CompletableFuture<Void> apply(Void voidArg) {
return EventHubClientImpl.this.sender.send(EventDataUtil.toAmqpMessages(eventDatas));
}
}, this.executor);
});
}

@Override
Expand All @@ -218,12 +218,12 @@ public CompletableFuture<Void> send(final EventData eventData, final String part
throw new IllegalArgumentException("partitionKey cannot be null.");
}

return this.createInternalSender().thenComposeAsync(new Function<Void, CompletableFuture<Void>>() {
return this.createInternalSender().thenCompose(new Function<Void, CompletableFuture<Void>>() {
@Override
public CompletableFuture<Void> apply(Void voidArg) {
return EventHubClientImpl.this.sender.send(((EventDataImpl) eventData).toAmqpMessage(partitionKey));
}
}, this.executor);
});
}

@Override
Expand All @@ -241,12 +241,12 @@ public CompletableFuture<Void> send(final Iterable<EventData> eventDatas, final
String.format(Locale.US, "PartitionKey exceeds the maximum allowed length of partitionKey: %s", ClientConstants.MAX_PARTITION_KEY_LENGTH));
}

return this.createInternalSender().thenComposeAsync(new Function<Void, CompletableFuture<Void>>() {
return this.createInternalSender().thenCompose(new Function<Void, CompletableFuture<Void>>() {
@Override
public CompletableFuture<Void> apply(Void voidArg) {
return EventHubClientImpl.this.sender.send(EventDataUtil.toAmqpMessages(eventDatas, partitionKey));
}
}, this.executor);
});
}

@Override
Expand Down Expand Up @@ -289,12 +289,12 @@ public CompletableFuture<Void> onClose() {
if (this.underlyingFactory != null) {
synchronized (this.senderCreateSync) {
final CompletableFuture<Void> internalSenderClose = this.sender != null
? this.sender.close().thenComposeAsync(new Function<Void, CompletableFuture<Void>>() {
? this.sender.close().thenCompose(new Function<Void, CompletableFuture<Void>>() {
@Override
public CompletableFuture<Void> apply(Void voidArg) {
return EventHubClientImpl.this.underlyingFactory.close();
}
}, this.executor)
})
: this.underlyingFactory.close();

return internalSenderClose;
Expand Down Expand Up @@ -332,7 +332,7 @@ public CompletableFuture<EventHubRuntimeInformation> getRuntimeInformation() {
request.put(ClientConstants.MANAGEMENT_ENTITY_TYPE_KEY, ClientConstants.MANAGEMENT_EVENTHUB_ENTITY_TYPE);
request.put(ClientConstants.MANAGEMENT_ENTITY_NAME_KEY, this.eventHubName);
request.put(ClientConstants.MANAGEMENT_OPERATION_KEY, ClientConstants.READ_OPERATION_VALUE);
return addManagementToken(request).thenComposeAsync((requestWithToken) -> managementWithRetry(requestWithToken), this.executor).
return addManagementToken(request).thenCompose((requestWithToken) -> managementWithRetry(requestWithToken)).
thenApplyAsync((rawdata) -> {
return new EventHubRuntimeInformation(
(String) rawdata.get(ClientConstants.MANAGEMENT_ENTITY_NAME_KEY),
Expand All @@ -351,7 +351,7 @@ public CompletableFuture<PartitionRuntimeInformation> getPartitionRuntimeInforma
request.put(ClientConstants.MANAGEMENT_ENTITY_NAME_KEY, this.eventHubName);
request.put(ClientConstants.MANAGEMENT_PARTITION_NAME_KEY, partitionId);
request.put(ClientConstants.MANAGEMENT_OPERATION_KEY, ClientConstants.READ_OPERATION_VALUE);
return addManagementToken(request).thenComposeAsync((requestWithToken) -> managementWithRetry(requestWithToken), this.executor).
return addManagementToken(request).thenCompose((requestWithToken) -> managementWithRetry(requestWithToken)).
thenApplyAsync((rawdata) -> {
return new PartitionRuntimeInformation(
(String) rawdata.get(ClientConstants.MANAGEMENT_ENTITY_NAME_KEY),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ protected TestBase(Logger logger) {
public static CompletableFuture<Void> pushEventsToPartition(final EventHubClient ehClient, final String partitionId, final int noOfEvents)
throws EventHubException {
return ehClient.createPartitionSender(partitionId)
.thenComposeAsync(new Function<PartitionSender, CompletableFuture<Void>>() {
.thenCompose(new Function<PartitionSender, CompletableFuture<Void>>() {
@Override
public CompletableFuture<Void> apply(PartitionSender pSender) {
@SuppressWarnings("unchecked") final CompletableFuture<Void>[] sends = new CompletableFuture[noOfEvents];
Expand Down

0 comments on commit 3403248

Please sign in to comment.