diff --git a/configuration/config.toml b/configuration/config.toml
index 1ca3070c..9ce5f4c4 100644
--- a/configuration/config.toml
+++ b/configuration/config.toml
@@ -28,6 +28,8 @@ EnabledCiphers = [
InterfaceAddress="127.0.0.1"
MaxWait=10
MulticastTTL=128
+ReconnectTries=3
+ReconnectWait=5
[SDCcc.Consumer]
DeviceEpr="urn:uuid:857bf583-8a51-475f-a77f-d0ca7de69b11"
diff --git a/sdccc/pom.xml b/sdccc/pom.xml
index 6541e273..0dc6d1b0 100644
--- a/sdccc/pom.xml
+++ b/sdccc/pom.xml
@@ -15,7 +15,7 @@
UTF-8
5.10.2
1.10.2
- 6.0.0-SNAPSHOT
+ 6.2.0-SNAPSHOT
2.17.1
4.7.3
../checkstyle
diff --git a/sdccc/src/main/java/com/draeger/medical/sdccc/configuration/DefaultTestSuiteConfig.java b/sdccc/src/main/java/com/draeger/medical/sdccc/configuration/DefaultTestSuiteConfig.java
index 71d0a76a..347d1b2b 100644
--- a/sdccc/src/main/java/com/draeger/medical/sdccc/configuration/DefaultTestSuiteConfig.java
+++ b/sdccc/src/main/java/com/draeger/medical/sdccc/configuration/DefaultTestSuiteConfig.java
@@ -75,6 +75,8 @@ void configureNetwork() {
bind(TestSuiteConfig.NETWORK_INTERFACE_ADDRESS, String.class, "127.0.0.1");
bind(TestSuiteConfig.NETWORK_MAX_WAIT, long.class, 10L);
bind(TestSuiteConfig.NETWORK_MULTICAST_TTL, long.class, 128L);
+ bind(TestSuiteConfig.NETWORK_RECONNECT_TRIES, long.class, 3L);
+ bind(TestSuiteConfig.NETWORK_RECONNECT_WAIT, long.class, 5L);
}
void configureGRpc() {
diff --git a/sdccc/src/main/java/com/draeger/medical/sdccc/configuration/TestSuiteConfig.java b/sdccc/src/main/java/com/draeger/medical/sdccc/configuration/TestSuiteConfig.java
index 95e864d2..e13910be 100644
--- a/sdccc/src/main/java/com/draeger/medical/sdccc/configuration/TestSuiteConfig.java
+++ b/sdccc/src/main/java/com/draeger/medical/sdccc/configuration/TestSuiteConfig.java
@@ -44,6 +44,8 @@ public final class TestSuiteConfig {
public static final String NETWORK_INTERFACE_ADDRESS = SDCCC + NETWORK + "InterfaceAddress";
public static final String NETWORK_MAX_WAIT = SDCCC + NETWORK + "MaxWait";
public static final String NETWORK_MULTICAST_TTL = SDCCC + NETWORK + "MulticastTTL"; // should be between 0 and 255
+ public static final String NETWORK_RECONNECT_TRIES = SDCCC + NETWORK + "ReconnectTries";
+ public static final String NETWORK_RECONNECT_WAIT = SDCCC + NETWORK + "ReconnectWait";
/*
* Consumer configuration
diff --git a/sdccc/src/main/java/com/draeger/medical/sdccc/sdcri/testclient/TestClient.java b/sdccc/src/main/java/com/draeger/medical/sdccc/sdcri/testclient/TestClient.java
index 13f73da4..f70c480e 100644
--- a/sdccc/src/main/java/com/draeger/medical/sdccc/sdcri/testclient/TestClient.java
+++ b/sdccc/src/main/java/com/draeger/medical/sdccc/sdcri/testclient/TestClient.java
@@ -54,6 +54,13 @@ public interface TestClient {
*/
void connect() throws InterceptorException, TransportException, IOException;
+ /**
+ * Enable or disable reconnection attempts on connection loss.
+ *
+ * @param shouldReconnect true if reconnect should be attempted, false otherwise.
+ */
+ void shouldReconnect(Boolean shouldReconnect);
+
/**
* Disconnects the SDC client from the target.
*
diff --git a/sdccc/src/main/java/com/draeger/medical/sdccc/sdcri/testclient/TestClientImpl.java b/sdccc/src/main/java/com/draeger/medical/sdccc/sdcri/testclient/TestClientImpl.java
index 6f801e2f..e17e5e13 100644
--- a/sdccc/src/main/java/com/draeger/medical/sdccc/sdcri/testclient/TestClientImpl.java
+++ b/sdccc/src/main/java/com/draeger/medical/sdccc/sdcri/testclient/TestClientImpl.java
@@ -27,6 +27,8 @@
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -35,6 +37,7 @@
import javax.annotation.Nullable;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import org.somda.sdc.common.util.ExecutorWrapperService;
import org.somda.sdc.dpws.DpwsFramework;
import org.somda.sdc.dpws.client.Client;
import org.somda.sdc.dpws.client.DiscoveredDevice;
@@ -59,6 +62,7 @@ public class TestClientImpl extends AbstractIdleService implements TestClient, W
private static final Logger LOG = LogManager.getLogger(TestClientImpl.class);
private static final String COULDN_T_CONNECT_TO_TARGET = "Couldn't connect to target";
+ private static final String COULDN_T_DISCONNECT = "Could not disconnect the test client";
private static final String LOCATION_CONTEXT_SCOPE_STRING_START = "sdc.ctxt.loc:";
private static final String MATCHING = "matching";
@@ -101,20 +105,30 @@ public class TestClientImpl extends AbstractIdleService implements TestClient, W
private HostingServiceProxy hostingServiceProxy;
private List targetXAddrs;
+ private final ExecutorWrapperService reconnectExecutor;
+ private final AtomicBoolean isConnected;
+ private final Object lock;
+ private final long reconnectTries;
+ private final long reconnectWait;
+ private final AtomicBoolean reconnectEnabled;
+ private final AtomicBoolean inReconnectProcess;
+
/**
* Creates an SDCri consumer instance.
*
- * @param targetDeviceEpr EPR address to filter for
- * @param targetDeviceFacility facility to filter for
- * @param targetDeviceBuilding building to filter for
- * @param targetDevicePointOfCare point of care to filter for
- * @param targetDeviceFloor floor to filter for
- * @param targetDeviceRoom room to filter for
- * @param targetDeviceBed bed to filter for
- * @param adapterAddress ip of the network interface to bind to
- * @param maxWait max waiting time to find and connect to target device
- * @param testClientUtil test client utility
- * @param testRunObserver observer for invalidating test runs on unexpected errors
+ * @param targetDeviceEpr EPR address to filter for
+ * @param targetDeviceFacility facility to filter for
+ * @param targetDeviceBuilding building to filter for
+ * @param targetDevicePointOfCare point of care to filter for
+ * @param targetDeviceFloor floor to filter for
+ * @param targetDeviceRoom room to filter for
+ * @param targetDeviceBed bed to filter for
+ * @param adapterAddress ip of the network interface to bind to
+ * @param maxWait max waiting time to find and connect to target device
+ * @param reconnectTries number of tries a reconnection is attempted
+ * @param reconnectWait the wait time between reconnection attempts in seconds
+ * @param testClientUtil test client utility
+ * @param testRunObserver observer for invalidating test runs on unexpected errors
* @param testClientMdibAccessObserver observer for changes to the mdib
*/
@Inject
@@ -129,6 +143,8 @@ public TestClientImpl(
@Named(TestSuiteConfig.CONSUMER_DEVICE_LOCATION_BED) final @Nullable String targetDeviceBed,
@Named(TestSuiteConfig.NETWORK_INTERFACE_ADDRESS) final String adapterAddress,
@Named(TestSuiteConfig.NETWORK_MAX_WAIT) final long maxWait,
+ @Named(TestSuiteConfig.NETWORK_RECONNECT_TRIES) final long reconnectTries,
+ @Named(TestSuiteConfig.NETWORK_RECONNECT_WAIT) final long reconnectWait,
final TestClientUtil testClientUtil,
final TestRunObserver testRunObserver,
final TestClientMdibAccessObserver testClientMdibAccessObserver) {
@@ -137,6 +153,12 @@ public TestClientImpl(
this.connector = injector.getInstance(SdcRemoteDevicesConnector.class);
this.testRunObserver = testRunObserver;
this.shouldBeConnected = new AtomicBoolean(false);
+ this.isConnected = new AtomicBoolean(false);
+ this.reconnectEnabled = new AtomicBoolean(false);
+ this.inReconnectProcess = new AtomicBoolean(false);
+ this.lock = new Object();
+ this.reconnectTries = reconnectTries;
+ this.reconnectWait = reconnectWait;
this.maxWait = Duration.ofSeconds(maxWait);
this.testClientMdibAccessObserver = testClientMdibAccessObserver;
@@ -187,6 +209,8 @@ public TestClientImpl(
this.floorSearchLogString,
this.roomSearchLogString,
this.bedSearchLogString);
+ reconnectExecutor = new ExecutorWrapperService<>(
+ Executors::newSingleThreadExecutor, "TestClientExecutor", "InstanceIdentifier");
}
@Override
@@ -196,10 +220,12 @@ protected void startUp() {
this.dpwsFramework.setNetworkInterface(networkInterface);
dpwsFramework.startAsync().awaitRunning();
client.startAsync().awaitRunning();
+ reconnectExecutor.startAsync().awaitRunning();
}
@Override
protected void shutDown() {
+ reconnectExecutor.stopAsync().awaitTerminated();
client.stopAsync().awaitTerminated();
dpwsFramework.stopAsync().awaitTerminated();
}
@@ -318,16 +344,29 @@ void deviceFound(final ProbedDeviceFoundMessage message) {
throw new IOException(COULDN_T_CONNECT_TO_TARGET);
}
} catch (final InterruptedException | TimeoutException | ExecutionException e) {
- LOG.error(
- "Couldn't find a device with {}, {}, {}, {}, {}, {} and {}",
- this.eprSearchLogString,
- this.facilitySearchLogString,
- this.buildingSearchLogString,
- this.pointOfCareSearchLogString,
- this.floorSearchLogString,
- this.roomSearchLogString,
- this.bedSearchLogString,
- e);
+ if (inReconnectProcess.get()) {
+ LOG.info(
+ "Tried to reconnect, but couldn't find a device with {}, {}, {}, {}, {}, {} and {}",
+ this.eprSearchLogString,
+ this.facilitySearchLogString,
+ this.buildingSearchLogString,
+ this.pointOfCareSearchLogString,
+ this.floorSearchLogString,
+ this.roomSearchLogString,
+ this.bedSearchLogString,
+ e);
+ } else {
+ LOG.error(
+ "Couldn't find a device with {}, {}, {}, {}, {}, {} and {}",
+ this.eprSearchLogString,
+ this.facilitySearchLogString,
+ this.buildingSearchLogString,
+ this.pointOfCareSearchLogString,
+ this.floorSearchLogString,
+ this.roomSearchLogString,
+ this.bedSearchLogString,
+ e);
+ }
throw new IOException(COULDN_T_CONNECT_TO_TARGET);
} finally {
client.unregisterDiscoveryObserver(obs);
@@ -359,17 +398,29 @@ void deviceFound(final ProbedDeviceFoundMessage message) {
}
sdcRemoteDevice.registerWatchdogObserver(this);
+ isConnected.set(true);
+ }
+
+ @Override
+ public void shouldReconnect(final Boolean shouldReconnect) {
+ this.reconnectEnabled.set(shouldReconnect);
}
@Override
public synchronized void disconnect() throws TimeoutException {
- shouldBeConnected.set(false);
+ disconnect(true);
+ }
+
+ private synchronized void disconnect(final Boolean expected) throws TimeoutException {
+ shouldBeConnected.set(expected);
if (sdcRemoteDevice != null) {
sdcRemoteDevice.stopAsync().awaitTerminated(maxWait.toSeconds(), TimeUnit.SECONDS);
sdcRemoteDevice = null;
}
hostingServiceProxy = null;
- client.stopAsync().awaitTerminated(maxWait.toSeconds(), TimeUnit.SECONDS);
+ if (expected) {
+ client.stopAsync().awaitTerminated(maxWait.toSeconds(), TimeUnit.SECONDS);
+ }
}
@Override
@@ -384,12 +435,12 @@ public SdcRemoteDevicesConnector getConnector() {
@Override
public SdcRemoteDevice getSdcRemoteDevice() {
- return sdcRemoteDevice;
+ return restrictedGetter(sdcRemoteDevice);
}
@Override
public HostingServiceProxy getHostingServiceProxy() {
- return hostingServiceProxy;
+ return restrictedGetter(hostingServiceProxy);
}
@Override
@@ -410,17 +461,31 @@ public Injector getInjector() {
@Subscribe
void onConnectionLoss(final WatchdogMessage watchdogMessage) {
LOG.info("Watchdog detected disconnect from provider.");
- if (shouldBeConnected.get()) {
- testRunObserver.invalidateTestRun(String.format(
- "Lost connection to device %s. Reason: %s",
- watchdogMessage.getPayload(), watchdogMessage.getReason().getMessage()));
- try {
- disconnect();
- } catch (TimeoutException e) {
- LOG.error("Error while disconnecting device after connection loss.", e);
+ if (shouldBeConnected.get() && reconnectEnabled.get()) {
+ isConnected.set(false);
+ inReconnectProcess.set(true);
+ LOG.info("The disconnect from provider was unexpected, trying to reconnect.");
+ if (reconnectExecutor.isRunning()) {
+ reconnectExecutor.get().submit(() -> {
+ synchronized (lock) {
+ try {
+ disconnect(false);
+ } catch (TimeoutException e) {
+ LOG.error(COULDN_T_DISCONNECT, e);
+ }
+ reconnect(watchdogMessage);
+ lock.notifyAll();
+ }
+ });
+ } else {
+ LOG.debug("ReconnectExecutor is not running.");
+ invalidateAfterConnectionLoss(watchdogMessage);
}
+ inReconnectProcess.set(false);
+ } else if (shouldBeConnected.get()) {
+ invalidateAfterConnectionLoss(watchdogMessage);
} else {
- LOG.info("Watchdog detected expected disconnect from provider.");
+ LOG.info("The disconnect from provider was expected.");
}
}
@@ -433,4 +498,55 @@ public void registerMdibObserver(final TestClientMdibObserver observer) {
public void unregisterMdibObserver(final TestClientMdibObserver observer) {
testClientMdibAccessObserver.unregisterObserver(observer);
}
+
+ private T restrictedGetter(final T t) {
+ synchronized (lock) {
+ while (inReconnectProcess.get()) {
+ LOG.debug("Attempted to access a connection-dependent object while the connection is interrupted, "
+ + "wait for connection to be re-established.");
+ try {
+ lock.wait();
+ } catch (InterruptedException e) {
+ LOG.error("Waiting for lock interrupted.", e);
+ }
+ }
+ return t;
+ }
+ }
+
+ private void reconnect(final WatchdogMessage watchdogMessage) {
+ var count = 1;
+ while (count <= reconnectTries && !isConnected.get()) {
+ LOG.info("Trying to reconnect, attempt {} of {}.", count, reconnectTries);
+ try {
+ connect();
+ LOG.info("Successfully reconnected.");
+ return;
+ } catch (InterceptorException | TransportException | IOException e) {
+ LOG.info("{}. reconnection attempt failed.", count);
+ try {
+ LOG.info("Wait for {} seconds, to give the provider time to restart.", reconnectWait);
+ TimeUnit.SECONDS.sleep(reconnectWait);
+ } catch (InterruptedException ex) {
+ throw new RuntimeException("Error while trying to wait for the provider to restart.", ex);
+ }
+ }
+ count++;
+ }
+ if (!isConnected.get()) {
+ LOG.info("Couldn't reconnect after {} retries", reconnectTries);
+ invalidateAfterConnectionLoss(watchdogMessage);
+ }
+ }
+
+ private void invalidateAfterConnectionLoss(final WatchdogMessage watchdogMessage) {
+ testRunObserver.invalidateTestRun(String.format(
+ "Lost connection to device %s. Reason: %s",
+ watchdogMessage.getPayload(), watchdogMessage.getReason().getMessage()));
+ try {
+ disconnect();
+ } catch (TimeoutException e) {
+ LOG.error(COULDN_T_DISCONNECT, e);
+ }
+ }
}
diff --git a/sdccc/src/test/java/it/com/draeger/medical/sdccc/TestSuiteIT.java b/sdccc/src/test/java/it/com/draeger/medical/sdccc/TestSuiteIT.java
index e1c6b084..71181bda 100644
--- a/sdccc/src/test/java/it/com/draeger/medical/sdccc/TestSuiteIT.java
+++ b/sdccc/src/test/java/it/com/draeger/medical/sdccc/TestSuiteIT.java
@@ -446,11 +446,39 @@ public void testConsumerUnexpectedSubscriptionEnd() throws Exception {
getConsumerInjector(false, null, testProvider.getSdcDevice().getEprAddress());
InjectorTestBase.setInjector(injector);
+ unexpectedSubscriptionEnd(injector, false);
+ // dead subscription must've been marked
+ final var testRunObserver = injector.getInstance(TestRunObserver.class);
+ assertTrue(testRunObserver.isInvalid(), "TestRunObserver had unexpectedly absent failures");
+ }
+
+ /**
+ * Runs the test consumer and causes a failed renewal with reconnection enabled,
+ * verifies that test run is not marked as invalid.
+ */
+ @Test
+ @Timeout(TEST_TIMEOUT)
+ public void testConsumerReconnectsSuccessfulAfterUnexpectedSubscriptionEnd() throws Exception {
+ testProvider.startService(DEFAULT_TIMEOUT);
+
+ final var injector =
+ getConsumerInjector(false, null, testProvider.getSdcDevice().getEprAddress());
+ InjectorTestBase.setInjector(injector);
+
+ unexpectedSubscriptionEnd(injector, true);
+ final var testRunObserver = injector.getInstance(TestRunObserver.class);
+ assertFalse(testRunObserver.isInvalid(), "TestRunObserver should not be invalid");
+ final var client = injector.getInstance(TestClient.class);
+ assertFalse(client.getConnector().getConnectedDevices().isEmpty(), "Client should be reconnected");
+ }
+
+ private void unexpectedSubscriptionEnd(final Injector injector, final boolean shouldReconnect) throws Exception {
final var obs = injector.getInstance(WasRunObserver.class);
assertFalse(obs.hadDirectRun());
assertFalse(obs.hadInvariantRun());
final var client = injector.getInstance(TestClient.class);
+ client.shouldReconnect(shouldReconnect);
client.startService(DEFAULT_TIMEOUT);
client.connect();
@@ -486,10 +514,72 @@ public void testConsumerUnexpectedSubscriptionEnd() throws Exception {
if (!subscriptionEnd.isNegative()) {
Thread.sleep(subscriptionEnd.toMillis());
}
+ }
+
+ /**
+ * Runs the test consumer and stops and starts a provider with the same epr,
+ * verifies that the consumer reconnects and the test run is not marked as invalid.
+ */
+ @Test
+ @Timeout(TEST_TIMEOUT)
+ public void testConsumerSuccessfullyReconnect() throws Exception {
+ final var providerEpr = getRandomEpr();
+ final var provider = getProvider(providerEpr);
+
+ final DiscoveryAccess discoveryAccess =
+ provider.getSdcDevice().getDevice().getDiscoveryAccess();
+ discoveryAccess.setTypes(List.of(CommonConstants.MEDICAL_DEVICE_TYPE));
+ discoveryAccess.setScopes(List.of(GlueConstants.SCOPE_SDC_PROVIDER));
+
+ provider.startService(DEFAULT_TIMEOUT);
+
+ final var injector = getConsumerInjector(false, null, providerEpr, new AbstractConfigurationModule() {
+ @Override
+ protected void defaultConfigure() {
+ bind(TestSuiteConfig.NETWORK_RECONNECT_WAIT, long.class, 2L);
+ }
+ });
+ InjectorTestBase.setInjector(injector);
+
+ final var client = injector.getInstance(TestClient.class);
+ client.shouldReconnect(true);
+ client.startService(DEFAULT_TIMEOUT);
+ client.connect();
+
+ provider.stopService(DEFAULT_TIMEOUT);
+ final var provider2 = getProvider(providerEpr);
+ provider2.startService(DEFAULT_TIMEOUT);
+
+ final DiscoveryAccess discoveryAccess2 =
+ provider2.getSdcDevice().getDevice().getDiscoveryAccess();
+ discoveryAccess2.setTypes(List.of(CommonConstants.MEDICAL_DEVICE_TYPE));
+ discoveryAccess2.setScopes(List.of(GlueConstants.SCOPE_SDC_PROVIDER));
+
+ final var waitTime = 1000;
+ while (provider2.getActiveSubscriptions().isEmpty()) {
+ Thread.sleep(waitTime);
+ }
+ // get active subscription id
+ final var activeSubs = provider2.getActiveSubscriptions();
+ assertEquals(1, activeSubs.size(), "Expected only one active subscription");
+
+ final var subManTimeout =
+ activeSubs.values().stream().findFirst().orElseThrow().getExpiresTimeout();
+
+ client.disconnect();
+
+ // wait until subscription must've ended
+ final var subscriptionEnd = Duration.between(Instant.now(), subManTimeout);
+
+ if (!subscriptionEnd.isNegative()) {
+ Thread.sleep(subscriptionEnd.toMillis());
+ }
- // dead subscription must've been marked
final var testRunObserver = injector.getInstance(TestRunObserver.class);
- assertTrue(testRunObserver.isInvalid(), "TestRunObserver had unexpectedly absent failures");
+ assertFalse(
+ testRunObserver.isInvalid(),
+ "TestRunObserver had unexpected failures: " + testRunObserver.getReasons());
+ provider2.stopService(DEFAULT_TIMEOUT);
}
/**