Skip to content

Commit

Permalink
Added reconnection capabilities for the test client.
Browse files Browse the repository at this point in the history
  • Loading branch information
belagertem committed Nov 5, 2024
1 parent ebe0e09 commit 68dcf9f
Show file tree
Hide file tree
Showing 7 changed files with 256 additions and 37 deletions.
2 changes: 2 additions & 0 deletions configuration/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ EnabledCiphers = [
InterfaceAddress="127.0.0.1"
MaxWait=10
MulticastTTL=128
ReconnectTries=3
ReconnectWait=5

[SDCcc.Consumer]
DeviceEpr="urn:uuid:857bf583-8a51-475f-a77f-d0ca7de69b11"
Expand Down
2 changes: 1 addition & 1 deletion sdccc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<junitVersion>5.10.2</junitVersion>
<junitPlatformVersion>1.10.2</junitPlatformVersion>
<sdcriVersion>6.0.0-SNAPSHOT</sdcriVersion>
<sdcriVersion>6.2.0-SNAPSHOT</sdcriVersion>
<log4jVersion>2.17.1</log4jVersion>
<spotbugsVersion>4.7.3</spotbugsVersion>
<checkstyleConfigDir>../checkstyle</checkstyleConfigDir>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ void configureNetwork() {
bind(TestSuiteConfig.NETWORK_INTERFACE_ADDRESS, String.class, "127.0.0.1");
bind(TestSuiteConfig.NETWORK_MAX_WAIT, long.class, 10L);
bind(TestSuiteConfig.NETWORK_MULTICAST_TTL, long.class, 128L);
bind(TestSuiteConfig.NETWORK_RECONNECT_TRIES, long.class, 3L);
bind(TestSuiteConfig.NETWORK_RECONNECT_WAIT, long.class, 5L);
}

void configureGRpc() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ public final class TestSuiteConfig {
public static final String NETWORK_INTERFACE_ADDRESS = SDCCC + NETWORK + "InterfaceAddress";
public static final String NETWORK_MAX_WAIT = SDCCC + NETWORK + "MaxWait";
public static final String NETWORK_MULTICAST_TTL = SDCCC + NETWORK + "MulticastTTL"; // should be between 0 and 255
public static final String NETWORK_RECONNECT_TRIES = SDCCC + NETWORK + "ReconnectTries";
public static final String NETWORK_RECONNECT_WAIT = SDCCC + NETWORK + "ReconnectWait";

/*
* Consumer configuration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,13 @@ public interface TestClient {
*/
void connect() throws InterceptorException, TransportException, IOException;

/**
* Enable or disable reconnection attempts on connection loss.
*
* @param shouldReconnect true if reconnect should be attempted, false otherwise.
*/
void shouldReconnect(Boolean shouldReconnect);

/**
* Disconnects the SDC client from the target.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -35,6 +37,7 @@
import javax.annotation.Nullable;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.somda.sdc.common.util.ExecutorWrapperService;
import org.somda.sdc.dpws.DpwsFramework;
import org.somda.sdc.dpws.client.Client;
import org.somda.sdc.dpws.client.DiscoveredDevice;
Expand All @@ -59,6 +62,7 @@ public class TestClientImpl extends AbstractIdleService implements TestClient, W
private static final Logger LOG = LogManager.getLogger(TestClientImpl.class);

private static final String COULDN_T_CONNECT_TO_TARGET = "Couldn't connect to target";
private static final String COULDN_T_DISCONNECT = "Could not disconnect the test client";

private static final String LOCATION_CONTEXT_SCOPE_STRING_START = "sdc.ctxt.loc:";
private static final String MATCHING = "matching";
Expand Down Expand Up @@ -101,20 +105,30 @@ public class TestClientImpl extends AbstractIdleService implements TestClient, W
private HostingServiceProxy hostingServiceProxy;
private List<String> targetXAddrs;

private final ExecutorWrapperService<ExecutorService> reconnectExecutor;
private final AtomicBoolean isConnected;
private final Object lock;
private final long reconnectTries;
private final long reconnectWait;
private final AtomicBoolean reconnectEnabled;
private final AtomicBoolean inReconnectProcess;

/**
* Creates an SDCri consumer instance.
*
* @param targetDeviceEpr EPR address to filter for
* @param targetDeviceFacility facility to filter for
* @param targetDeviceBuilding building to filter for
* @param targetDevicePointOfCare point of care to filter for
* @param targetDeviceFloor floor to filter for
* @param targetDeviceRoom room to filter for
* @param targetDeviceBed bed to filter for
* @param adapterAddress ip of the network interface to bind to
* @param maxWait max waiting time to find and connect to target device
* @param testClientUtil test client utility
* @param testRunObserver observer for invalidating test runs on unexpected errors
* @param targetDeviceEpr EPR address to filter for
* @param targetDeviceFacility facility to filter for
* @param targetDeviceBuilding building to filter for
* @param targetDevicePointOfCare point of care to filter for
* @param targetDeviceFloor floor to filter for
* @param targetDeviceRoom room to filter for
* @param targetDeviceBed bed to filter for
* @param adapterAddress ip of the network interface to bind to
* @param maxWait max waiting time to find and connect to target device
* @param reconnectTries number of tries a reconnection is attempted
* @param reconnectWait the wait time between reconnection attempts in seconds
* @param testClientUtil test client utility
* @param testRunObserver observer for invalidating test runs on unexpected errors
* @param testClientMdibAccessObserver observer for changes to the mdib
*/
@Inject
Expand All @@ -129,6 +143,8 @@ public TestClientImpl(
@Named(TestSuiteConfig.CONSUMER_DEVICE_LOCATION_BED) final @Nullable String targetDeviceBed,
@Named(TestSuiteConfig.NETWORK_INTERFACE_ADDRESS) final String adapterAddress,
@Named(TestSuiteConfig.NETWORK_MAX_WAIT) final long maxWait,
@Named(TestSuiteConfig.NETWORK_RECONNECT_TRIES) final long reconnectTries,
@Named(TestSuiteConfig.NETWORK_RECONNECT_WAIT) final long reconnectWait,
final TestClientUtil testClientUtil,
final TestRunObserver testRunObserver,
final TestClientMdibAccessObserver testClientMdibAccessObserver) {
Expand All @@ -137,6 +153,12 @@ public TestClientImpl(
this.connector = injector.getInstance(SdcRemoteDevicesConnector.class);
this.testRunObserver = testRunObserver;
this.shouldBeConnected = new AtomicBoolean(false);
this.isConnected = new AtomicBoolean(false);
this.reconnectEnabled = new AtomicBoolean(false);
this.inReconnectProcess = new AtomicBoolean(false);
this.lock = new Object();
this.reconnectTries = reconnectTries;
this.reconnectWait = reconnectWait;
this.maxWait = Duration.ofSeconds(maxWait);
this.testClientMdibAccessObserver = testClientMdibAccessObserver;

Expand Down Expand Up @@ -187,6 +209,8 @@ public TestClientImpl(
this.floorSearchLogString,
this.roomSearchLogString,
this.bedSearchLogString);
reconnectExecutor = new ExecutorWrapperService<>(
Executors::newSingleThreadExecutor, "TestClientExecutor", "InstanceIdentifier");
}

@Override
Expand All @@ -196,10 +220,12 @@ protected void startUp() {
this.dpwsFramework.setNetworkInterface(networkInterface);
dpwsFramework.startAsync().awaitRunning();
client.startAsync().awaitRunning();
reconnectExecutor.startAsync().awaitRunning();
}

@Override
protected void shutDown() {
reconnectExecutor.stopAsync().awaitTerminated();
client.stopAsync().awaitTerminated();
dpwsFramework.stopAsync().awaitTerminated();
}
Expand Down Expand Up @@ -318,16 +344,29 @@ void deviceFound(final ProbedDeviceFoundMessage message) {
throw new IOException(COULDN_T_CONNECT_TO_TARGET);
}
} catch (final InterruptedException | TimeoutException | ExecutionException e) {
LOG.error(
"Couldn't find a device with {}, {}, {}, {}, {}, {} and {}",
this.eprSearchLogString,
this.facilitySearchLogString,
this.buildingSearchLogString,
this.pointOfCareSearchLogString,
this.floorSearchLogString,
this.roomSearchLogString,
this.bedSearchLogString,
e);
if (inReconnectProcess.get()) {
LOG.info(
"Tried to reconnect, but couldn't find a device with {}, {}, {}, {}, {}, {} and {}",
this.eprSearchLogString,
this.facilitySearchLogString,
this.buildingSearchLogString,
this.pointOfCareSearchLogString,
this.floorSearchLogString,
this.roomSearchLogString,
this.bedSearchLogString,
e);
} else {
LOG.error(
"Couldn't find a device with {}, {}, {}, {}, {}, {} and {}",
this.eprSearchLogString,
this.facilitySearchLogString,
this.buildingSearchLogString,
this.pointOfCareSearchLogString,
this.floorSearchLogString,
this.roomSearchLogString,
this.bedSearchLogString,
e);
}
throw new IOException(COULDN_T_CONNECT_TO_TARGET);
} finally {
client.unregisterDiscoveryObserver(obs);
Expand Down Expand Up @@ -359,17 +398,29 @@ void deviceFound(final ProbedDeviceFoundMessage message) {
}

sdcRemoteDevice.registerWatchdogObserver(this);
isConnected.set(true);
}

@Override
public void shouldReconnect(final Boolean shouldReconnect) {
this.reconnectEnabled.set(shouldReconnect);
}

@Override
public synchronized void disconnect() throws TimeoutException {
shouldBeConnected.set(false);
disconnect(true);
}

private synchronized void disconnect(final Boolean expected) throws TimeoutException {
shouldBeConnected.set(expected);
if (sdcRemoteDevice != null) {
sdcRemoteDevice.stopAsync().awaitTerminated(maxWait.toSeconds(), TimeUnit.SECONDS);
sdcRemoteDevice = null;
}
hostingServiceProxy = null;
client.stopAsync().awaitTerminated(maxWait.toSeconds(), TimeUnit.SECONDS);
if (expected) {
client.stopAsync().awaitTerminated(maxWait.toSeconds(), TimeUnit.SECONDS);
}
}

@Override
Expand All @@ -384,12 +435,12 @@ public SdcRemoteDevicesConnector getConnector() {

@Override
public SdcRemoteDevice getSdcRemoteDevice() {
return sdcRemoteDevice;
return restrictedGetter(sdcRemoteDevice);
}

@Override
public HostingServiceProxy getHostingServiceProxy() {
return hostingServiceProxy;
return restrictedGetter(hostingServiceProxy);
}

@Override
Expand All @@ -410,17 +461,31 @@ public Injector getInjector() {
@Subscribe
void onConnectionLoss(final WatchdogMessage watchdogMessage) {
LOG.info("Watchdog detected disconnect from provider.");
if (shouldBeConnected.get()) {
testRunObserver.invalidateTestRun(String.format(
"Lost connection to device %s. Reason: %s",
watchdogMessage.getPayload(), watchdogMessage.getReason().getMessage()));
try {
disconnect();
} catch (TimeoutException e) {
LOG.error("Error while disconnecting device after connection loss.", e);
if (shouldBeConnected.get() && reconnectEnabled.get()) {
isConnected.set(false);
inReconnectProcess.set(true);
LOG.info("The disconnect from provider was unexpected, trying to reconnect.");
if (reconnectExecutor.isRunning()) {
reconnectExecutor.get().submit(() -> {
synchronized (lock) {
try {
disconnect(false);
} catch (TimeoutException e) {
LOG.error(COULDN_T_DISCONNECT, e);
}
reconnect(watchdogMessage);
lock.notifyAll();
}
});
} else {
LOG.debug("ReconnectExecutor is not running.");
invalidateAfterConnectionLoss(watchdogMessage);
}
inReconnectProcess.set(false);
} else if (shouldBeConnected.get()) {
invalidateAfterConnectionLoss(watchdogMessage);
} else {
LOG.info("Watchdog detected expected disconnect from provider.");
LOG.info("The disconnect from provider was expected.");
}
}

Expand All @@ -433,4 +498,55 @@ public void registerMdibObserver(final TestClientMdibObserver observer) {
public void unregisterMdibObserver(final TestClientMdibObserver observer) {
testClientMdibAccessObserver.unregisterObserver(observer);
}

private <T> T restrictedGetter(final T t) {
synchronized (lock) {
while (inReconnectProcess.get()) {
LOG.debug("Attempted to access a connection-dependent object while the connection is interrupted, "
+ "wait for connection to be re-established.");
try {
lock.wait();
} catch (InterruptedException e) {
LOG.error("Waiting for lock interrupted.", e);
}
}
return t;
}
}

private void reconnect(final WatchdogMessage watchdogMessage) {
var count = 1;
while (count <= reconnectTries && !isConnected.get()) {
LOG.info("Trying to reconnect, attempt {} of {}.", count, reconnectTries);
try {
connect();
LOG.info("Successfully reconnected.");
return;
} catch (InterceptorException | TransportException | IOException e) {
LOG.info("{}. reconnection attempt failed.", count);
try {
LOG.info("Wait for {} seconds, to give the provider time to restart.", reconnectWait);
TimeUnit.SECONDS.sleep(reconnectWait);
} catch (InterruptedException ex) {
throw new RuntimeException("Error while trying to wait for the provider to restart.", ex);
}
}
count++;
}
if (!isConnected.get()) {
LOG.info("Couldn't reconnect after {} retries", reconnectTries);
invalidateAfterConnectionLoss(watchdogMessage);
}
}

private void invalidateAfterConnectionLoss(final WatchdogMessage watchdogMessage) {
testRunObserver.invalidateTestRun(String.format(
"Lost connection to device %s. Reason: %s",
watchdogMessage.getPayload(), watchdogMessage.getReason().getMessage()));
try {
disconnect();
} catch (TimeoutException e) {
LOG.error(COULDN_T_DISCONNECT, e);
}
}
}
Loading

0 comments on commit 68dcf9f

Please sign in to comment.