From 68dcf9ff36395811de4a2d85d64e5bfb04424a14 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Malte=20Grebe-L=C3=BCth?= Date: Tue, 5 Nov 2024 15:15:00 +0100 Subject: [PATCH] Added reconnection capabilities for the test client. --- configuration/config.toml | 2 + sdccc/pom.xml | 2 +- .../configuration/DefaultTestSuiteConfig.java | 2 + .../sdccc/configuration/TestSuiteConfig.java | 2 + .../sdccc/sdcri/testclient/TestClient.java | 7 + .../sdcri/testclient/TestClientImpl.java | 184 ++++++++++++++---- .../draeger/medical/sdccc/TestSuiteIT.java | 94 ++++++++- 7 files changed, 256 insertions(+), 37 deletions(-) diff --git a/configuration/config.toml b/configuration/config.toml index 1ca3070c..9ce5f4c4 100644 --- a/configuration/config.toml +++ b/configuration/config.toml @@ -28,6 +28,8 @@ EnabledCiphers = [ InterfaceAddress="127.0.0.1" MaxWait=10 MulticastTTL=128 +ReconnectTries=3 +ReconnectWait=5 [SDCcc.Consumer] DeviceEpr="urn:uuid:857bf583-8a51-475f-a77f-d0ca7de69b11" diff --git a/sdccc/pom.xml b/sdccc/pom.xml index 6541e273..0dc6d1b0 100644 --- a/sdccc/pom.xml +++ b/sdccc/pom.xml @@ -15,7 +15,7 @@ UTF-8 5.10.2 1.10.2 - 6.0.0-SNAPSHOT + 6.2.0-SNAPSHOT 2.17.1 4.7.3 ../checkstyle diff --git a/sdccc/src/main/java/com/draeger/medical/sdccc/configuration/DefaultTestSuiteConfig.java b/sdccc/src/main/java/com/draeger/medical/sdccc/configuration/DefaultTestSuiteConfig.java index 71d0a76a..347d1b2b 100644 --- a/sdccc/src/main/java/com/draeger/medical/sdccc/configuration/DefaultTestSuiteConfig.java +++ b/sdccc/src/main/java/com/draeger/medical/sdccc/configuration/DefaultTestSuiteConfig.java @@ -75,6 +75,8 @@ void configureNetwork() { bind(TestSuiteConfig.NETWORK_INTERFACE_ADDRESS, String.class, "127.0.0.1"); bind(TestSuiteConfig.NETWORK_MAX_WAIT, long.class, 10L); bind(TestSuiteConfig.NETWORK_MULTICAST_TTL, long.class, 128L); + bind(TestSuiteConfig.NETWORK_RECONNECT_TRIES, long.class, 3L); + bind(TestSuiteConfig.NETWORK_RECONNECT_WAIT, long.class, 5L); } void configureGRpc() { diff --git a/sdccc/src/main/java/com/draeger/medical/sdccc/configuration/TestSuiteConfig.java b/sdccc/src/main/java/com/draeger/medical/sdccc/configuration/TestSuiteConfig.java index 95e864d2..e13910be 100644 --- a/sdccc/src/main/java/com/draeger/medical/sdccc/configuration/TestSuiteConfig.java +++ b/sdccc/src/main/java/com/draeger/medical/sdccc/configuration/TestSuiteConfig.java @@ -44,6 +44,8 @@ public final class TestSuiteConfig { public static final String NETWORK_INTERFACE_ADDRESS = SDCCC + NETWORK + "InterfaceAddress"; public static final String NETWORK_MAX_WAIT = SDCCC + NETWORK + "MaxWait"; public static final String NETWORK_MULTICAST_TTL = SDCCC + NETWORK + "MulticastTTL"; // should be between 0 and 255 + public static final String NETWORK_RECONNECT_TRIES = SDCCC + NETWORK + "ReconnectTries"; + public static final String NETWORK_RECONNECT_WAIT = SDCCC + NETWORK + "ReconnectWait"; /* * Consumer configuration diff --git a/sdccc/src/main/java/com/draeger/medical/sdccc/sdcri/testclient/TestClient.java b/sdccc/src/main/java/com/draeger/medical/sdccc/sdcri/testclient/TestClient.java index 13f73da4..f70c480e 100644 --- a/sdccc/src/main/java/com/draeger/medical/sdccc/sdcri/testclient/TestClient.java +++ b/sdccc/src/main/java/com/draeger/medical/sdccc/sdcri/testclient/TestClient.java @@ -54,6 +54,13 @@ public interface TestClient { */ void connect() throws InterceptorException, TransportException, IOException; + /** + * Enable or disable reconnection attempts on connection loss. + * + * @param shouldReconnect true if reconnect should be attempted, false otherwise. + */ + void shouldReconnect(Boolean shouldReconnect); + /** * Disconnects the SDC client from the target. * diff --git a/sdccc/src/main/java/com/draeger/medical/sdccc/sdcri/testclient/TestClientImpl.java b/sdccc/src/main/java/com/draeger/medical/sdccc/sdcri/testclient/TestClientImpl.java index 6f801e2f..e17e5e13 100644 --- a/sdccc/src/main/java/com/draeger/medical/sdccc/sdcri/testclient/TestClientImpl.java +++ b/sdccc/src/main/java/com/draeger/medical/sdccc/sdcri/testclient/TestClientImpl.java @@ -27,6 +27,8 @@ import java.util.List; import java.util.Objects; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; @@ -35,6 +37,7 @@ import javax.annotation.Nullable; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.somda.sdc.common.util.ExecutorWrapperService; import org.somda.sdc.dpws.DpwsFramework; import org.somda.sdc.dpws.client.Client; import org.somda.sdc.dpws.client.DiscoveredDevice; @@ -59,6 +62,7 @@ public class TestClientImpl extends AbstractIdleService implements TestClient, W private static final Logger LOG = LogManager.getLogger(TestClientImpl.class); private static final String COULDN_T_CONNECT_TO_TARGET = "Couldn't connect to target"; + private static final String COULDN_T_DISCONNECT = "Could not disconnect the test client"; private static final String LOCATION_CONTEXT_SCOPE_STRING_START = "sdc.ctxt.loc:"; private static final String MATCHING = "matching"; @@ -101,20 +105,30 @@ public class TestClientImpl extends AbstractIdleService implements TestClient, W private HostingServiceProxy hostingServiceProxy; private List targetXAddrs; + private final ExecutorWrapperService reconnectExecutor; + private final AtomicBoolean isConnected; + private final Object lock; + private final long reconnectTries; + private final long reconnectWait; + private final AtomicBoolean reconnectEnabled; + private final AtomicBoolean inReconnectProcess; + /** * Creates an SDCri consumer instance. * - * @param targetDeviceEpr EPR address to filter for - * @param targetDeviceFacility facility to filter for - * @param targetDeviceBuilding building to filter for - * @param targetDevicePointOfCare point of care to filter for - * @param targetDeviceFloor floor to filter for - * @param targetDeviceRoom room to filter for - * @param targetDeviceBed bed to filter for - * @param adapterAddress ip of the network interface to bind to - * @param maxWait max waiting time to find and connect to target device - * @param testClientUtil test client utility - * @param testRunObserver observer for invalidating test runs on unexpected errors + * @param targetDeviceEpr EPR address to filter for + * @param targetDeviceFacility facility to filter for + * @param targetDeviceBuilding building to filter for + * @param targetDevicePointOfCare point of care to filter for + * @param targetDeviceFloor floor to filter for + * @param targetDeviceRoom room to filter for + * @param targetDeviceBed bed to filter for + * @param adapterAddress ip of the network interface to bind to + * @param maxWait max waiting time to find and connect to target device + * @param reconnectTries number of tries a reconnection is attempted + * @param reconnectWait the wait time between reconnection attempts in seconds + * @param testClientUtil test client utility + * @param testRunObserver observer for invalidating test runs on unexpected errors * @param testClientMdibAccessObserver observer for changes to the mdib */ @Inject @@ -129,6 +143,8 @@ public TestClientImpl( @Named(TestSuiteConfig.CONSUMER_DEVICE_LOCATION_BED) final @Nullable String targetDeviceBed, @Named(TestSuiteConfig.NETWORK_INTERFACE_ADDRESS) final String adapterAddress, @Named(TestSuiteConfig.NETWORK_MAX_WAIT) final long maxWait, + @Named(TestSuiteConfig.NETWORK_RECONNECT_TRIES) final long reconnectTries, + @Named(TestSuiteConfig.NETWORK_RECONNECT_WAIT) final long reconnectWait, final TestClientUtil testClientUtil, final TestRunObserver testRunObserver, final TestClientMdibAccessObserver testClientMdibAccessObserver) { @@ -137,6 +153,12 @@ public TestClientImpl( this.connector = injector.getInstance(SdcRemoteDevicesConnector.class); this.testRunObserver = testRunObserver; this.shouldBeConnected = new AtomicBoolean(false); + this.isConnected = new AtomicBoolean(false); + this.reconnectEnabled = new AtomicBoolean(false); + this.inReconnectProcess = new AtomicBoolean(false); + this.lock = new Object(); + this.reconnectTries = reconnectTries; + this.reconnectWait = reconnectWait; this.maxWait = Duration.ofSeconds(maxWait); this.testClientMdibAccessObserver = testClientMdibAccessObserver; @@ -187,6 +209,8 @@ public TestClientImpl( this.floorSearchLogString, this.roomSearchLogString, this.bedSearchLogString); + reconnectExecutor = new ExecutorWrapperService<>( + Executors::newSingleThreadExecutor, "TestClientExecutor", "InstanceIdentifier"); } @Override @@ -196,10 +220,12 @@ protected void startUp() { this.dpwsFramework.setNetworkInterface(networkInterface); dpwsFramework.startAsync().awaitRunning(); client.startAsync().awaitRunning(); + reconnectExecutor.startAsync().awaitRunning(); } @Override protected void shutDown() { + reconnectExecutor.stopAsync().awaitTerminated(); client.stopAsync().awaitTerminated(); dpwsFramework.stopAsync().awaitTerminated(); } @@ -318,16 +344,29 @@ void deviceFound(final ProbedDeviceFoundMessage message) { throw new IOException(COULDN_T_CONNECT_TO_TARGET); } } catch (final InterruptedException | TimeoutException | ExecutionException e) { - LOG.error( - "Couldn't find a device with {}, {}, {}, {}, {}, {} and {}", - this.eprSearchLogString, - this.facilitySearchLogString, - this.buildingSearchLogString, - this.pointOfCareSearchLogString, - this.floorSearchLogString, - this.roomSearchLogString, - this.bedSearchLogString, - e); + if (inReconnectProcess.get()) { + LOG.info( + "Tried to reconnect, but couldn't find a device with {}, {}, {}, {}, {}, {} and {}", + this.eprSearchLogString, + this.facilitySearchLogString, + this.buildingSearchLogString, + this.pointOfCareSearchLogString, + this.floorSearchLogString, + this.roomSearchLogString, + this.bedSearchLogString, + e); + } else { + LOG.error( + "Couldn't find a device with {}, {}, {}, {}, {}, {} and {}", + this.eprSearchLogString, + this.facilitySearchLogString, + this.buildingSearchLogString, + this.pointOfCareSearchLogString, + this.floorSearchLogString, + this.roomSearchLogString, + this.bedSearchLogString, + e); + } throw new IOException(COULDN_T_CONNECT_TO_TARGET); } finally { client.unregisterDiscoveryObserver(obs); @@ -359,17 +398,29 @@ void deviceFound(final ProbedDeviceFoundMessage message) { } sdcRemoteDevice.registerWatchdogObserver(this); + isConnected.set(true); + } + + @Override + public void shouldReconnect(final Boolean shouldReconnect) { + this.reconnectEnabled.set(shouldReconnect); } @Override public synchronized void disconnect() throws TimeoutException { - shouldBeConnected.set(false); + disconnect(true); + } + + private synchronized void disconnect(final Boolean expected) throws TimeoutException { + shouldBeConnected.set(expected); if (sdcRemoteDevice != null) { sdcRemoteDevice.stopAsync().awaitTerminated(maxWait.toSeconds(), TimeUnit.SECONDS); sdcRemoteDevice = null; } hostingServiceProxy = null; - client.stopAsync().awaitTerminated(maxWait.toSeconds(), TimeUnit.SECONDS); + if (expected) { + client.stopAsync().awaitTerminated(maxWait.toSeconds(), TimeUnit.SECONDS); + } } @Override @@ -384,12 +435,12 @@ public SdcRemoteDevicesConnector getConnector() { @Override public SdcRemoteDevice getSdcRemoteDevice() { - return sdcRemoteDevice; + return restrictedGetter(sdcRemoteDevice); } @Override public HostingServiceProxy getHostingServiceProxy() { - return hostingServiceProxy; + return restrictedGetter(hostingServiceProxy); } @Override @@ -410,17 +461,31 @@ public Injector getInjector() { @Subscribe void onConnectionLoss(final WatchdogMessage watchdogMessage) { LOG.info("Watchdog detected disconnect from provider."); - if (shouldBeConnected.get()) { - testRunObserver.invalidateTestRun(String.format( - "Lost connection to device %s. Reason: %s", - watchdogMessage.getPayload(), watchdogMessage.getReason().getMessage())); - try { - disconnect(); - } catch (TimeoutException e) { - LOG.error("Error while disconnecting device after connection loss.", e); + if (shouldBeConnected.get() && reconnectEnabled.get()) { + isConnected.set(false); + inReconnectProcess.set(true); + LOG.info("The disconnect from provider was unexpected, trying to reconnect."); + if (reconnectExecutor.isRunning()) { + reconnectExecutor.get().submit(() -> { + synchronized (lock) { + try { + disconnect(false); + } catch (TimeoutException e) { + LOG.error(COULDN_T_DISCONNECT, e); + } + reconnect(watchdogMessage); + lock.notifyAll(); + } + }); + } else { + LOG.debug("ReconnectExecutor is not running."); + invalidateAfterConnectionLoss(watchdogMessage); } + inReconnectProcess.set(false); + } else if (shouldBeConnected.get()) { + invalidateAfterConnectionLoss(watchdogMessage); } else { - LOG.info("Watchdog detected expected disconnect from provider."); + LOG.info("The disconnect from provider was expected."); } } @@ -433,4 +498,55 @@ public void registerMdibObserver(final TestClientMdibObserver observer) { public void unregisterMdibObserver(final TestClientMdibObserver observer) { testClientMdibAccessObserver.unregisterObserver(observer); } + + private T restrictedGetter(final T t) { + synchronized (lock) { + while (inReconnectProcess.get()) { + LOG.debug("Attempted to access a connection-dependent object while the connection is interrupted, " + + "wait for connection to be re-established."); + try { + lock.wait(); + } catch (InterruptedException e) { + LOG.error("Waiting for lock interrupted.", e); + } + } + return t; + } + } + + private void reconnect(final WatchdogMessage watchdogMessage) { + var count = 1; + while (count <= reconnectTries && !isConnected.get()) { + LOG.info("Trying to reconnect, attempt {} of {}.", count, reconnectTries); + try { + connect(); + LOG.info("Successfully reconnected."); + return; + } catch (InterceptorException | TransportException | IOException e) { + LOG.info("{}. reconnection attempt failed.", count); + try { + LOG.info("Wait for {} seconds, to give the provider time to restart.", reconnectWait); + TimeUnit.SECONDS.sleep(reconnectWait); + } catch (InterruptedException ex) { + throw new RuntimeException("Error while trying to wait for the provider to restart.", ex); + } + } + count++; + } + if (!isConnected.get()) { + LOG.info("Couldn't reconnect after {} retries", reconnectTries); + invalidateAfterConnectionLoss(watchdogMessage); + } + } + + private void invalidateAfterConnectionLoss(final WatchdogMessage watchdogMessage) { + testRunObserver.invalidateTestRun(String.format( + "Lost connection to device %s. Reason: %s", + watchdogMessage.getPayload(), watchdogMessage.getReason().getMessage())); + try { + disconnect(); + } catch (TimeoutException e) { + LOG.error(COULDN_T_DISCONNECT, e); + } + } } diff --git a/sdccc/src/test/java/it/com/draeger/medical/sdccc/TestSuiteIT.java b/sdccc/src/test/java/it/com/draeger/medical/sdccc/TestSuiteIT.java index e1c6b084..71181bda 100644 --- a/sdccc/src/test/java/it/com/draeger/medical/sdccc/TestSuiteIT.java +++ b/sdccc/src/test/java/it/com/draeger/medical/sdccc/TestSuiteIT.java @@ -446,11 +446,39 @@ public void testConsumerUnexpectedSubscriptionEnd() throws Exception { getConsumerInjector(false, null, testProvider.getSdcDevice().getEprAddress()); InjectorTestBase.setInjector(injector); + unexpectedSubscriptionEnd(injector, false); + // dead subscription must've been marked + final var testRunObserver = injector.getInstance(TestRunObserver.class); + assertTrue(testRunObserver.isInvalid(), "TestRunObserver had unexpectedly absent failures"); + } + + /** + * Runs the test consumer and causes a failed renewal with reconnection enabled, + * verifies that test run is not marked as invalid. + */ + @Test + @Timeout(TEST_TIMEOUT) + public void testConsumerReconnectsSuccessfulAfterUnexpectedSubscriptionEnd() throws Exception { + testProvider.startService(DEFAULT_TIMEOUT); + + final var injector = + getConsumerInjector(false, null, testProvider.getSdcDevice().getEprAddress()); + InjectorTestBase.setInjector(injector); + + unexpectedSubscriptionEnd(injector, true); + final var testRunObserver = injector.getInstance(TestRunObserver.class); + assertFalse(testRunObserver.isInvalid(), "TestRunObserver should not be invalid"); + final var client = injector.getInstance(TestClient.class); + assertFalse(client.getConnector().getConnectedDevices().isEmpty(), "Client should be reconnected"); + } + + private void unexpectedSubscriptionEnd(final Injector injector, final boolean shouldReconnect) throws Exception { final var obs = injector.getInstance(WasRunObserver.class); assertFalse(obs.hadDirectRun()); assertFalse(obs.hadInvariantRun()); final var client = injector.getInstance(TestClient.class); + client.shouldReconnect(shouldReconnect); client.startService(DEFAULT_TIMEOUT); client.connect(); @@ -486,10 +514,72 @@ public void testConsumerUnexpectedSubscriptionEnd() throws Exception { if (!subscriptionEnd.isNegative()) { Thread.sleep(subscriptionEnd.toMillis()); } + } + + /** + * Runs the test consumer and stops and starts a provider with the same epr, + * verifies that the consumer reconnects and the test run is not marked as invalid. + */ + @Test + @Timeout(TEST_TIMEOUT) + public void testConsumerSuccessfullyReconnect() throws Exception { + final var providerEpr = getRandomEpr(); + final var provider = getProvider(providerEpr); + + final DiscoveryAccess discoveryAccess = + provider.getSdcDevice().getDevice().getDiscoveryAccess(); + discoveryAccess.setTypes(List.of(CommonConstants.MEDICAL_DEVICE_TYPE)); + discoveryAccess.setScopes(List.of(GlueConstants.SCOPE_SDC_PROVIDER)); + + provider.startService(DEFAULT_TIMEOUT); + + final var injector = getConsumerInjector(false, null, providerEpr, new AbstractConfigurationModule() { + @Override + protected void defaultConfigure() { + bind(TestSuiteConfig.NETWORK_RECONNECT_WAIT, long.class, 2L); + } + }); + InjectorTestBase.setInjector(injector); + + final var client = injector.getInstance(TestClient.class); + client.shouldReconnect(true); + client.startService(DEFAULT_TIMEOUT); + client.connect(); + + provider.stopService(DEFAULT_TIMEOUT); + final var provider2 = getProvider(providerEpr); + provider2.startService(DEFAULT_TIMEOUT); + + final DiscoveryAccess discoveryAccess2 = + provider2.getSdcDevice().getDevice().getDiscoveryAccess(); + discoveryAccess2.setTypes(List.of(CommonConstants.MEDICAL_DEVICE_TYPE)); + discoveryAccess2.setScopes(List.of(GlueConstants.SCOPE_SDC_PROVIDER)); + + final var waitTime = 1000; + while (provider2.getActiveSubscriptions().isEmpty()) { + Thread.sleep(waitTime); + } + // get active subscription id + final var activeSubs = provider2.getActiveSubscriptions(); + assertEquals(1, activeSubs.size(), "Expected only one active subscription"); + + final var subManTimeout = + activeSubs.values().stream().findFirst().orElseThrow().getExpiresTimeout(); + + client.disconnect(); + + // wait until subscription must've ended + final var subscriptionEnd = Duration.between(Instant.now(), subManTimeout); + + if (!subscriptionEnd.isNegative()) { + Thread.sleep(subscriptionEnd.toMillis()); + } - // dead subscription must've been marked final var testRunObserver = injector.getInstance(TestRunObserver.class); - assertTrue(testRunObserver.isInvalid(), "TestRunObserver had unexpectedly absent failures"); + assertFalse( + testRunObserver.isInvalid(), + "TestRunObserver had unexpected failures: " + testRunObserver.getReasons()); + provider2.stopService(DEFAULT_TIMEOUT); } /**