Skip to content

Commit

Permalink
Added configuration option for auto reconnect, reconnect delay, and m…
Browse files Browse the repository at this point in the history
…ax inflight
  • Loading branch information
syedmhashim committed Apr 3, 2022
1 parent dc2bb92 commit d164504
Show file tree
Hide file tree
Showing 7 changed files with 426 additions and 8 deletions.
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,7 @@ hs_err_pid*

.idea
*.iml
target
target

*.mapdb*
*SNAPSHOT.md
Original file line number Diff line number Diff line change
Expand Up @@ -135,15 +135,39 @@
"to connect to the MQTT broker. Once this time interval elapses, a timeout takes " +
"place.",
type = {DataType.INT},
optional = true, defaultValue = "30")

optional = true, defaultValue = "30"),
@Parameter(
name = "max.inflight",
description = "The maximum number of messages the MQTT client can send without receiving " +
"acknowledgments. The default value is 10",
type = {DataType.INT},
optional = true, defaultValue = "10"),
@Parameter(
name = "automatic.reconnect",
description = "This is an optional parameter. If set to true, in the event that the " +
"connection is lost, the client will attempt to reconnect to the server. It will " +
"initially wait 1 second before it attempts to reconnect, for every failed reconnect " +
"attempt, the delay will double until it is at 2 minutes at which point the delay " +
"will stay at 2 minutes. " +
"If set to false, the client will not attempt to automatically reconnect to the " +
"server in the event that the connection is lost. " +
"The default value is `false`.",
type = {DataType.BOOL},
optional = true, defaultValue = "false"),
@Parameter(
name = "max.reconnect.delay",
description = "The maximum number of milliseconds the client could wait after the connection " +
"is lost. The default value is 128000",
type = {DataType.INT},
optional = true, defaultValue = "128000")
},
examples =
{
@Example(
syntax = "@sink(type='mqtt', url= 'tcp://localhost:1883', " +
"topic='mqtt_topic', clean.session='true', message.retain='false', " +
"quality.of.service= '1', keep.alive= '60',connection.timeout='30'" +
"max.inflight='20' ,automatic.reconnect='true', max.reconnect.delay='64000'," +
"@map(type='xml'))" +
"Define stream BarStream (symbol string, price float, volume long);",
description = "This query publishes events to a stream named `BarStream` via the " +
Expand All @@ -165,6 +189,9 @@ public class MqttSink extends Sink {
private boolean cleanSession;
private int keepAlive;
private int connectionTimeout;
private int maxInflight;
private boolean automaticReconnect;
private int maxReconnectDelay;
private MqttClient client;
private Option messageRetainOption;
private StreamDefinition streamDefinition;
Expand Down Expand Up @@ -192,6 +219,14 @@ protected StateFactory init(StreamDefinition streamDefinition, OptionHolder opti
MqttConstants.DEFAULT_MESSAGE_RETAIN);
this.cleanSession = Boolean.parseBoolean(optionHolder.validateAndGetStaticValue
(MqttConstants.CLEAN_SESSION, MqttConstants.DEFAULT_CLEAN_SESSION));
this.maxInflight = Integer.parseInt(optionHolder.validateAndGetStaticValue
(MqttConstants.MAX_INFLIGHT,
MqttConstants.DEFAULT_MAX_INFLIGHT));
this.automaticReconnect = Boolean.parseBoolean(optionHolder.validateAndGetStaticValue
(MqttConstants.AUTOMATIC_RECONNECT, MqttConstants.DEFAULT_AUTOMATIC_RECONNECT));
this.maxReconnectDelay = Integer.parseInt(optionHolder.validateAndGetStaticValue
(MqttConstants.MAX_RECONNECT_DELAY,
MqttConstants.DEFAULT_MAX_RECONNECT_DELAY));
return null;
}

Expand Down Expand Up @@ -225,6 +260,9 @@ public void connect() throws ConnectionUnavailableException {
connectionOptions.setCleanSession(cleanSession);
connectionOptions.setKeepAliveInterval(keepAlive);
connectionOptions.setConnectionTimeout(connectionTimeout);
connectionOptions.setMaxInflight(maxInflight);
connectionOptions.setAutomaticReconnect(automaticReconnect);
connectionOptions.setMaxReconnectDelay(maxReconnectDelay);
client.connect(connectionOptions);

} catch (MqttException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,14 +126,42 @@
"place.",
type = {DataType.INT},
optional = true,
defaultValue = "30")
defaultValue = "30"),
@Parameter(
name = "max.inflight",
description = "The maximum number of messages the MQTT client can send without receiving " +
"acknowledgments. The default value is 10",
type = {DataType.INT},
optional = true,
defaultValue = "10"),
@Parameter(
name = "automatic.reconnect",
description = "This is an optional parameter. If set to true, in the event that the " +
"connection is lost, the client will attempt to reconnect to the server. It will " +
"initially wait 1 second before it attempts to reconnect, for every failed reconnect " +
"attempt, the delay will double until it is at 2 minutes at which point the delay " +
"will stay at 2 minutes. " +
"If set to false, the client will not attempt to automatically reconnect to the " +
"server in the event that the connection is lost. " +
"The default value is `false`.",
type = {DataType.BOOL},
optional = true,
defaultValue = "false"),
@Parameter(
name = "max.reconnect.delay",
description = "The maximum number of milliseconds the client could wait after the connection " +
"is lost. The default value is 128000",
type = {DataType.INT},
optional = true,
defaultValue = "128000")
},
examples =
{
@Example(
syntax = "@source(type='mqtt', url= 'tcp://localhost:1883', " +
"topic='mqtt_topic', clean.session='true'," +
"quality.of.service= '1', keep.alive= '60',connection.timeout='30'" +
"max.inflight='20' ,automatic.reconnect='true', max.reconnect.delay='64000'," +
"@map(type='xml'))" +
"Define stream BarStream (symbol string, price float, volume long);",
description = "This query receives events from the `mqtt_topic` topic via MQTT," +
Expand All @@ -153,6 +181,9 @@ public class MqttSource extends Source {
private boolean cleanSession;
private int keepAlive;
private int connectionTimeout;
private int maxInflight;
private boolean automaticReconnect;
private int maxReconnectDelay;
private MqttClient client;
private MqttConsumer mqttConsumer;
private String siddhiAppName;
Expand All @@ -178,6 +209,14 @@ public StateFactory init(SourceEventListener sourceEventListener, OptionHolder o
MqttConstants.DEFAULT_CONNECTION_TIMEOUT_INTERVAL));
this.cleanSession = Boolean.parseBoolean(optionHolder.validateAndGetStaticValue
(MqttConstants.CLEAN_SESSION, MqttConstants.DEFAULT_CLEAN_SESSION));
this.maxInflight = Integer.parseInt(optionHolder.validateAndGetStaticValue
(MqttConstants.MAX_INFLIGHT,
MqttConstants.DEFAULT_MAX_INFLIGHT));
this.automaticReconnect = Boolean.parseBoolean(optionHolder.validateAndGetStaticValue
(MqttConstants.AUTOMATIC_RECONNECT, MqttConstants.DEFAULT_AUTOMATIC_RECONNECT));
this.maxReconnectDelay = Integer.parseInt(optionHolder.validateAndGetStaticValue
(MqttConstants.MAX_RECONNECT_DELAY,
MqttConstants.DEFAULT_MAX_RECONNECT_DELAY));
this.mqttConsumer = new MqttConsumer(sourceEventListener);
return null;
}
Expand All @@ -200,6 +239,9 @@ public void connect(ConnectionCallback connectionCallback, State state) throws C
connectionOptions.setCleanSession(cleanSession);
connectionOptions.setKeepAliveInterval(keepAlive);
connectionOptions.setConnectionTimeout(connectionTimeout);
connectionOptions.setMaxInflight(maxInflight);
connectionOptions.setAutomaticReconnect(automaticReconnect);
connectionOptions.setMaxReconnectDelay(maxReconnectDelay);
client.connect(connectionOptions);
int qos = Integer.parseInt(String.valueOf(qosOption));
mqttConsumer.subscribe(topicOption, qos, client);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,10 @@ private MqttConstants() {
public static final String DEFAULT_USERNAME = null;
public static final String CONNECTION_TIMEOUT_INTERVAL = "connection.timeout";
public static final String DEFAULT_CONNECTION_TIMEOUT_INTERVAL = "30";



public static final String MAX_INFLIGHT = "max.inflight";
public static final String DEFAULT_MAX_INFLIGHT = "10";
public static final String AUTOMATIC_RECONNECT = "automatic.reconnect";
public static final String DEFAULT_AUTOMATIC_RECONNECT = "false";
public static final String MAX_RECONNECT_DELAY = "max.reconnect.delay";
public static final String DEFAULT_MAX_RECONNECT_DELAY = "128000";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
package io.siddhi.extension.io.mqtt.sink;

import io.moquette.server.Server;
import io.moquette.server.config.IConfig;
import io.moquette.server.config.MemoryConfig;
import io.siddhi.core.SiddhiAppRuntime;
import io.siddhi.core.SiddhiManager;
import io.siddhi.core.exception.ConnectionUnavailableException;
import io.siddhi.core.stream.input.InputHandler;
import org.apache.log4j.Logger;
import org.testng.AssertJUnit;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import java.rmi.RemoteException;
import java.util.Properties;

public class MqttSinkAutomaticReconnectTest {
private volatile int count;
private volatile boolean eventArrived;
private static final Logger log = Logger.getLogger(MqttSinkAutomaticReconnectTest.class);
private static final Server mqttBroker = new Server();
private MqttTestClient mqttTestClient;
private static final Properties properties = new Properties();

@BeforeMethod
public void initBeforeMethod() {
count = 0;
eventArrived = false;
}

@BeforeClass
public static void init() throws Exception {
try {
properties.put("port", Integer.toString(1883));
properties.put("host", "0.0.0.0");
final IConfig config = new MemoryConfig(properties);
mqttBroker.startServer(config);
Thread.sleep(1000);
} catch (Exception e) {
throw new RemoteException("Exception caught when starting server", e);
}
}

@AfterClass
public static void stop() {
mqttBroker.stopServer();
}

@Test
public void mqttPublishEventWithAutomaticReconnect() {
log.info("Test for Mqtt publish events with automatic reconnect enabled");
SiddhiManager siddhiManager = new SiddhiManager();
ResultContainer resultContainer = new ResultContainer(3);
SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(
"define stream FooStream (symbol string, price float, volume long); " + "@info(name = 'query1') "
+ "@sink(type='mqtt', url= 'tcp://localhost:1883', "
+ "topic='mqtt_publish_event_with_automatic_reconnect',username='mqtt-user', "
+ "password='mqtt-password', clean.session='true', message.retain='false', "
+ "automatic.reconnect='true', keep.alive= '60'," + "@map(type='xml'))"
+ "Define stream BarStream (symbol string, price float, volume long);"
+ "from FooStream select symbol, price, volume insert into BarStream;");
InputHandler fooStream = siddhiAppRuntime.getInputHandler("FooStream");
try {
this.mqttTestClient = new MqttTestClient("tcp://localhost:1883",
"mqtt_publish_event_with_automatic_reconnect", 1,
resultContainer, true, false);
} catch (ConnectionUnavailableException e) {
AssertJUnit.fail("Could not connect to broker.");
}
siddhiAppRuntime.start();
try {
fooStream.send(new Object[] { "WSO2", 55.6f, 100L });
fooStream.send(new Object[] { "IBM", 75.6f, 100L });
Thread.sleep(500);
mqttBroker.stopServer();
fooStream.send(new Object[] { "WSO2", 57.6f, 100L });
} catch (InterruptedException e) {
AssertJUnit.fail("Thread sleep was interrupted");
}
try {
final IConfig config = new MemoryConfig(properties);
mqttBroker.startServer(config);
Thread.sleep(2000);
fooStream.send(new Object[] { "JAMES", 58.6f, 100L });
Thread.sleep(500);
} catch (Exception e) {
AssertJUnit.fail("Thread sleep was interrupted");
}
count = mqttTestClient.getCount();
eventArrived = mqttTestClient.getEventArrived();
AssertJUnit.assertEquals(3, count);
AssertJUnit.assertTrue(eventArrived);
AssertJUnit.assertTrue(resultContainer.assertMessageContent("WSO2"));
AssertJUnit.assertTrue(resultContainer.assertMessageContent("IBM"));
AssertJUnit.assertTrue(resultContainer.assertMessageContent("JAMES"));
siddhiAppRuntime.shutdown();
}

@Test
public void mqttPublishEventWithoutAutomaticReconnect() {
log.info("Test for Mqtt publish events with automatic reconnect disabled");
SiddhiManager siddhiManager = new SiddhiManager();
ResultContainer resultContainer = new ResultContainer(2);
SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(
"define stream FooStream (symbol string, price float, volume long); " + "@info(name = 'query1') "
+ "@sink(type='mqtt', url= 'tcp://localhost:1883', "
+ "topic='mqtt_publish_event_without_automatic_reconnect',username='mqtt-user', "
+ "password='mqtt-password', clean.session='true', message.retain='false', "
+ "automatic.reconnect='false', keep.alive= '60'," + "@map(type='xml'))"
+ "Define stream BarStream (symbol string, price float, volume long);"
+ "from FooStream select symbol, price, volume insert into BarStream;");
InputHandler fooStream = siddhiAppRuntime.getInputHandler("FooStream");
try {
this.mqttTestClient = new MqttTestClient("tcp://localhost:1883",
"mqtt_publish_event_without_automatic_reconnect", 1, resultContainer,
true, false);
} catch (ConnectionUnavailableException e) {
AssertJUnit.fail("Could not connect to broker.");
}
siddhiAppRuntime.start();
try {
fooStream.send(new Object[] { "WSO2", 55.6f, 100L });
fooStream.send(new Object[] { "IBM", 75.6f, 100L });
Thread.sleep(500);
mqttBroker.stopServer();
fooStream.send(new Object[] { "WSO2", 57.6f, 100L });
} catch (InterruptedException e) {
AssertJUnit.fail("Thread sleep was interrupted");
}
try {
final IConfig config = new MemoryConfig(properties);
mqttBroker.startServer(config);
Thread.sleep(2000);
fooStream.send(new Object[] { "JAMES", 58.6f, 100L });
Thread.sleep(500);
} catch (Exception e) {
AssertJUnit.fail("Thread sleep was interrupted");
}

count = mqttTestClient.getCount();
eventArrived = mqttTestClient.getEventArrived();
AssertJUnit.assertEquals(2, count);
AssertJUnit.assertTrue(eventArrived);
AssertJUnit.assertTrue(resultContainer.assertMessageContent("WSO2"));
AssertJUnit.assertTrue(resultContainer.assertMessageContent("IBM"));
siddhiAppRuntime.shutdown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ public class MqttTestClient {
private String clientId;
private String userName = null;
private String userPassword = "";
private boolean cleanSession = true;
private boolean eventArrived;
private int count;
private int keepAlive = 60;
Expand Down Expand Up @@ -74,6 +73,12 @@ public boolean getEventArrived() {

public MqttTestClient(String brokerURL, String topic, int qos, ResultContainer resultContainer)
throws ConnectionUnavailableException {
this(brokerURL, topic, qos, resultContainer, false, true);
}

public MqttTestClient(String brokerURL, String topic, int qos, ResultContainer resultContainer,
boolean automaticReconnect, boolean cleanSession)
throws ConnectionUnavailableException {
this.resultContainer = resultContainer;
try {
persistence = new MemoryPersistence();
Expand All @@ -85,6 +90,7 @@ public MqttTestClient(String brokerURL, String topic, int qos, ResultContainer r
connectionOptions.setCleanSession(cleanSession);
connectionOptions.setKeepAliveInterval(keepAlive);
connectionOptions.setConnectionTimeout(connectionTimeout);
connectionOptions.setAutomaticReconnect(automaticReconnect);
client.connect(connectionOptions);
} catch (MqttException e) {
throw new ConnectionUnavailableException(
Expand Down
Loading

0 comments on commit d164504

Please sign in to comment.