diff --git a/pom.xml b/pom.xml
index 79de8f4251..d049ee4a52 100644
--- a/pom.xml
+++ b/pom.xml
@@ -203,6 +203,7 @@
apps
samples
sql
+ solace
diff --git a/solace/XmlJavadocCommentsExtractor.xsl b/solace/XmlJavadocCommentsExtractor.xsl
new file mode 100644
index 0000000000..ec72325240
--- /dev/null
+++ b/solace/XmlJavadocCommentsExtractor.xsl
@@ -0,0 +1,48 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/solace/pom.xml b/solace/pom.xml
new file mode 100755
index 0000000000..7d6d10806f
--- /dev/null
+++ b/solace/pom.xml
@@ -0,0 +1,224 @@
+
+
+ 4.0.0
+
+
+ org.apache.apex
+ malhar
+ 3.7.0-SNAPSHOT
+
+
+ malhar-solace
+ Apache Apex Malhar Solace Support
+ jar
+
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-jar-plugin
+ 2.4
+
+
+
+ test-jar
+
+ package
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-antrun-plugin
+
+
+ createJavadocDirectory
+ generate-resources
+
+
+
+
+
+
+
+ run
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-javadoc-plugin
+
+
+
+ xml-doclet
+ generate-resources
+
+ javadoc
+
+
+ com.github.markusbernhardt.xmldoclet.XmlDoclet
+ -d ${project.build.directory}/generated-resources/xml-javadoc -filename ${project.artifactId}-${project.version}-javadoc.xml
+ false
+
+ com.github.markusbernhardt
+ xml-doclet
+ 1.0.4
+
+
+
+
+
+ attach-sources
+
+ jar
+
+
+ true
+
+
+ customTag1
+ a
+ Custom Tag One:
+
+
+ customTag2
+ a
+ Custom Tag two:
+
+
+ customTag3
+ a
+ Custom Tag three:
+
+
+
+
+
+
+
+
+ org.codehaus.mojo
+ xml-maven-plugin
+ 1.0
+
+
+ transform-xmljavadoc
+ generate-resources
+
+ transform
+
+
+
+
+
+
+ ${project.build.directory}/generated-resources/xml-javadoc
+
+ ${project.artifactId}-${project.version}-javadoc.xml
+
+ XmlJavadocCommentsExtractor.xsl
+ ${project.build.directory}/generated-resources/xml-javadoc
+
+
+
+
+
+
+ maven-resources-plugin
+ 2.6
+
+
+ copy-resources
+ process-resources
+
+ copy-resources
+
+
+ ${basedir}/target/classes
+
+
+ ${project.build.directory}/generated-resources/xml-javadoc
+
+ ${project.artifactId}-${project.version}-javadoc.xml
+
+ true
+
+
+
+
+
+
+
+ maven-surefire-plugin
+
+ -Xmx2048m
+
+
+
+
+
+
+
+ com.solacesystems
+ sol-jms
+ 10.0.1
+
+
+ com.solacesystems
+ sol-jcsmp
+ 10.0.1
+
+
+ com.solacesystems
+ sol-common
+ 10.0.1
+
+
+ ${project.groupId}
+ malhar-library
+ ${project.version}
+
+
+ ${project.groupId}
+ malhar-library
+ ${project.version}
+ test
+ tests
+
+
+ ${project.groupId}
+ apex-common
+ ${apex.core.version}
+ jar
+
+
+
diff --git a/solace/src/main/java/org/apache/apex/malhar/solace/AbstractSolaceBaseInputOperator.java b/solace/src/main/java/org/apache/apex/malhar/solace/AbstractSolaceBaseInputOperator.java
new file mode 100644
index 0000000000..13a7b0dcb6
--- /dev/null
+++ b/solace/src/main/java/org/apache/apex/malhar/solace/AbstractSolaceBaseInputOperator.java
@@ -0,0 +1,319 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.solace;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.solacesystems.jcsmp.BytesXMLMessage;
+import com.solacesystems.jcsmp.Consumer;
+import com.solacesystems.jcsmp.JCSMPChannelProperties;
+import com.solacesystems.jcsmp.JCSMPException;
+import com.solacesystems.jcsmp.JCSMPFactory;
+import com.solacesystems.jcsmp.JCSMPProperties;
+import com.solacesystems.jcsmp.JCSMPReconnectEventHandler;
+import com.solacesystems.jcsmp.JCSMPSession;
+import com.solacesystems.jcsmp.SessionEvent;
+import com.solacesystems.jcsmp.SessionEventArgs;
+import com.solacesystems.jcsmp.SessionEventHandler;
+import com.solacesystems.jcsmp.XMLMessageListener;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Operator.CheckpointListener;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.io.IdempotentStorageManager;
+import com.datatorrent.netlet.util.DTThrowable;
+
+@SuppressWarnings("unused")
+public abstract class AbstractSolaceBaseInputOperator extends BaseOperator implements
+ InputOperator, Operator.ActivationListener, CheckpointListener
+{
+
+ private static final Logger logger = LoggerFactory.getLogger(AbstractSolaceBaseInputOperator.class);
+
+ @NotNull
+ protected JCSMPProperties properties = new JCSMPProperties();
+ protected String connectRetries;
+ protected String reconnectRetries;
+ protected String unackedMessageLimit;
+
+
+ //protected IdempotentStorageManager idempotentStorageManager = new IdempotentStorageManager.FSIdempotentStorageManager();
+ protected FSOpsIdempotentStorageManager idempotentStorageManager = new FSOpsIdempotentStorageManager();
+
+ protected transient JCSMPFactory factory;
+ protected transient JCSMPSession session;
+
+ protected transient Consumer consumer;
+ protected transient Consumer reliableConsumer;
+
+ protected transient int operatorId;
+ protected long windowId;
+ protected transient long lastCompletedWId;
+
+ protected transient int emitCount;
+
+ protected transient volatile boolean DRFailover = false;
+ protected transient volatile boolean TCPDisconnected = false;
+
+ protected transient BlockingQueue unackedMessages; // hosts the Solace messages that need to be acked when the streaming window is OK to remove
+ protected LinkedList inFlightMessageId = new LinkedList(); //keeps track of all in flight IDs since they are not necessarily sequential
+ protected transient ArrayBlockingQueue arrivedTopicMessagesToProcess;
+
+ //protected transient com.solace.dt.operator.DTSolaceOperatorInputOutput.ArrayBlockingQueue arrivedMessagesToProcess;
+
+ private transient ReconnectCallbackHandler rcHandler = new ReconnectCallbackHandler();
+
+ private transient CallbackMessageHandler cbHandler = new CallbackMessageHandler();
+
+ int spinMillis;
+
+ protected transient int reconnectRetryMillis = 0;
+
+ @Override
+ public void setup(Context.OperatorContext context)
+ {
+
+ operatorId = context.getId();
+ logger.info("OperatorID from Base class: {}", operatorId);
+ spinMillis = context.getValue(com.datatorrent.api.Context.OperatorContext.SPIN_MILLIS);
+ factory = JCSMPFactory.onlyInstance();
+
+ //Required for HA and DR to try forever if set to "-1"
+ JCSMPChannelProperties channelProperties = (JCSMPChannelProperties)this.properties.getProperty(JCSMPProperties.CLIENT_CHANNEL_PROPERTIES);
+ channelProperties.setConnectRetries(Integer.parseInt(this.connectRetries));
+ channelProperties.setReconnectRetries(Integer.parseInt(this.reconnectRetries));
+
+ reconnectRetryMillis = channelProperties.getReconnectRetryWaitInMillis();
+
+
+ try {
+ session = factory.createSession(this.properties, null, new PrintingSessionEventHandler());
+ } catch (JCSMPException e) {
+ DTThrowable.rethrow(e);
+ }
+
+ //logger.debug("Properties Raw: \n{}", properties.toProperties());
+ logger.debug("Properties:\n" + properties.toString());
+ //logger.debug("\n===============================================\n");
+
+ idempotentStorageManager.setup(context);
+ lastCompletedWId = idempotentStorageManager.getLargestRecoveryWindow();
+ //logger.debug("++++++++++++++++++++Largest Completed: " + lastCompletedWId);
+
+
+ }
+
+ @Override
+ public void checkpointed(long arg0)
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void committed(long window)
+ {
+ try {
+ idempotentStorageManager.deleteUpTo(operatorId, window);
+ } catch (IOException e) {
+ DTThrowable.rethrow(e);
+ }
+
+ }
+
+ protected T processMessage(BytesXMLMessage message)
+ {
+ T tuple = convert(message);
+ if (tuple != null) {
+ emitTuple(tuple);
+ }
+ return tuple;
+ }
+
+
+ @Override
+ public void activate(Context.OperatorContext context)
+ {
+ try {
+ session.connect();
+ reliableConsumer = session.getMessageConsumer(rcHandler, cbHandler);
+ //consumer = getConsumer();
+ reliableConsumer.start();
+
+ } catch (JCSMPException e) {
+ DTThrowable.rethrow(e);
+ }
+ }
+
+ @Override
+ public void deactivate()
+ {
+ try {
+ consumer.stop();
+ clearConsumer();
+ consumer.close();
+ reliableConsumer.close();
+ } catch (JCSMPException e) {
+ DTThrowable.rethrow(e);
+ }
+ }
+
+ @Override
+ public void teardown()
+ {
+
+ idempotentStorageManager.teardown();
+ session.closeSession();
+ }
+
+ @Override
+ public void beginWindow(long windowId)
+ {
+ super.beginWindow(windowId);
+ this.windowId = windowId;
+ }
+
+
+ protected abstract T convert(BytesXMLMessage message);
+
+ protected abstract void emitTuple(T tuple);
+
+ protected abstract Consumer getConsumer() throws JCSMPException;
+
+ protected abstract void clearConsumer() throws JCSMPException;
+
+ public void setProperties(JCSMPProperties properties)
+ {
+ this.properties = properties;
+
+ }
+
+
+ public IdempotentStorageManager getIdempotentStorageManager()
+ {
+ return idempotentStorageManager;
+ }
+
+ public void setUnackedMessageLimit(String unackedMessageLimit)
+ {
+ this.unackedMessageLimit = unackedMessageLimit;
+ }
+
+ public String getUnackedMessageLimit()
+ {
+ return unackedMessageLimit;
+ }
+
+ public void setConnectRetries(String connectRetries)
+ {
+ this.connectRetries = connectRetries;
+ logger.info("+++++++++++++++++++reconnectRetries: {}", this.connectRetries);
+ }
+
+ public void setReconnectRetries(String reconnectRetries)
+ {
+ this.reconnectRetries = reconnectRetries;
+ logger.info("+++++++++++++++++++reconnectRetries: {}", this.reconnectRetries);
+ }
+
+ public void setReapplySubscriptions(boolean state)
+ {
+ this.properties.setBooleanProperty(JCSMPProperties.REAPPLY_SUBSCRIPTIONS, state);
+ }
+
+ public void startConsumer()
+ {
+ try {
+ consumer = getConsumer();
+ } catch (JCSMPException e) {
+ DTThrowable.rethrow(e);
+ }
+ }
+
+
+ public class ReconnectCallbackHandler implements JCSMPReconnectEventHandler
+ {
+
+
+ @Override
+ public void postReconnect() throws JCSMPException
+ {
+
+ logger.info("++++++++++++++Solace client now Reconnected -- possibe Solace HA or DR fail-over +++++++++++++");
+ TCPDisconnected = false;
+
+ }
+
+ @Override
+ public boolean preReconnect() throws JCSMPException
+ {
+ DRFailover = false;
+ logger.info("++++++++++++++Solace client now in Pre Reconnect state -- possibe Solace HA or DR fail-over +++++++++++++");
+ TCPDisconnected = true;
+ return true;
+ }
+
+ }
+
+ public class PrintingSessionEventHandler implements SessionEventHandler
+ {
+
+
+ public void handleEvent(SessionEventArgs event)
+ {
+ logger.info("Received Session Event %s with info %s\n", event.getEvent(), event.getInfo());
+
+ // Received event possibly due to DR fail-ver complete
+ if (event.getEvent() == SessionEvent.VIRTUAL_ROUTER_NAME_CHANGED) {
+ DRFailover = true; // may or may not need recovery
+ TCPDisconnected = false;
+ }
+
+ }
+ }
+
+ public class CallbackMessageHandler implements XMLMessageListener
+ {
+
+ @Override
+ public void onException(JCSMPException e)
+ {
+ DTThrowable.rethrow(e);
+
+ }
+
+ @Override
+ public void onReceive(BytesXMLMessage msg)
+ {
+ arrivedTopicMessagesToProcess.add(msg);
+
+ }
+
+ }
+
+
+}
diff --git a/solace/src/main/java/org/apache/apex/malhar/solace/AbstractSolaceDirectInputOperator.java b/solace/src/main/java/org/apache/apex/malhar/solace/AbstractSolaceDirectInputOperator.java
new file mode 100644
index 0000000000..a616218469
--- /dev/null
+++ b/solace/src/main/java/org/apache/apex/malhar/solace/AbstractSolaceDirectInputOperator.java
@@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.solace;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Maps;
+import com.solacesystems.jcsmp.BytesXMLMessage;
+import com.solacesystems.jcsmp.Consumer;
+import com.solacesystems.jcsmp.JCSMPException;
+import com.solacesystems.jcsmp.Topic;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.Operator;
+import com.datatorrent.netlet.util.DTThrowable;
+
+public abstract class AbstractSolaceDirectInputOperator extends AbstractSolaceBaseInputOperator implements InputOperator, Operator.ActivationListener
+{
+
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractSolaceDirectInputOperator.class);
+
+ private final transient AtomicReference throwable;
+ private transient Context.OperatorContext context;
+ protected transient long currentWindowId;
+
+ @NotNull
+ protected String topicName;
+ private transient Topic topic;
+ private transient BytesXMLMessage recentMessage = null;
+
+ private transient List messages = new ArrayList();
+ protected final transient Map currentWindowRecoveryState;
+ protected static final transient int DEFAULT_BUFFER_SIZE = 500;
+
+
+ @Override
+ public void setup(Context.OperatorContext context)
+ {
+ arrivedTopicMessagesToProcess = new ArrayBlockingQueue(Integer.parseInt(this.unackedMessageLimit));
+
+ this.context = context;
+
+ //setup info for HA and DR at the transport level
+ super.setConnectRetries(this.connectRetries);
+ super.setReconnectRetries(this.reconnectRetries);
+ super.setReapplySubscriptions(true);
+
+ super.setup(context);
+ topic = factory.createTopic(topicName);
+ addSubscription(topic);
+ }
+
+ public AbstractSolaceDirectInputOperator()
+ {
+ throwable = new AtomicReference();
+ currentWindowRecoveryState = Maps.newLinkedHashMap();
+ }
+
+ @Override
+ public void beginWindow(long windowId)
+ {
+ currentWindowId = windowId;
+ super.beginWindow(windowId);
+ if (windowId <= lastCompletedWId) {
+ handleRecovery(currentWindowId);
+ }
+ }
+
+ @Override
+ public void endWindow()
+ {
+ super.endWindow();
+ try {
+ if (windowId > lastCompletedWId) {
+ idempotentStorageManager.save(currentWindowRecoveryState, operatorId, windowId);
+ }
+ currentWindowRecoveryState.clear();
+ } catch (IOException e) {
+ DTThrowable.rethrow(e);
+ }
+ emitCount = 0; //reset emit count
+ }
+
+ @Override
+ public void emitTuples()
+ {
+ if (currentWindowId <= idempotentStorageManager.getLargestRecoveryWindow()) {
+ return;
+ }
+
+ BytesXMLMessage message;
+ try {
+ while (emitCount < DEFAULT_BUFFER_SIZE && ((message = (BytesXMLMessage)super.arrivedTopicMessagesToProcess.poll(10, TimeUnit.MILLISECONDS)) != null)) {
+ processMessage(message);
+ emitCount++;
+ }
+ } catch (InterruptedException e) {
+ DTThrowable.rethrow(e);
+ }
+ emitCount++;
+
+ }
+
+ /*
+ @Override
+ protected T convert(BytesXMLMessage message) {
+ // TODO Auto-generated method stub
+ return null;
+ }
+ */
+
+ /*
+ @Override
+ protected void emitTuple(T tuple) {
+ // TODO Auto-generated method stub
+
+ }
+ */
+
+
+ @Override
+ protected Consumer getConsumer() throws JCSMPException
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ /*
+ @Override
+ protected void clearConsumer() throws JCSMPException {
+ // TODO Auto-generated method stub
+
+ }
+ */
+
+ protected void addSubscription(Topic topic)
+ {
+ try {
+ session.addSubscription(topic);
+ } catch (JCSMPException e) {
+ DTThrowable.rethrow(e);
+ }
+ }
+
+ protected void removeSubscription(Topic topic)
+ {
+ try {
+ session.removeSubscription(topic);
+ } catch (JCSMPException e) {
+ DTThrowable.rethrow(e);
+ }
+ }
+
+ /*
+ @Override
+ protected T processMessage(BytesXMLMessage message)
+ {
+ T tuple = super.processMessage(message);
+ if(tuple != null) {
+ messages.add(tuple);
+ return tuple;
+ }
+
+ return null;
+ }
+ */
+
+ @Override
+ protected T processMessage(BytesXMLMessage message)
+ {
+
+ T payload = super.processMessage(message);
+ if (payload != null) {
+ currentWindowRecoveryState.put(message.getMessageIdLong(), payload);
+ }
+ recentMessage = message;
+ return payload;
+ }
+
+ @SuppressWarnings("unchecked")
+ protected void handleRecovery(long windowId)
+ {
+ LOG.info("++++++++++++++++++ Handle Recovery called");
+
+ Map recoveredData;
+ try {
+ recoveredData = (Map)idempotentStorageManager.load(operatorId, windowId);
+
+ if (recoveredData == null) {
+ return;
+ }
+ for (Map.Entry recoveredEntry : recoveredData.entrySet()) {
+ emitTuple(recoveredEntry.getValue());
+ }
+
+ } catch (IOException e) {
+ DTThrowable.rethrow(e);
+ }
+
+ }
+
+ public String getTopicName()
+ {
+ return topicName;
+ }
+
+ public void setTopicName(String topicName)
+ {
+ this.topicName = topicName;
+ }
+
+}
diff --git a/solace/src/main/java/org/apache/apex/malhar/solace/AbstractSolaceGuaranteedIdempotentInputOperator.java b/solace/src/main/java/org/apache/apex/malhar/solace/AbstractSolaceGuaranteedIdempotentInputOperator.java
new file mode 100644
index 0000000000..8f2f70f023
--- /dev/null
+++ b/solace/src/main/java/org/apache/apex/malhar/solace/AbstractSolaceGuaranteedIdempotentInputOperator.java
@@ -0,0 +1,649 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.solace;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.esotericsoftware.kryo.NotNull;
+import com.google.common.collect.Maps;
+import com.solacesystems.jcsmp.BytesXMLMessage;
+import com.solacesystems.jcsmp.Consumer;
+import com.solacesystems.jcsmp.ConsumerFlowProperties;
+import com.solacesystems.jcsmp.DeliveryMode;
+import com.solacesystems.jcsmp.Endpoint;
+import com.solacesystems.jcsmp.EndpointProperties;
+import com.solacesystems.jcsmp.FlowReceiver;
+import com.solacesystems.jcsmp.JCSMPException;
+import com.solacesystems.jcsmp.JCSMPProperties;
+import com.solacesystems.jcsmp.XMLMessageListener;
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.DAGContext;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Partitioner;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.common.partitioner.StatelessPartitioner;
+import com.datatorrent.lib.io.IdempotentStorageManager;
+import com.datatorrent.netlet.util.DTThrowable;
+
+@OperatorAnnotation(checkpointableWithinAppWindow = false)
+public abstract class AbstractSolaceGuaranteedIdempotentInputOperator extends AbstractSolaceBaseInputOperator implements InputOperator, Operator.ActivationListener
+{
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractSolaceGuaranteedIdempotentInputOperator.class);
+
+ @NotNull
+ protected String endpointName;
+
+
+ @NotNull
+ protected transient EndpointType endpointType = EndpointType.QUEUE;
+ private transient Endpoint endpoint;
+ @NotNull
+ public transient FlowCallbackMessageHandler FlowHandler = new FlowCallbackMessageHandler();
+ @NotNull
+ protected transient EndpointProperties endpointProperties = new EndpointProperties();
+ private transient BytesXMLMessage recentMessage = null;
+
+ private transient long[] operatorRecoveredWindows;
+ protected transient long currentWindowId;
+ @SuppressWarnings("unused")
+ private final transient AtomicReference throwable;
+ protected transient ArrayBlockingQueue arrivedMessagesToProcess;
+ protected transient BlockingQueue unackedMessages;
+
+ protected transient TreeMap lastMessages = new TreeMap();
+ @SuppressWarnings("unused")
+ private transient long windowTime;
+ protected final transient Map currentWindowRecoveryState;
+ protected transient LinkedList inFlightRecoveryMessages = new LinkedList(); // used by partition that restarts for duplicate detection
+ protected transient LinkedList inFlightRecoveryMessagesPartition = new LinkedList(); // used by partitions that didn't restart for duplicate detection
+ protected transient LinkedList inFlightRecoveryMessagesDR = new LinkedList(); // used by partition that detect DR fail over for duplicate detection
+
+ private transient Context.OperatorContext context;
+ private transient int partitionCount = 0;
+
+ protected static final transient int DEFAULT_BUFFER_SIZE = 500;
+ protected static transient int DR_COUNTER_SIZE = 2 * DEFAULT_BUFFER_SIZE;
+ protected transient int DR_COUNTER = 0;
+
+ protected transient volatile boolean doneDups = false;
+ protected transient volatile boolean doneDupsPartitioned = true;
+ protected transient volatile boolean donePartitionCheck = false;
+ protected transient volatile boolean doneDupsDR = true;
+
+
+ public AbstractSolaceGuaranteedIdempotentInputOperator()
+ {
+
+ throwable = new AtomicReference();
+ currentWindowRecoveryState = Maps.newLinkedHashMap();
+
+ }
+
+
+ protected boolean messageConsumed(BytesXMLMessage message) throws JCSMPException
+ {
+ if (message.getRedelivered()) {
+ return false;
+ }
+ return true;
+ }
+
+
+ @Override
+ public void setup(Context.OperatorContext context)
+ {
+ this.context = context;
+
+ LOG.info("Initial Partition Count: {}", currentpartitionCount());
+
+ inFlightRecoveryMessages.clear();
+ inFlightRecoveryMessagesPartition.clear();
+ inFlightRecoveryMessagesDR.clear();
+
+
+ //super.setUnackedMessageLimit(this.unackedMessageLimit);
+ //setup info for HA and DR at the transport level
+ super.setConnectRetries(this.connectRetries);
+ super.setReconnectRetries(this.reconnectRetries);
+
+
+ super.setup(context);
+
+ LOG.info("Operator ID = " + context.getId());
+
+ windowTime = context.getValue(OperatorContext.APPLICATION_WINDOW_COUNT) * context.getValue(DAGContext.STREAMING_WINDOW_SIZE_MILLIS);
+
+ arrivedMessagesToProcess = new ArrayBlockingQueue(Integer.parseInt(this.unackedMessageLimit));
+
+ unackedMessages = new ArrayBlockingQueue((Integer.parseInt(this.unackedMessageLimit)) * (context.getValue(OperatorContext.APPLICATION_WINDOW_COUNT)) * 2);
+
+ endpoint = factory.createQueue(this.endpointName);
+
+ if (windowId > idempotentStorageManager.getLargestRecoveryWindow()) {
+ super.startConsumer();
+ }
+
+ try {
+ operatorRecoveredWindows = idempotentStorageManager.getWindowIds(context.getId());
+ if (operatorRecoveredWindows != null) {
+ Arrays.sort(operatorRecoveredWindows);
+ }
+ } catch (IOException e) {
+ DTThrowable.rethrow(e);
+ }
+
+
+ }
+
+ @Override
+ protected T processMessage(BytesXMLMessage message)
+ {
+
+ T payload = super.processMessage(message);
+ if (payload != null) {
+ currentWindowRecoveryState.put(message.getMessageIdLong(), payload);
+ }
+ recentMessage = message;
+ return payload;
+ }
+
+ @Override
+ public void beginWindow(long windowId)
+ {
+ //LOG.debug("Largest Recovery Wndow is : {} for current window: {}", idempotentStorageManager.getLargestRecoveryWindow(), windowId);
+ currentWindowId = windowId;
+ if (windowId <= idempotentStorageManager.getLargestRecoveryWindow()) {
+ //LOG.debug("About to handle recovery, current windowID is: {} largested recovered ID is: {}" + currentWindowId, idempotentStorageManager.getLargestRecoveryWindow());
+ handleRecovery(windowId);
+ } else {
+ if (super.consumer == null) {
+ super.startConsumer();
+ LOG.debug("Started Flow Consumer after recovery is complete");
+ }
+ }
+ }
+
+
+ @SuppressWarnings("unchecked")
+ protected void handleRecovery(long windowId)
+ {
+ LOG.info("++++++++++++++++++ Handle Recovery called");
+
+ Map recoveredData;
+ try {
+ recoveredData = (Map)idempotentStorageManager.load(operatorId, windowId);
+
+ if (recoveredData == null) {
+ return;
+ }
+ for (Map.Entry recoveredEntry : recoveredData.entrySet()) {
+ emitTuple(recoveredEntry.getValue());
+ inFlightRecoveryMessages.add(recoveredEntry.getValue());
+ }
+
+ } catch (IOException e) {
+ DTThrowable.rethrow(e);
+ }
+
+ }
+
+
+ @Override
+ public void endWindow()
+ {
+ @SuppressWarnings("unused")
+ boolean stateSaved = false;
+ boolean ackCompleted = false;
+ int messagesToAck = unackedMessages.size();
+
+ if (currentWindowId > idempotentStorageManager.getLargestRecoveryWindow()) {
+
+
+ if (recentMessage != null) {
+ lastMessages.put(currentWindowId, recentMessage);
+ }
+
+ try {
+
+ if (recentMessage != null) {
+ idempotentStorageManager.save(currentWindowRecoveryState, operatorId, currentWindowId);
+ stateSaved = true;
+ LOG.debug("Saved for window: " + currentWindowId);
+ currentWindowRecoveryState.clear();
+ LOG.debug("acking messages");
+ ackCompleted = ackMessages();
+ LOG.debug("Acked status: " + ackCompleted + " on window " + currentWindowId + " ack count: : " + messagesToAck);
+ }
+ } catch (Throwable t) {
+ if (!ackCompleted) {
+ LOG.info("confirm recovery of {} for {} does not exist", operatorId, currentWindowId, t);
+ }
+ DTThrowable.rethrow(t);
+ }
+
+
+ emitCount = 0; //reset emit count
+ } else {
+ currentWindowRecoveryState.clear();
+ ackCompleted = ackMessages();
+ LOG.debug("acking messages completed successfully: " + ackCompleted);
+ }
+
+ }
+
+
+ @Override
+ public void emitTuples()
+ {
+ if (currentWindowId <= idempotentStorageManager.getLargestRecoveryWindow()) {
+ return;
+ }
+ //If in HA or DR fail-over, block until Solace TCP connection is reestablished so recovery windows are not lost as empty windows
+ int sleepCounter = 0;
+ while (super.TCPDisconnected == true) {
+ sleepCounter++;
+ try {
+ Thread.sleep(super.reconnectRetryMillis);
+ } catch (InterruptedException e) {
+ DTThrowable.rethrow(e);
+ }
+ if (sleepCounter % 10 == 0) {
+ LOG.info("Sleeping for another 30 seconds waiting for TCP reconnect for Solace with milliseconds sleep per cycle = {}", super.reconnectRetryMillis);
+ }
+ LOG.info("Queued messages to process: {}", arrivedMessagesToProcess.size());
+ }
+
+
+ BytesXMLMessage message;
+
+ try {
+ // process messages, window is defined by emitCount or timeout of waiting for messages for 10 milliseocnds
+ while (emitCount < DEFAULT_BUFFER_SIZE && ((message = (BytesXMLMessage)arrivedMessagesToProcess.poll(10, TimeUnit.MILLISECONDS)) != null)) {
+
+
+ boolean goodToGo = true;
+ if (message != null) {
+
+ //LOG.debug("To be processed: {} sequence number: {} Received message with ID: {} and AppID: {} redelivered: {}", arrivedMessagesToProcess.size(), message.getSequenceNumber(), message.getMessageIdLong(), message.getApplicationMessageId(), message.getRedelivered());
+ //LOG.debug(" AppID = {} redelivered: {}", message.getApplicationMessageId(), message.getRedelivered());
+
+
+ // Checking for duplicates after recovery from operator restart looking for re-delivered messages in restarted operator
+ if (message.getRedelivered() && inFlightRecoveryMessages.size() > 0) {
+ T payload = convert(message);
+ if (inFlightRecoveryMessages.contains(payload)) {
+ LOG.info("Redelivered Message Duplicate possibly due to input operator restart");
+ goodToGo = false;
+ if (message.getDeliveryMode() == DeliveryMode.PERSISTENT || message.getDeliveryMode() == DeliveryMode.NON_PERSISTENT) {
+ unackedMessages.add(message);
+ }
+
+ recentMessage = message;
+
+ }
+
+
+ } else {
+ doneDups = true;
+ }
+
+ /*
+ if(message.getRedelivered() ) {
+ LOG.debug("In FLight Size: {} Current Part Count: {} Dups: {} and {}", inFlightRecoveryMessages.size(), currentpartitionCount(), doneDupsPartitioned, donePartitionCheck);
+ }
+ */
+
+ //Operator was not restarted, re-delivered messages are a result of another partitioned operator restart
+ if (message.getRedelivered() && inFlightRecoveryMessages.size() == 0 && currentpartitionCount() > 1 && doneDupsPartitioned == true && donePartitionCheck == false) {
+ try {
+ doneDupsPartitioned = loadPartitionReplayCheck();
+ } catch (IOException e) {
+ DTThrowable.rethrow(e);
+ }
+ donePartitionCheck = true;
+ }
+
+ if (message.getRedelivered() && doneDupsPartitioned == false && inFlightRecoveryMessagesPartition.size() >= 0) {
+ T payload = convert(message);
+ if (inFlightRecoveryMessagesPartition.contains(payload)) {
+ LOG.info("Redelivered Message Duplicate possibly due to input operator restart in another partition");
+ goodToGo = false;
+ if (message.getDeliveryMode() == DeliveryMode.PERSISTENT || message.getDeliveryMode() == DeliveryMode.NON_PERSISTENT) {
+ unackedMessages.add(message);
+ }
+
+ recentMessage = message;
+
+ }
+ } else {
+ doneDupsPartitioned = true;
+ donePartitionCheck = false; //Get ready in case another partition restarts and results in replayed messages
+ }
+
+
+ // Checking for duplicates after recovery from DR looking for redelivered messages
+ if (super.DRFailover == true && !(message.getRedelivered()) && doneDupsDR == true && donePartitionCheck == false) {
+ try {
+ doneDupsDR = loadPartitionReplayCheck();
+ } catch (IOException e) {
+ DTThrowable.rethrow(e);
+ }
+ donePartitionCheck = true;
+ DR_COUNTER_SIZE = DR_COUNTER_SIZE + arrivedMessagesToProcess.size();
+ }
+
+
+ if (inFlightRecoveryMessagesDR.size() == 0 && super.DRFailover == true) {
+ super.DRFailover = false;
+ doneDupsDR = true;
+ donePartitionCheck = false;
+ inFlightRecoveryMessagesDR.clear();
+ LOG.info("Cleared in flight recovery messages, no more possible duplicate messages detected after DR fail over");
+ }
+
+ if (!(message.getRedelivered()) && doneDupsDR == false && inFlightRecoveryMessagesDR.size() > 0 && super.DRFailover == true && DR_COUNTER < DR_COUNTER_SIZE) {
+ DR_COUNTER++;
+ T payload = convert(message);
+ if (inFlightRecoveryMessagesDR.contains(payload)) {
+ LOG.info("Message Duplicate detected after Solace DR fail over");
+ goodToGo = false;
+ if (message.getDeliveryMode() == DeliveryMode.PERSISTENT || message.getDeliveryMode() == DeliveryMode.NON_PERSISTENT) {
+ unackedMessages.add(message);
+ }
+
+ recentMessage = message;
+
+ }
+ //Reset DR processing for duplicates after 2 windows worth of message checks
+ } else if (DR_COUNTER == DR_COUNTER_SIZE) {
+ //Once there are no more duplicates detected there will be no more duplicates due to DR fail over
+ doneDupsDR = true;
+ donePartitionCheck = false;
+ inFlightRecoveryMessagesDR.clear();
+ super.DRFailover = false;
+ DR_COUNTER = 0;
+ DR_COUNTER_SIZE = 2 * DEFAULT_BUFFER_SIZE;
+ LOG.info("Cleared in flight recovery messages, no more possible duplicate messages detected after DR fail over");
+ }
+
+
+ if (goodToGo) {
+ //if the redelivery flag is no no longer on the messages we can dispose of the inFLightRecoveryMessages
+ if (message.getRedelivered() == false && inFlightRecoveryMessages.size() > 0 && doneDups == true) {
+ inFlightRecoveryMessages.clear();
+ LOG.info("Cleared in flight recovery messages, no more redelivered or DR recovery messages");
+ doneDups = false;
+ }
+ if (message.getRedelivered() == false && inFlightRecoveryMessagesPartition.size() > 0 && doneDupsPartitioned == true) {
+ inFlightRecoveryMessagesPartition.clear();
+ LOG.info("Cleared in flight recovery messages, no more redelivered messages");
+ doneDupsPartitioned = false;
+ }
+
+ processMessage(message);
+ if (message.getDeliveryMode() == DeliveryMode.PERSISTENT || message.getDeliveryMode() == DeliveryMode.NON_PERSISTENT) {
+ unackedMessages.add(message);
+ }
+ emitCount++;
+ }
+ }
+ }
+ } catch (InterruptedException e) {
+ DTThrowable.rethrow(e);
+ }
+
+ }
+
+ @Override
+ public void committed(long window)
+ {
+ if (recentMessage == null) {
+ return;
+ }
+
+ Set windows = lastMessages.keySet();
+ Iterator iterator = windows.iterator();
+ while (iterator.hasNext()) {
+ if (iterator.next() <= window) {
+ iterator.remove();
+ } else {
+ break;
+ }
+ }
+
+ super.committed(window);
+ }
+
+ private boolean ackMessages()
+ {
+ boolean processedOK = false;
+
+
+ BytesXMLMessage messageToAckUpTo = lastMessages.get(currentWindowId);
+
+ if (messageToAckUpTo != null) {
+ if (unackedMessages.size() > 0) {
+ while (unackedMessages.peek() != messageToAckUpTo) {
+ try {
+ BytesXMLMessage taken = unackedMessages.take();
+ //LOG.debug("Acking: {}", taken.getApplicationMessageId());
+ taken.ackMessage();
+ } catch (InterruptedException e) {
+ DTThrowable.rethrow(e);
+ }
+ }
+ if (unackedMessages.peek() == messageToAckUpTo) {
+ try {
+ BytesXMLMessage taken = unackedMessages.take();
+ taken.ackMessage();
+ } catch (InterruptedException e) {
+ DTThrowable.rethrow(e);
+ }
+ }
+ processedOK = true;
+ } else {
+ LOG.debug("Unacked Array is size zero");
+
+ }
+ } else {
+ LOG.info("messageToAckUpTo is null -- possibly due to being in recovery stage");
+ }
+
+
+ return processedOK;
+
+ }
+
+ public void setEndpointName(String endpointName)
+ {
+ this.endpointName = endpointName;
+ LOG.info("+++++++++++++++++++enpointName: {}", this.endpointName);
+ }
+
+ public int currentpartitionCount()
+ {
+
+ Attribute> parts = OperatorContext.PARTITIONER;
+
+ //If only one partition the attribute is null
+ if (context.getValue(parts) != null) {
+ partitionCount = ((StatelessPartitioner>)context.getValue(parts)).getPartitionCount();
+ LOG.debug("Current Partition Count: " + partitionCount);
+ } else {
+ partitionCount = 1;
+ LOG.debug("Current Partition Count: " + partitionCount);
+ }
+ return partitionCount;
+ }
+
+
+ @SuppressWarnings("unchecked")
+ public boolean loadPartitionReplayCheck() throws IOException
+ {
+ if (!(super.DRFailover)) {
+ LOG.info("Received redelivered message from Solace, another parition must have restarted");
+ } else {
+ LOG.info("Received DR fail over event from Solace, the partiions are now talking to another Solace Router");
+ }
+ FSOpsIdempotentStorageManager snapshotState = idempotentStorageManager.getIdempotentStateSnapshot();
+ boolean _doneDupsPartitioned = true;
+ LOG.info("Largest recovery window: {}", snapshotState.getLargestRecoveryWindow());
+
+ LOG.info("Recovery Path: {}", snapshotState.getRecoveryPath());
+
+
+ Set opIds = snapshotState.getOperatorIds();
+ LOG.info("Received {} operatorIDs, with values:", opIds.size());
+
+ int[] arrOpIds = new int[opIds.size()];
+ int index = 0;
+ for (Integer i : opIds) {
+ arrOpIds[index++] = i;
+ }
+ for (int x = 0; x < arrOpIds.length; x++) {
+ LOG.info(Integer.toString(arrOpIds[x]));
+ }
+
+
+ for (int i = 0; i < arrOpIds.length; i++) {
+
+
+ long[] wins = snapshotState.getOrderedWindowIds(arrOpIds[i]);
+ try {
+ //Get last two recovery windows
+
+ LOG.info("Window to recover: {} for partition: {}", ((long)wins[wins.length - 1]), arrOpIds[i]);
+ Map recoveredData = (Map)snapshotState.load(arrOpIds[i], ((long)wins[wins.length - 1]));
+
+ if (recoveredData == null) {
+ LOG.info("Recovered data is null for window: {}", ((long)wins[wins.length - 1]));
+
+ } else {
+ for (Map.Entry recoveredEntry : recoveredData.entrySet()) {
+ if (!(super.DRFailover)) {
+ inFlightRecoveryMessagesPartition.add(recoveredEntry.getValue());
+ } else {
+ inFlightRecoveryMessagesDR.add(recoveredEntry.getValue());
+ }
+ }
+
+
+ LOG.info("Recovered data is {} messages for window: {}" + recoveredData.size(), ((long)wins[wins.length - 1]));
+ }
+
+ LOG.info("Window to recover: {} for partition: {}", ((long)wins[wins.length - 2]), arrOpIds[i]);
+ recoveredData = (Map)snapshotState.load(arrOpIds[i], ((long)wins[wins.length - 2]));
+
+ if (recoveredData == null) {
+ LOG.info("Recovered data is null for window: {}", ((long)wins[wins.length - 2]));
+ //continue;
+ } else {
+ for (Map.Entry recoveredEntry : recoveredData.entrySet()) {
+ if (!(super.DRFailover)) {
+ inFlightRecoveryMessagesPartition.add(recoveredEntry.getValue());
+ } else {
+ inFlightRecoveryMessagesDR.add(recoveredEntry.getValue());
+ }
+ }
+ LOG.info("Recovered data is {} messages for window: {}", recoveredData.size(), ((long)wins[wins.length - 2]));
+ }
+ _doneDupsPartitioned = false;
+
+ } catch (IOException e) {
+ DTThrowable.rethrow(e);
+ }
+
+
+ LOG.info("Added parition data from partition: {}", arrOpIds[i]);
+ }
+
+ if (!(super.DRFailover)) {
+ LOG.info("Total Recovery Partition Data Records: {} ", inFlightRecoveryMessagesPartition.size());
+ } else {
+ LOG.info("Total Recovery DR fail over Data Records: {}", inFlightRecoveryMessagesDR.size());
+ }
+ snapshotState.teardown();
+ return _doneDupsPartitioned;
+ }
+
+
+ // Start the actual consumption of Solace messages from the Queue
+ protected Consumer getConsumer() throws JCSMPException
+ {
+
+ ConsumerFlowProperties consumerFlowProperties = new ConsumerFlowProperties();
+
+ consumerFlowProperties.setAckMode(JCSMPProperties.SUPPORTED_MESSAGE_ACK_CLIENT);
+ consumerFlowProperties.setEndpoint(this.endpoint);
+
+ FlowReceiver f_receiver = session.createFlow(FlowHandler, consumerFlowProperties);
+ f_receiver.start();
+ LOG.info("Flow started on queue: {}", f_receiver.getDestination());
+ return f_receiver;
+ }
+
+ //public void setIdempotentStorageManager(IdempotentStorageManager storageManager)
+ public void setIdempotentStorageManager(FSOpsIdempotentStorageManager storageManager)
+ {
+ this.idempotentStorageManager = storageManager;
+ }
+
+
+ public IdempotentStorageManager getIdempotentStorageManager()
+ {
+ return this.idempotentStorageManager;
+ }
+
+
+ public class FlowCallbackMessageHandler implements XMLMessageListener
+ {
+
+
+ @Override
+ public void onException(JCSMPException e)
+ {
+ DTThrowable.rethrow(e);
+
+ }
+
+ @Override
+ public void onReceive(BytesXMLMessage message)
+ {
+ arrivedMessagesToProcess.add(message);
+ }
+ }
+
+
+}
diff --git a/solace/src/main/java/org/apache/apex/malhar/solace/EndpointType.java b/solace/src/main/java/org/apache/apex/malhar/solace/EndpointType.java
new file mode 100644
index 0000000000..0236215f79
--- /dev/null
+++ b/solace/src/main/java/org/apache/apex/malhar/solace/EndpointType.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.solace;
+
+/**
+ *
+ */
+public enum EndpointType
+{
+ QUEUE, TOPIC
+}
diff --git a/solace/src/main/java/org/apache/apex/malhar/solace/FSOpsIdempotentStorageManager.java b/solace/src/main/java/org/apache/apex/malhar/solace/FSOpsIdempotentStorageManager.java
new file mode 100644
index 0000000000..0143a51431
--- /dev/null
+++ b/solace/src/main/java/org/apache/apex/malhar/solace/FSOpsIdempotentStorageManager.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.solace;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Set;
+
+import org.apache.hadoop.fs.FileStatus;
+
+import com.google.common.collect.Sets;
+import com.datatorrent.api.Context;
+import com.datatorrent.lib.io.IdempotentStorageManager;
+
+/**
+ *
+ */
+public class FSOpsIdempotentStorageManager extends IdempotentStorageManager.FSIdempotentStorageManager
+{
+
+ protected transient Context.OperatorContext context;
+
+ @Override
+ public void setup(Context.OperatorContext context)
+ {
+ super.setup(context);
+ this.context = context;
+ }
+
+ public FSOpsIdempotentStorageManager getIdempotentStateSnapshot()
+ {
+ FSOpsIdempotentStorageManager snapshot = new FSOpsIdempotentStorageManager();
+ snapshot.setup(context);
+ return snapshot;
+ }
+
+ public Set getOperatorIds() throws IOException
+ {
+ Set ids = Sets.newLinkedHashSet();
+ FileStatus[] fileStatuses = fs.listStatus(appPath);
+ for (FileStatus fileStatus : fileStatuses) {
+ ids.add(Integer.parseInt(fileStatus.getPath().getName()));
+ }
+ return ids;
+ }
+
+ public long[] getOrderedWindowIds(int operatorId) throws IOException
+ {
+ long[] windowIds = getWindowIds(operatorId);
+ Arrays.sort(windowIds);
+ return windowIds;
+ }
+
+}
diff --git a/solace/src/main/java/org/apache/apex/malhar/solace/SolaceGuaranteedTextStrInputOperator.java b/solace/src/main/java/org/apache/apex/malhar/solace/SolaceGuaranteedTextStrInputOperator.java
new file mode 100644
index 0000000000..7c4ceb104f
--- /dev/null
+++ b/solace/src/main/java/org/apache/apex/malhar/solace/SolaceGuaranteedTextStrInputOperator.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.solace;
+
+import com.solacesystems.jcsmp.BytesMessage;
+import com.solacesystems.jcsmp.BytesXMLMessage;
+import com.solacesystems.jcsmp.JCSMPException;
+import com.solacesystems.jcsmp.TextMessage;
+import com.datatorrent.api.DefaultOutputPort;
+
+
+//public class SolaceGuaranteedTextStrInputOperator extends AbstractSolaceGuaranteedInputOperator
+public class SolaceGuaranteedTextStrInputOperator extends AbstractSolaceGuaranteedIdempotentInputOperator
+{
+ public final transient DefaultOutputPort output = new DefaultOutputPort();
+
+ @Override
+ protected String convert(BytesXMLMessage message)
+ {
+ String out = null;
+ if (message instanceof TextMessage) {
+ out = ((TextMessage)message).getText();
+ } else if (message instanceof BytesMessage) {
+
+ out = new String(((BytesMessage)message).getData());
+ }
+
+ return out;
+
+ }
+
+ @Override
+ protected void emitTuple(String tuple)
+ {
+ output.emit(tuple);
+ }
+
+
+ @Override
+ protected void clearConsumer() throws JCSMPException
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+}
diff --git a/solace/src/main/java/org/apache/apex/malhar/solace/SolaceReliableTextStrInputOperator.java b/solace/src/main/java/org/apache/apex/malhar/solace/SolaceReliableTextStrInputOperator.java
new file mode 100644
index 0000000000..52748da1e8
--- /dev/null
+++ b/solace/src/main/java/org/apache/apex/malhar/solace/SolaceReliableTextStrInputOperator.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.solace;
+
+import com.solacesystems.jcsmp.BytesMessage;
+import com.solacesystems.jcsmp.BytesXMLMessage;
+import com.solacesystems.jcsmp.JCSMPException;
+import com.solacesystems.jcsmp.TextMessage;
+import com.datatorrent.api.DefaultOutputPort;
+
+public class SolaceReliableTextStrInputOperator extends AbstractSolaceDirectInputOperator
+{
+
+ public final transient DefaultOutputPort output = new DefaultOutputPort();
+
+ @Override
+ protected String convert(BytesXMLMessage message)
+ {
+ String out = null;
+ if (message instanceof TextMessage) {
+ out = ((TextMessage)message).getText();
+ } else if (message instanceof BytesMessage) {
+
+ out = new String(((BytesMessage)message).getData());
+ }
+
+ return out;
+
+ }
+
+ @Override
+ protected void emitTuple(String tuple)
+ {
+ output.emit(tuple);
+ }
+
+
+ @Override
+ protected void clearConsumer() throws JCSMPException
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+}
diff --git a/solace/src/test/java/org/apache/apex/malhar/solace/ApplicationTest.java b/solace/src/test/java/org/apache/apex/malhar/solace/ApplicationTest.java
new file mode 100644
index 0000000000..8cea81dcf3
--- /dev/null
+++ b/solace/src/test/java/org/apache/apex/malhar/solace/ApplicationTest.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.solace;
+
+import java.io.IOException;
+import javax.validation.ConstraintViolationException;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+
+/**
+ * Test the DAG declaration in local mode.
+ */
+public class ApplicationTest
+{
+
+ @Test
+ public void testApplication() throws IOException, Exception
+ {
+ try {
+ LocalMode lma = LocalMode.newInstance();
+ Configuration conf = new Configuration(false);
+ conf.addResource(this.getClass().getResourceAsStream("/SolaceTestConf.xml"));
+ lma.prepareDAG(new SolTestApplication(), conf);
+ LocalMode.Controller lc = lma.getController();
+ lc.run(300000); // runs for 5 minutes and quits
+ } catch (ConstraintViolationException e) {
+ Assert.fail("constraint violations: " + e.getConstraintViolations());
+ }
+ }
+
+}
diff --git a/solace/src/test/java/org/apache/apex/malhar/solace/SolTestApplication.java b/solace/src/test/java/org/apache/apex/malhar/solace/SolTestApplication.java
new file mode 100644
index 0000000000..a0cfe81deb
--- /dev/null
+++ b/solace/src/test/java/org/apache/apex/malhar/solace/SolTestApplication.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.solace;
+
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+
+import com.solacesystems.jcsmp.JCSMPProperties;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+import com.datatorrent.lib.stream.Counter;
+
+//import com.datatorrent.api.Context.PortContext;
+//import com.datatorrent.lib.util.BaseKeyValueOperator.DefaultPartitionCodec;
+//import org.apache.hadoop.fs.Path;
+
+/**
+ *
+ */
+@ApplicationAnnotation(name = "SolTestApp")
+public class SolTestApplication implements StreamingApplication
+{
+ private static final Logger logger = LoggerFactory.getLogger(SolTestApplication.class);
+ private static final String DT_SOLACE_PROP_IDENTIFIER = "dt.solace";
+ private static final String DT_SOLACE_PROP_SEARCH = DT_SOLACE_PROP_IDENTIFIER + "\\..*";
+ private static final int DT_SOLACE_PROP_INDEX = DT_SOLACE_PROP_IDENTIFIER.length() + 1;
+
+ @Override
+ public void populateDAG(DAG dag, Configuration conf)
+ {
+ Map props = conf.getValByRegex(DT_SOLACE_PROP_SEARCH);
+ logger.info("{}", props);
+ JCSMPProperties properties = new JCSMPProperties();
+
+ for (Map.Entry entry : props.entrySet()) {
+
+ properties.setProperty(entry.getKey().substring(DT_SOLACE_PROP_INDEX), entry.getValue());
+ logger.info("Property: {} Value: {}", entry.getKey().substring(DT_SOLACE_PROP_INDEX), entry.getValue());
+ }
+
+ //Used for sticky partitioning
+ //@SuppressWarnings("unused")
+ //DefaultPartitionCodec codec = new DefaultPartitionCodec();
+
+ dag.setAttribute(Context.OperatorContext.CHECKPOINT_WINDOW_COUNT, 20);
+ dag.setAttribute(Context.DAGContext.CHECKPOINT_WINDOW_COUNT, 20);
+
+ /*====ECS processing====*/
+ //Test for Guaranteed Solace consumers
+ //SolaceGuaranteedTextStrInputOperator ecsInput = dag.addOperator("SolaceEcsInput", SolaceGuaranteedTextStrInputOperator.class);
+ SolaceReliableTextStrInputOperator ecsInput = dag.addOperator("SolaceEcsInput", SolaceReliableTextStrInputOperator.class);
+ //Test for Reliable Solace consumers
+ ecsInput.setProperties(properties);
+ dag.setAttribute(ecsInput, Context.OperatorContext.TIMEOUT_WINDOW_COUNT, 600);
+
+
+ Counter counter = dag.addOperator("Counter", new Counter());
+
+
+ ConsoleOutputOperator console = dag.addOperator("Console", new ConsoleOutputOperator());
+
+
+ dag.addStream("parseEcsXml", ecsInput.output, counter.input);
+ dag.addStream("console", counter.output, console.input);
+
+ }
+
+}
diff --git a/solace/src/test/resources/SolaceTestConf.xml b/solace/src/test/resources/SolaceTestConf.xml
new file mode 100644
index 0000000000..46d62645cf
--- /dev/null
+++ b/solace/src/test/resources/SolaceTestConf.xml
@@ -0,0 +1,77 @@
+
+
+
+
+ dt.attr.MASTER_MEMORY_MB
+ 1024
+
+
+
+ dt.operator.SolaceEcsInput.prop.unackedMessageLimit
+ 100000
+
+
+
+
+ dt.solace.host
+ host1:55555,host2:55555
+
+
+
+ dt.solace.username
+ username
+
+
+ dt.solace.password
+ password
+
+
+ dt.solace.vpn_name
+ vpn-name
+
+
+ dt.operator.SolaceEcsInput.prop.topicName
+ topic-name
+
+
+
+
+
+ dt.operator.SolaceEcsInput.prop.connectRetries
+ -1
+
+
+ dt.operator.SolaceEcsInput.prop.reconnectRetries
+ -1
+
+
+
+ dt.operator.SolaceEcsInput.attr.PARTITIONER
+ com.datatorrent.common.partitioner.StatelessPartitioner:4
+
+
+
diff --git a/solace/src/test/resources/log4j.properties b/solace/src/test/resources/log4j.properties
new file mode 100644
index 0000000000..6bb22c45b4
--- /dev/null
+++ b/solace/src/test/resources/log4j.properties
@@ -0,0 +1,43 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+log4j.rootLogger=DEBUG,CONSOLE
+
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+log4j.appender.CONSOLE.threshold=WARN
+test.log.console.threshold=WARN
+
+log4j.appender.RFA=org.apache.log4j.RollingFileAppender
+log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
+log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+log4j.appender.RFA.File=/tmp/app.log
+
+# to enable, add SYSLOG to rootLogger
+log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender
+log4j.appender.SYSLOG.syslogHost=127.0.0.1
+log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout
+log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n
+log4j.appender.SYSLOG.Facility=LOCAL1
+
+log4j.logger.org=info
+log4j.logger.org.apache.apex=INFO
+
+log4j.logger.com.solacesystems.jcsmp=WARN