From 34032486990e519928464e39f732280e46858f87 Mon Sep 17 00:00:00 2001 From: JamesBirdsall Date: Mon, 26 Oct 2020 09:36:17 -0700 Subject: [PATCH] Change thenComposeAsync to thenCompose (#15819) * Change ComposeAsync to Compose in low client * Change ComposeAsync in low client test * Change ComposeAsync to Compose in EPH --- .../eventprocessorhost/PartitionManager.java | 14 +++++------ .../eventprocessorhost/PartitionPump.java | 24 +++++++++---------- .../eventprocessorhost/PartitionScanner.java | 22 ++++++++--------- .../eventhubs/impl/EventHubClientImpl.java | 24 +++++++++---------- .../azure/eventhubs/lib/TestBase.java | 2 +- 5 files changed, 43 insertions(+), 43 deletions(-) diff --git a/sdk/eventhubs/microsoft-azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/PartitionManager.java b/sdk/eventhubs/microsoft-azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/PartitionManager.java index 96d6ede652814..a62fa0235075b 100644 --- a/sdk/eventhubs/microsoft-azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/PartitionManager.java +++ b/sdk/eventhubs/microsoft-azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/PartitionManager.java @@ -51,11 +51,11 @@ CompletableFuture cachePartitionIds() { // Stage 0B: set up a way to close the EventHubClient when we're done .thenApplyAsync((ehClient) -> { final EventHubClient saveForCleanupClient = ehClient; - cleanupFuture.thenComposeAsync((empty) -> saveForCleanupClient.close(), this.hostContext.getExecutor()); + cleanupFuture.thenCompose((empty) -> saveForCleanupClient.close()); return ehClient; }, this.hostContext.getExecutor()) // Stage 1: use the client to get runtime info for the event hub - .thenComposeAsync((ehClient) -> ehClient.getRuntimeInformation(), this.hostContext.getExecutor()) + .thenCompose((ehClient) -> ehClient.getRuntimeInformation()) // Stage 2: extract the partition ids from the runtime info or throw on null (timeout) .thenAcceptAsync((EventHubRuntimeInformation ehInfo) -> { if (ehInfo != null) { @@ -147,7 +147,7 @@ public CompletableFuture initialize() { // Stage 0: get partition ids and cache return cachePartitionIds() // Stage 1: initialize stores, if stage 0 succeeded - .thenComposeAsync((unused) -> initializeStores(), this.hostContext.getExecutor()) + .thenCompose((unused) -> initializeStores()) // Stage 2: RUN REGARDLESS OF EXCEPTIONS -- trace errors .whenCompleteAsync((empty, e) -> { if (e != null) { @@ -215,7 +215,7 @@ private CompletableFuture initializeStores() { private CompletableFuture buildRetries(CompletableFuture buildOnto, Callable> lambda, String retryMessage, String finalFailureMessage, String action, int maxRetries) { // Stage 0: first attempt - CompletableFuture retryChain = buildOnto.thenComposeAsync((unused) -> { + CompletableFuture retryChain = buildOnto.thenCompose((unused) -> { CompletableFuture newresult = CompletableFuture.completedFuture(null); try { newresult = lambda.call(); @@ -223,7 +223,7 @@ private CompletableFuture buildRetries(CompletableFuture buildOnto, Callab throw new CompletionException(e1); } return newresult; - }, this.hostContext.getExecutor()); + }); for (int i = 1; i < maxRetries; i++) { retryChain = retryChain @@ -249,7 +249,7 @@ private CompletableFuture buildRetries(CompletableFuture buildOnto, Callab }, this.hostContext.getExecutor()) // Stages 2, 4, 6, etc: if we already have a valid result, pass it along. Otherwise, make another attempt. // Once we have a valid result there will be no more attempts or exceptions. - .thenComposeAsync((oldresult) -> { + .thenCompose((oldresult) -> { CompletableFuture newresult = CompletableFuture.completedFuture(oldresult); if (oldresult == null) { try { @@ -259,7 +259,7 @@ private CompletableFuture buildRetries(CompletableFuture buildOnto, Callab } } return newresult; - }, this.hostContext.getExecutor()); + }); } // Stage final: trace the exception with the final message, or pass along the valid result. retryChain = retryChain.handleAsync((r, e) -> { diff --git a/sdk/eventhubs/microsoft-azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/PartitionPump.java b/sdk/eventhubs/microsoft-azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/PartitionPump.java index ec049c63d511d..11cb95d28bc81 100644 --- a/sdk/eventhubs/microsoft-azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/PartitionPump.java +++ b/sdk/eventhubs/microsoft-azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/PartitionPump.java @@ -56,8 +56,8 @@ class PartitionPump extends Closable implements PartitionReceiveHandler { this.pumpManagerCallback.accept(this.lease.getPartitionId()); return cancelPendingOperations(); }, this.hostContext.getExecutor()) - .thenComposeAsync((empty) -> cleanUpAll(this.shutdownReason), this.hostContext.getExecutor()) - .thenComposeAsync((empty) -> releaseLeaseOnShutdown(), this.hostContext.getExecutor()) + .thenCompose((empty) -> cleanUpAll(this.shutdownReason)) + .thenCompose((empty) -> releaseLeaseOnShutdown()) .whenCompleteAsync((empty, e) -> { setClosed(); }, this.hostContext.getExecutor()); @@ -70,7 +70,7 @@ CompletableFuture startPump() { // Do the slow startup stuff asynchronously. // Use whenComplete to trigger cleanup on exception. CompletableFuture.runAsync(() -> openProcessor(), this.hostContext.getExecutor()) - .thenComposeAsync((empty) -> openClientsRetryWrapper(), this.hostContext.getExecutor()) + .thenCompose((empty) -> openClientsRetryWrapper()) .thenRunAsync(() -> scheduleLeaseRenewer(), this.hostContext.getExecutor()) .whenCompleteAsync((r, e) -> { if (e != null) { @@ -127,9 +127,9 @@ private CompletableFuture openClientsRetryWrapper() { return (e == null) ? r : false; }, this.hostContext.getExecutor()) // Stages 2, 4, 6, etc: make another attempt if needed. - .thenComposeAsync((done) -> { + .thenCompose((done) -> { return done ? CompletableFuture.completedFuture(done) : openClients(); - }, this.hostContext.getExecutor()); + }); } // Stage final: on success, hook up the user's event handler to start receiving events. On error, // trace exceptions from the final attempt, or ReceiverDisconnectedException. @@ -197,9 +197,9 @@ private CompletableFuture openClients() { this.internalOperationFuture = null; }, this.hostContext.getExecutor()) // Stage 2: get initial offset for receiver - .thenComposeAsync((empty) -> this.partitionContext.getInitialOffset(), this.hostContext.getExecutor()) + .thenCompose((empty) -> this.partitionContext.getInitialOffset()) // Stage 3: set up other receiver options, create receiver if initial offset is valid - .thenComposeAsync((startAt) -> { + .thenCompose((startAt) -> { long epoch = this.lease.getEpoch(); TRACE_LOGGER.info(this.hostContext.withHostAndPartition(this.partitionContext, @@ -222,7 +222,7 @@ private CompletableFuture openClients() { } return receiverFuture; - }, this.hostContext.getExecutor()) + }) // Stage 4: save PartitionReceiver on success, trace on error .whenCompleteAsync((receiver, e) -> { if ((receiver != null) && (e == null)) { @@ -299,9 +299,9 @@ private CompletableFuture cleanUpClients() { // swallows all exceptions this.partitionReceiver = null; return partitionReceiverTemp; }, this.hostContext.getExecutor()) - .thenComposeAsync((partitionReceiverTemp) -> { + .thenCompose((partitionReceiverTemp) -> { return (partitionReceiverTemp != null) ? partitionReceiverTemp.close() : CompletableFuture.completedFuture(null); - }, this.hostContext.getExecutor()) + }) .handleAsync((empty, e) -> { if (e != null) { TRACE_LOGGER.warn(this.hostContext.withHostAndPartition(this.partitionContext, @@ -319,9 +319,9 @@ private CompletableFuture cleanUpClients() { // swallows all exceptions } return eventHubClientTemp; }, this.hostContext.getExecutor()) - .thenComposeAsync((eventHubClientTemp) -> { + .thenCompose((eventHubClientTemp) -> { return (eventHubClientTemp != null) ? eventHubClientTemp.close() : CompletableFuture.completedFuture(null); - }, this.hostContext.getExecutor()) + }) .handleAsync((empty, e) -> { if (e != null) { TRACE_LOGGER.warn(this.hostContext.withHostAndPartition(this.partitionContext, "Closing EH client failed."), diff --git a/sdk/eventhubs/microsoft-azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/PartitionScanner.java b/sdk/eventhubs/microsoft-azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/PartitionScanner.java index bafa1af7b3815..20df0c1fb91d8 100644 --- a/sdk/eventhubs/microsoft-azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/PartitionScanner.java +++ b/sdk/eventhubs/microsoft-azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/PartitionScanner.java @@ -45,11 +45,11 @@ class PartitionScanner extends Closable { public CompletableFuture scan(boolean isFirst) { return getAllLeaseStates() - .thenComposeAsync((unused) -> { + .thenCompose((unused) -> { throwIfClosingOrClosed("PartitionScanner is shutting down"); int ourLeasesCount = sortLeasesAndCalculateDesiredCount(isFirst); return acquireExpiredInChunksParallel(0, this.desiredCount - ourLeasesCount); - }, this.hostContext.getExecutor()) + }) .thenApplyAsync((remainingNeeded) -> { throwIfClosingOrClosed("PartitionScanner is shutting down"); ArrayList stealThese = new ArrayList(); @@ -59,10 +59,10 @@ public CompletableFuture scan(boolean isFirst) { } return stealThese; }, this.hostContext.getExecutor()) - .thenComposeAsync((stealThese) -> { + .thenCompose((stealThese) -> { throwIfClosingOrClosed("PartitionScanner is shutting down"); return stealLeases(stealThese); - }, this.hostContext.getExecutor()) + }) .handleAsync((didSteal, e) -> { if ((e != null) && !(e instanceof ClosingException)) { StringBuilder outAction = new StringBuilder(); @@ -181,7 +181,7 @@ private CompletableFuture acquireExpiredInChunksParallel(int startAt, i final int endAt = Math.min(startAt + needed, this.allLeaseStates.size()); resultFuture = findExpiredLeases(startAt, endAt) - .thenComposeAsync((getThese) -> { + .thenCompose((getThese) -> { throwIfClosingOrClosed("PartitionScanner is shutting down"); CompletableFuture acquireFuture = CompletableFuture.completedFuture(null); if (getThese.size() > 0) { @@ -190,11 +190,11 @@ private CompletableFuture acquireExpiredInChunksParallel(int startAt, i throwIfClosingOrClosed("PartitionScanner is shutting down"); final AcquisitionHolder holder = new AcquisitionHolder(); CompletableFuture getOneFuture = this.hostContext.getLeaseManager().getLease(info.getPartitionId()) - .thenComposeAsync((lease) -> { + .thenCompose((lease) -> { throwIfClosingOrClosed("PartitionScanner is shutting down"); holder.setAcquiredLease(lease); return this.hostContext.getLeaseManager().acquireLease(lease); - }, this.hostContext.getExecutor()) + }) .thenAcceptAsync((acquired) -> { throwIfClosingOrClosed("PartitionScanner is shutting down"); if (acquired) { @@ -215,7 +215,7 @@ private CompletableFuture acquireExpiredInChunksParallel(int startAt, i acquireFuture = CompletableFuture.allOf(getFutures.toArray(dummy)); } return acquireFuture; - }, this.hostContext.getExecutor()) + }) .handleAsync((empty, e) -> { // log/notify if exception occurred, then swallow exception and continue with next chunk if ((e != null) && !(e instanceof ClosingException)) { @@ -226,7 +226,7 @@ private CompletableFuture acquireExpiredInChunksParallel(int startAt, i } return null; }, this.hostContext.getExecutor()) - .thenComposeAsync((unused) -> acquireExpiredInChunksParallel(endAt, runningNeeded.get()), this.hostContext.getExecutor()); + .thenCompose((unused) -> acquireExpiredInChunksParallel(endAt, runningNeeded.get())); } else { TRACE_LOGGER.debug(this.hostContext.withHost("Short circuit: needed is 0, unowned is 0, or off end")); } @@ -291,11 +291,11 @@ private CompletableFuture stealLeases(List stealThese) { final AcquisitionHolder holder = new AcquisitionHolder(); CompletableFuture oneSteal = this.hostContext.getLeaseManager().getLease(info.getPartitionId()) - .thenComposeAsync((lease) -> { + .thenCompose((lease) -> { throwIfClosingOrClosed("PartitionScanner is shutting down"); holder.setAcquiredLease(lease); return this.hostContext.getLeaseManager().acquireLease(lease); - }, this.hostContext.getExecutor()) + }) .thenAcceptAsync((acquired) -> { throwIfClosingOrClosed("PartitionScanner is shutting down"); if (acquired) { diff --git a/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventHubClientImpl.java b/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventHubClientImpl.java index 1ab62e56fc987..5ed9a0ca92d4d 100644 --- a/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventHubClientImpl.java +++ b/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventHubClientImpl.java @@ -174,12 +174,12 @@ public CompletableFuture send(final EventData data) { throw new IllegalArgumentException("EventData cannot be empty."); } - return this.createInternalSender().thenComposeAsync(new Function>() { + return this.createInternalSender().thenCompose(new Function>() { @Override public CompletableFuture apply(Void voidArg) { return EventHubClientImpl.this.sender.send(((EventDataImpl) data).toAmqpMessage()); } - }, this.executor); + }); } @Override @@ -188,12 +188,12 @@ public CompletableFuture send(final Iterable eventDatas) { throw new IllegalArgumentException("Empty batch of EventData cannot be sent."); } - return this.createInternalSender().thenComposeAsync(new Function>() { + return this.createInternalSender().thenCompose(new Function>() { @Override public CompletableFuture apply(Void voidArg) { return EventHubClientImpl.this.sender.send(EventDataUtil.toAmqpMessages(eventDatas)); } - }, this.executor); + }); } @Override @@ -218,12 +218,12 @@ public CompletableFuture send(final EventData eventData, final String part throw new IllegalArgumentException("partitionKey cannot be null."); } - return this.createInternalSender().thenComposeAsync(new Function>() { + return this.createInternalSender().thenCompose(new Function>() { @Override public CompletableFuture apply(Void voidArg) { return EventHubClientImpl.this.sender.send(((EventDataImpl) eventData).toAmqpMessage(partitionKey)); } - }, this.executor); + }); } @Override @@ -241,12 +241,12 @@ public CompletableFuture send(final Iterable eventDatas, final String.format(Locale.US, "PartitionKey exceeds the maximum allowed length of partitionKey: %s", ClientConstants.MAX_PARTITION_KEY_LENGTH)); } - return this.createInternalSender().thenComposeAsync(new Function>() { + return this.createInternalSender().thenCompose(new Function>() { @Override public CompletableFuture apply(Void voidArg) { return EventHubClientImpl.this.sender.send(EventDataUtil.toAmqpMessages(eventDatas, partitionKey)); } - }, this.executor); + }); } @Override @@ -289,12 +289,12 @@ public CompletableFuture onClose() { if (this.underlyingFactory != null) { synchronized (this.senderCreateSync) { final CompletableFuture internalSenderClose = this.sender != null - ? this.sender.close().thenComposeAsync(new Function>() { + ? this.sender.close().thenCompose(new Function>() { @Override public CompletableFuture apply(Void voidArg) { return EventHubClientImpl.this.underlyingFactory.close(); } - }, this.executor) + }) : this.underlyingFactory.close(); return internalSenderClose; @@ -332,7 +332,7 @@ public CompletableFuture getRuntimeInformation() { request.put(ClientConstants.MANAGEMENT_ENTITY_TYPE_KEY, ClientConstants.MANAGEMENT_EVENTHUB_ENTITY_TYPE); request.put(ClientConstants.MANAGEMENT_ENTITY_NAME_KEY, this.eventHubName); request.put(ClientConstants.MANAGEMENT_OPERATION_KEY, ClientConstants.READ_OPERATION_VALUE); - return addManagementToken(request).thenComposeAsync((requestWithToken) -> managementWithRetry(requestWithToken), this.executor). + return addManagementToken(request).thenCompose((requestWithToken) -> managementWithRetry(requestWithToken)). thenApplyAsync((rawdata) -> { return new EventHubRuntimeInformation( (String) rawdata.get(ClientConstants.MANAGEMENT_ENTITY_NAME_KEY), @@ -351,7 +351,7 @@ public CompletableFuture getPartitionRuntimeInforma request.put(ClientConstants.MANAGEMENT_ENTITY_NAME_KEY, this.eventHubName); request.put(ClientConstants.MANAGEMENT_PARTITION_NAME_KEY, partitionId); request.put(ClientConstants.MANAGEMENT_OPERATION_KEY, ClientConstants.READ_OPERATION_VALUE); - return addManagementToken(request).thenComposeAsync((requestWithToken) -> managementWithRetry(requestWithToken), this.executor). + return addManagementToken(request).thenCompose((requestWithToken) -> managementWithRetry(requestWithToken)). thenApplyAsync((rawdata) -> { return new PartitionRuntimeInformation( (String) rawdata.get(ClientConstants.MANAGEMENT_ENTITY_NAME_KEY), diff --git a/sdk/eventhubs/microsoft-azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/lib/TestBase.java b/sdk/eventhubs/microsoft-azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/lib/TestBase.java index ffe198658ae7b..fef57e85612ba 100644 --- a/sdk/eventhubs/microsoft-azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/lib/TestBase.java +++ b/sdk/eventhubs/microsoft-azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/lib/TestBase.java @@ -31,7 +31,7 @@ protected TestBase(Logger logger) { public static CompletableFuture pushEventsToPartition(final EventHubClient ehClient, final String partitionId, final int noOfEvents) throws EventHubException { return ehClient.createPartitionSender(partitionId) - .thenComposeAsync(new Function>() { + .thenCompose(new Function>() { @Override public CompletableFuture apply(PartitionSender pSender) { @SuppressWarnings("unchecked") final CompletableFuture[] sends = new CompletableFuture[noOfEvents];