From 08d9d962ff178744986ae628849c3c6e2a5f8785 Mon Sep 17 00:00:00 2001 From: Heinz Schaffner & Pramod Immaneni & Ashwin Putta Date: Sun, 5 Mar 2017 22:23:50 -0800 Subject: [PATCH] Solace input operators --- pom.xml | 1 + solace/pom.xml | 227 +++++++ .../AbstractSolaceBaseInputOperator.java | 305 +++++++++ .../AbstractSolaceDirectInputOperator.java | 199 ++++++ ...laceGuaranteedIdempotentInputOperator.java | 635 ++++++++++++++++++ .../solace/FSOpsIdempotentStorageManager.java | 70 ++ .../SolaceGuaranteedTextStrInputOperator.java | 52 ++ .../SolaceReliableTextStrInputOperator.java | 51 ++ .../apex/malhar/solace/ApplicationTest.java | 51 ++ .../malhar/solace/SolTestApplication.java | 90 +++ solace/src/test/resources/SolaceTestConf.xml | 77 +++ solace/src/test/resources/log4j.properties | 43 ++ 12 files changed, 1801 insertions(+) create mode 100755 solace/pom.xml create mode 100644 solace/src/main/java/org/apache/apex/malhar/solace/AbstractSolaceBaseInputOperator.java create mode 100644 solace/src/main/java/org/apache/apex/malhar/solace/AbstractSolaceDirectInputOperator.java create mode 100644 solace/src/main/java/org/apache/apex/malhar/solace/AbstractSolaceGuaranteedIdempotentInputOperator.java create mode 100644 solace/src/main/java/org/apache/apex/malhar/solace/FSOpsIdempotentStorageManager.java create mode 100644 solace/src/main/java/org/apache/apex/malhar/solace/SolaceGuaranteedTextStrInputOperator.java create mode 100644 solace/src/main/java/org/apache/apex/malhar/solace/SolaceReliableTextStrInputOperator.java create mode 100644 solace/src/test/java/org/apache/apex/malhar/solace/ApplicationTest.java create mode 100644 solace/src/test/java/org/apache/apex/malhar/solace/SolTestApplication.java create mode 100644 solace/src/test/resources/SolaceTestConf.xml create mode 100644 solace/src/test/resources/log4j.properties diff --git a/pom.xml b/pom.xml index d6bdfdd336..56168f61ef 100644 --- a/pom.xml +++ b/pom.xml @@ -204,6 +204,7 @@ samples sql flume + solace diff --git a/solace/pom.xml b/solace/pom.xml new file mode 100755 index 0000000000..8b7a2f680b --- /dev/null +++ b/solace/pom.xml @@ -0,0 +1,227 @@ + + + 4.0.0 + + + org.apache.apex + malhar + 3.8.0-SNAPSHOT + + + malhar-solace + Apache Apex Malhar Solace Support + jar + + + + 10.0.1 + true + + + + + + + + org.apache.maven.plugins + maven-jar-plugin + 2.4 + + + + test-jar + + package + + + + + + org.apache.maven.plugins + maven-antrun-plugin + + + createJavadocDirectory + generate-resources + + + + + + + + run + + + + + + + org.apache.maven.plugins + maven-javadoc-plugin + + + + xml-doclet + generate-resources + + javadoc + + + com.github.markusbernhardt.xmldoclet.XmlDoclet + -d ${project.build.directory}/generated-resources/xml-javadoc -filename ${project.artifactId}-${project.version}-javadoc.xml + false + + com.github.markusbernhardt + xml-doclet + 1.0.4 + + + + + + attach-sources + + jar + + + true + + + customTag1 + a + Custom Tag One: + + + customTag2 + a + Custom Tag two: + + + customTag3 + a + Custom Tag three: + + + + + + + + + org.codehaus.mojo + xml-maven-plugin + 1.0 + + + transform-xmljavadoc + generate-resources + + transform + + + + + + + ${project.build.directory}/generated-resources/xml-javadoc + + ${project.artifactId}-${project.version}-javadoc.xml + + XmlJavadocCommentsExtractor.xsl + ${project.build.directory}/generated-resources/xml-javadoc + + + + + + + maven-resources-plugin + 2.6 + + + copy-resources + process-resources + + copy-resources + + + ${basedir}/target/classes + + + ${project.build.directory}/generated-resources/xml-javadoc + + ${project.artifactId}-${project.version}-javadoc.xml + + true + + + + + + + + maven-surefire-plugin + + -Xmx2048m + + + + + + + + com.solacesystems + sol-jms + 10.0.1 + + + com.solacesystems + sol-jcsmp + 10.0.1 + + + com.solacesystems + sol-common + 10.0.1 + + + ${project.groupId} + malhar-library + ${project.version} + + + ${project.groupId} + malhar-library + ${project.version} + test + tests + + + ${project.groupId} + apex-common + ${apex.core.version} + jar + + + diff --git a/solace/src/main/java/org/apache/apex/malhar/solace/AbstractSolaceBaseInputOperator.java b/solace/src/main/java/org/apache/apex/malhar/solace/AbstractSolaceBaseInputOperator.java new file mode 100644 index 0000000000..e85c4e9665 --- /dev/null +++ b/solace/src/main/java/org/apache/apex/malhar/solace/AbstractSolaceBaseInputOperator.java @@ -0,0 +1,305 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.solace; + +import java.io.IOException; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import javax.validation.constraints.Min; +import javax.validation.constraints.NotNull; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.solacesystems.jcsmp.BytesXMLMessage; +import com.solacesystems.jcsmp.Consumer; +import com.solacesystems.jcsmp.JCSMPChannelProperties; +import com.solacesystems.jcsmp.JCSMPException; +import com.solacesystems.jcsmp.JCSMPFactory; +import com.solacesystems.jcsmp.JCSMPProperties; +import com.solacesystems.jcsmp.JCSMPReconnectEventHandler; +import com.solacesystems.jcsmp.JCSMPSession; +import com.solacesystems.jcsmp.SessionEvent; +import com.solacesystems.jcsmp.SessionEventArgs; +import com.solacesystems.jcsmp.SessionEventHandler; +import com.solacesystems.jcsmp.XMLMessageListener; +import com.datatorrent.api.Context; +import com.datatorrent.api.InputOperator; +import com.datatorrent.api.Operator; +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.lib.io.IdempotentStorageManager; +import com.datatorrent.netlet.util.DTThrowable; + +@SuppressWarnings("unused") +public abstract class AbstractSolaceBaseInputOperator extends BaseOperator implements + InputOperator, Operator.ActivationListener, Operator.CheckpointNotificationListener +{ + + private static final Logger logger = LoggerFactory.getLogger(AbstractSolaceBaseInputOperator.class); + + @NotNull + protected JCSMPProperties properties = new JCSMPProperties(); + protected int connectRetries = -1; + protected int reconnectRetries = -1; + @Min(1) + protected int unackedMessageLimit; + + protected FSOpsIdempotentStorageManager idempotentStorageManager = new FSOpsIdempotentStorageManager(); + + protected transient JCSMPFactory factory; + protected transient JCSMPSession session; + + //Consumer not being currently used and implemented by the specific operators + //protected transient Consumer consumer; + protected transient Consumer reliableConsumer; + + protected transient int operatorId; + protected transient long currentWindowId; + protected transient long lastCompletedWId; + + protected transient int emitCount; + + protected transient volatile boolean drFailover = false; + protected transient volatile boolean tcpDisconnected = false; + + //protected transient BlockingQueue unackedMessages; // hosts the Solace messages that need to be acked when the streaming window is OK to remove + //protected LinkedList inFlightMessageId = new LinkedList(); //keeps track of all in flight IDs since they are not necessarily sequential + // Messages are received asynchronously and collected in a queue, these are processed by the main operator thread and at that time fault tolerance + // and idempotency processing is done so this queue can remain transient + protected transient ArrayBlockingQueue arrivedTopicMessagesToProcess; + + //protected transient com.solace.dt.operator.DTSolaceOperatorInputOutput.ArrayBlockingQueue arrivedMessagesToProcess; + + private transient ReconnectCallbackHandler rcHandler; + + private transient MessageHandler cbHandler; + + protected transient int reconnectRetryMillis = 0; + + protected transient volatile Throwable throwable; + + protected static final int DEFAULT_BUFFER_SIZE = 500; + + @Override + public void setup(Context.OperatorContext context) + { + operatorId = context.getId(); + logger.debug("OperatorID: {}", operatorId); + + rcHandler = new ReconnectCallbackHandler(); + cbHandler = new MessageHandler(arrivedTopicMessagesToProcess); + + factory = JCSMPFactory.onlyInstance(); + + //Required for HA and DR to try forever if set to "-1" + JCSMPChannelProperties channelProperties = (JCSMPChannelProperties)this.properties.getProperty(JCSMPProperties.CLIENT_CHANNEL_PROPERTIES); + channelProperties.setConnectRetries(this.connectRetries); + channelProperties.setReconnectRetries(this.reconnectRetries); + + reconnectRetryMillis = channelProperties.getReconnectRetryWaitInMillis(); + + try { + session = factory.createSession(this.properties, null, new SessionHandler()); + } catch (JCSMPException e) { + DTThrowable.rethrow(e); + } + + //logger.debug("Properties Raw: \n{}", properties.toProperties()); + logger.debug("Properties:\n" + properties.toString()); + //logger.debug("\n===============================================\n"); + + idempotentStorageManager.setup(context); + lastCompletedWId = idempotentStorageManager.getLargestRecoveryWindow(); + //logger.debug("Largest Completed: " + lastCompletedWId); + } + + @Override + public void beforeCheckpoint(long l) + { + } + + @Override + public void checkpointed(long arg0) + { + } + + @Override + public void committed(long window) + { + try { + idempotentStorageManager.deleteUpTo(operatorId, window); + } catch (IOException e) { + DTThrowable.rethrow(e); + } + + } + + protected T processMessage(BytesXMLMessage message) + { + T tuple = convert(message); + if (tuple != null) { + emitTuple(tuple); + } + return tuple; + } + + @Override + public void activate(Context.OperatorContext context) + { + try { + session.connect(); + reliableConsumer = session.getMessageConsumer(rcHandler, cbHandler); + //consumer = getConsumer(); + reliableConsumer.start(); + } catch (JCSMPException e) { + DTThrowable.rethrow(e); + } + } + + @Override + public void deactivate() + { + reliableConsumer.close(); + } + + @Override + public void teardown() + { + idempotentStorageManager.teardown(); + session.closeSession(); + } + + @Override + public void beginWindow(long windowId) + { + super.beginWindow(windowId); + this.currentWindowId = windowId; + } + + protected abstract T convert(BytesXMLMessage message); + + protected abstract void emitTuple(T tuple); + + public void setProperties(JCSMPProperties properties) + { + this.properties = properties; + } + + public JCSMPProperties getProperties() + { + return properties; + } + + public IdempotentStorageManager getIdempotentStorageManager() + { + return idempotentStorageManager; + } + + public void setUnackedMessageLimit(int unackedMessageLimit) + { + this.unackedMessageLimit = unackedMessageLimit; + } + + public int getUnackedMessageLimit() + { + return unackedMessageLimit; + } + + public void setConnectRetries(int connectRetries) + { + this.connectRetries = connectRetries; + logger.debug("connectRetries: {}", this.connectRetries); + } + + public void setReconnectRetries(int reconnectRetries) + { + this.reconnectRetries = reconnectRetries; + logger.debug("reconnectRetries: {}", this.reconnectRetries); + } + + public void setReapplySubscriptions(boolean state) + { + this.properties.setBooleanProperty(JCSMPProperties.REAPPLY_SUBSCRIPTIONS, state); + } + + private class ReconnectCallbackHandler implements JCSMPReconnectEventHandler + { + @Override + public void postReconnect() throws JCSMPException + { + + logger.info("Solace client now Reconnected -- possibe Solace HA or DR fail-over"); + tcpDisconnected = false; + + } + + @Override + public boolean preReconnect() throws JCSMPException + { + drFailover = false; + logger.info("Solace client now in Pre Reconnect state -- possibe Solace HA or DR fail-over"); + tcpDisconnected = true; + return true; + } + } + + private class SessionHandler implements SessionEventHandler + { + public void handleEvent(SessionEventArgs event) + { + logger.info("Received Session Event %s with info %s\n", event.getEvent(), event.getInfo()); + + // Received event possibly due to DR fail-ver complete + if (event.getEvent() == SessionEvent.VIRTUAL_ROUTER_NAME_CHANGED) { + drFailover = true; // may or may not need recovery + tcpDisconnected = false; + } + } + } + + protected class MessageHandler implements XMLMessageListener + { + + BlockingQueue messageQueue; + + MessageHandler(BlockingQueue messageQueue) + { + this.messageQueue = messageQueue; + } + + @Override + public void onException(JCSMPException e) + { + throwable = e; + DTThrowable.rethrow(e); + } + + @Override + public void onReceive(BytesXMLMessage msg) + { + try { + messageQueue.put(msg); + } catch (InterruptedException e) { + DTThrowable.rethrow(e); + } + } + + } + +} diff --git a/solace/src/main/java/org/apache/apex/malhar/solace/AbstractSolaceDirectInputOperator.java b/solace/src/main/java/org/apache/apex/malhar/solace/AbstractSolaceDirectInputOperator.java new file mode 100644 index 0000000000..4cea78b47f --- /dev/null +++ b/solace/src/main/java/org/apache/apex/malhar/solace/AbstractSolaceDirectInputOperator.java @@ -0,0 +1,199 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.solace; + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.TimeUnit; +import javax.validation.constraints.NotNull; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Maps; +import com.solacesystems.jcsmp.BytesXMLMessage; +import com.solacesystems.jcsmp.JCSMPException; +import com.solacesystems.jcsmp.Topic; +import com.datatorrent.api.Context; +import com.datatorrent.api.InputOperator; +import com.datatorrent.api.Operator; +import com.datatorrent.netlet.util.DTThrowable; + +public abstract class AbstractSolaceDirectInputOperator extends AbstractSolaceBaseInputOperator implements InputOperator, Operator.ActivationListener +{ + + private static final Logger LOG = LoggerFactory.getLogger(AbstractSolaceDirectInputOperator.class); + + @NotNull + protected String topicName; + private transient Topic topic; + //private transient BytesXMLMessage recentMessage = null; + + //private transient List messages = new ArrayList(); + protected final transient Map currentWindowRecoveryState = Maps.newLinkedHashMap(); + + //private final transient AtomicReference throwable; + + @Override + public void setup(Context.OperatorContext context) + { + arrivedTopicMessagesToProcess = new ArrayBlockingQueue(this.unackedMessageLimit); + + //setup info for HA and DR at the transport level + super.setReapplySubscriptions(true); + + super.setup(context); + } + + public AbstractSolaceDirectInputOperator() + { + //throwable = new AtomicReference(); + } + + @Override + public void activate(Context.OperatorContext context) + { + super.activate(context); + topic = factory.createTopic(topicName); + addSubscription(topic); + } + + @Override + public void beginWindow(long windowId) + { + super.beginWindow(windowId); + if (windowId <= lastCompletedWId) { + handleRecovery(currentWindowId); + } + } + + @Override + public void endWindow() + { + super.endWindow(); + try { + if (currentWindowId > lastCompletedWId) { + idempotentStorageManager.save(currentWindowRecoveryState, operatorId, currentWindowId); + } + currentWindowRecoveryState.clear(); + } catch (IOException e) { + DTThrowable.rethrow(e); + } + emitCount = 0; //reset emit count + } + + @Override + public void emitTuples() + { + if (throwable != null) { + DTThrowable.rethrow(throwable); + } + + if (currentWindowId <= idempotentStorageManager.getLargestRecoveryWindow()) { + return; + } + + BytesXMLMessage message; + try { + while (emitCount < DEFAULT_BUFFER_SIZE && ((message = (BytesXMLMessage)super.arrivedTopicMessagesToProcess.poll(10, TimeUnit.MILLISECONDS)) != null)) { + processMessage(message); + emitCount++; + } + } catch (InterruptedException e) { + DTThrowable.rethrow(e); + } + //emitCount++; + } + + protected void addSubscription(Topic topic) + { + try { + session.addSubscription(topic); + } catch (JCSMPException e) { + DTThrowable.rethrow(e); + } + } + + protected void removeSubscription(Topic topic) + { + try { + session.removeSubscription(topic); + } catch (JCSMPException e) { + DTThrowable.rethrow(e); + } + } + + /* + @Override + protected T processMessage(BytesXMLMessage message) + { + T tuple = super.processMessage(message); + if(tuple != null) { + messages.add(tuple); + return tuple; + } + + return null; + } + */ + + @Override + protected T processMessage(BytesXMLMessage message) + { + T payload = super.processMessage(message); + if (payload != null) { + currentWindowRecoveryState.put(message.getMessageIdLong(), payload); + } + //recentMessage = message; + return payload; + } + + @SuppressWarnings("unchecked") + protected void handleRecovery(long windowId) + { + LOG.info("Handle Recovery called {}", windowId); + + Map recoveredData; + try { + recoveredData = (Map)idempotentStorageManager.load(operatorId, windowId); + + if (recoveredData == null) { + return; + } + for (Map.Entry recoveredEntry : recoveredData.entrySet()) { + emitTuple(recoveredEntry.getValue()); + } + + } catch (IOException e) { + DTThrowable.rethrow(e); + } + } + + public String getTopicName() + { + return topicName; + } + + public void setTopicName(String topicName) + { + this.topicName = topicName; + } + +} diff --git a/solace/src/main/java/org/apache/apex/malhar/solace/AbstractSolaceGuaranteedIdempotentInputOperator.java b/solace/src/main/java/org/apache/apex/malhar/solace/AbstractSolaceGuaranteedIdempotentInputOperator.java new file mode 100644 index 0000000000..df0817d2b0 --- /dev/null +++ b/solace/src/main/java/org/apache/apex/malhar/solace/AbstractSolaceGuaranteedIdempotentInputOperator.java @@ -0,0 +1,635 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.solace; + +import java.io.IOException; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.TimeUnit; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.esotericsoftware.kryo.NotNull; +import com.google.common.collect.Maps; +import com.solacesystems.jcsmp.BytesXMLMessage; +import com.solacesystems.jcsmp.Consumer; +import com.solacesystems.jcsmp.ConsumerFlowProperties; +import com.solacesystems.jcsmp.DeliveryMode; +import com.solacesystems.jcsmp.Endpoint; +import com.solacesystems.jcsmp.FlowReceiver; +import com.solacesystems.jcsmp.JCSMPException; +import com.solacesystems.jcsmp.JCSMPProperties; +import com.datatorrent.api.Context; +import com.datatorrent.api.Context.DAGContext; +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.InputOperator; +import com.datatorrent.api.Operator; +import com.datatorrent.api.Partitioner; +import com.datatorrent.api.annotation.OperatorAnnotation; +import com.datatorrent.common.partitioner.StatelessPartitioner; +import com.datatorrent.lib.io.IdempotentStorageManager; +import com.datatorrent.netlet.util.DTThrowable; + +@OperatorAnnotation(checkpointableWithinAppWindow = false) +public abstract class AbstractSolaceGuaranteedIdempotentInputOperator extends AbstractSolaceBaseInputOperator implements InputOperator, Operator.ActivationListener +{ + private static final Logger LOG = LoggerFactory.getLogger(AbstractSolaceGuaranteedIdempotentInputOperator.class); + + @NotNull + protected String endpointName; + + private transient Endpoint endpoint; + protected transient Consumer consumer; + @NotNull + public transient MessageHandler flowHandler; + /* + @NotNull + protected transient EndpointProperties endpointProperties = new EndpointProperties(); + */ + private transient BytesXMLMessage recentMessage = null; + + //private transient long[] operatorRecoveredWindows; + protected transient long currentWindowId; + protected transient ArrayBlockingQueue arrivedMessagesToProcess; + protected transient ArrayBlockingQueue unackedMessages; + + protected transient TreeMap lastMessages = new TreeMap(); + @SuppressWarnings("unused") + private transient long windowTime; + protected final transient Map currentWindowRecoveryState; + protected transient LinkedList inFlightRecoveryMessages = new LinkedList(); // used by partition that restarts for duplicate detection + protected transient LinkedList inFlightRecoveryMessagesPartition = new LinkedList(); // used by partitions that didn't restart for duplicate detection + protected transient LinkedList inFlightRecoveryMessagesDR = new LinkedList(); // used by partition that detect DR fail over for duplicate detection + + //private transient Context.OperatorContext context; + private transient int partitionCount = 0; + + protected transient int drCounterSize = 2 * DEFAULT_BUFFER_SIZE; + protected transient int drCounter = 0; + + protected transient volatile boolean doneDups = false; + protected transient volatile boolean doneDupsPartitioned = true; + protected transient volatile boolean donePartitionCheck = false; + protected transient volatile boolean doneDupsDR = true; + + + public AbstractSolaceGuaranteedIdempotentInputOperator() + { + currentWindowRecoveryState = Maps.newLinkedHashMap(); + } + + + protected boolean messageConsumed(BytesXMLMessage message) throws JCSMPException + { + if (message.getRedelivered()) { + return false; + } + return true; + } + + + @Override + public void setup(Context.OperatorContext context) + { + super.setup(context); + + flowHandler = new MessageHandler(arrivedMessagesToProcess); + + calculatePartitionCount(context); + + LOG.info("Initial Partition Count: {}", partitionCount); + + inFlightRecoveryMessages.clear(); + inFlightRecoveryMessagesPartition.clear(); + inFlightRecoveryMessagesDR.clear(); + + + //super.setUnackedMessageLimit(this.unackedMessageLimit); + //setup info for HA and DR at the transport level + //super.setConnectRetries(this.connectRetries); + //super.setReconnectRetries(this.reconnectRetries); + + LOG.info("Operator ID = " + context.getId()); + + windowTime = context.getValue(OperatorContext.APPLICATION_WINDOW_COUNT) * context.getValue(DAGContext.STREAMING_WINDOW_SIZE_MILLIS); + + arrivedMessagesToProcess = new ArrayBlockingQueue(unackedMessageLimit); + + unackedMessages = new ArrayBlockingQueue(unackedMessageLimit * (context.getValue(OperatorContext.APPLICATION_WINDOW_COUNT)) * 2); + + endpoint = factory.createQueue(this.endpointName); + + /* + if (currentWindowId > idempotentStorageManager.getLargestRecoveryWindow()) { + super.startConsumer(); + } + */ + + /* + try { + operatorRecoveredWindows = idempotentStorageManager.getWindowIds(context.getId()); + if (operatorRecoveredWindows != null) { + Arrays.sort(operatorRecoveredWindows); + } + } catch (IOException e) { + DTThrowable.rethrow(e); + } + */ + } + + @Override + public void activate(OperatorContext context) + { + super.activate(context); + } + + @Override + public void deactivate() + { + if (consumer != null) { + consumer.stop(); + consumer.close(); + } + super.deactivate(); + } + + @Override + protected T processMessage(BytesXMLMessage message) + { + T payload = super.processMessage(message); + if (payload != null) { + currentWindowRecoveryState.put(message.getMessageIdLong(), payload); + } + recentMessage = message; + return payload; + } + + @Override + public void beginWindow(long windowId) + { + //LOG.debug("Largest Recovery Wndow is : {} for current window: {}", idempotentStorageManager.getLargestRecoveryWindow(), windowId); + super.beginWindow(windowId); + if (windowId <= idempotentStorageManager.getLargestRecoveryWindow()) { + //LOG.debug("About to handle recovery, current windowID is: {} largested recovered ID is: {}" + currentWindowId, idempotentStorageManager.getLargestRecoveryWindow()); + handleRecovery(windowId); + } else { + // Don't start consumer in case of a failure recovery till the previous messages are replayed + if (consumer == null) { + try { + consumer = getConsumer(); + } catch (JCSMPException e) { + DTThrowable.rethrow(e); + } + LOG.debug("Started Flow Consumer after recovery is complete"); + } + } + } + + + @SuppressWarnings("unchecked") + protected void handleRecovery(long windowId) + { + LOG.info("Handle Recovery called"); + + Map recoveredData; + try { + recoveredData = (Map)idempotentStorageManager.load(operatorId, windowId); + + if (recoveredData == null) { + return; + } + for (Map.Entry recoveredEntry : recoveredData.entrySet()) { + emitTuple(recoveredEntry.getValue()); + inFlightRecoveryMessages.add(recoveredEntry.getValue()); + } + + } catch (IOException e) { + DTThrowable.rethrow(e); + } + + } + + + @Override + public void endWindow() + { + @SuppressWarnings("unused") + boolean stateSaved = false; + boolean ackCompleted = false; + int messagesToAck = unackedMessages.size(); + + if (currentWindowId > idempotentStorageManager.getLargestRecoveryWindow()) { + + + if (recentMessage != null) { + lastMessages.put(currentWindowId, recentMessage); + } + + try { + + if (recentMessage != null) { + idempotentStorageManager.save(currentWindowRecoveryState, operatorId, currentWindowId); + stateSaved = true; + LOG.debug("Saved for window: " + currentWindowId); + currentWindowRecoveryState.clear(); + LOG.debug("acking messages"); + ackCompleted = ackMessages(); + LOG.debug("Acked status: " + ackCompleted + " on window " + currentWindowId + " ack count: : " + messagesToAck); + } + } catch (Throwable t) { + if (!ackCompleted) { + LOG.info("confirm recovery of {} for {} does not exist", operatorId, currentWindowId, t); + } + DTThrowable.rethrow(t); + } + + + emitCount = 0; //reset emit count + } else { + currentWindowRecoveryState.clear(); + ackCompleted = ackMessages(); + LOG.debug("acking messages completed successfully: " + ackCompleted); + } + } + + + @Override + public void emitTuples() + { + if (throwable != null) { + DTThrowable.rethrow(throwable); + } + + if (currentWindowId <= idempotentStorageManager.getLargestRecoveryWindow()) { + return; + } + //If in HA or DR fail-over, block until Solace TCP connection is reestablished so recovery windows are not lost as empty windows + int sleepCounter = 0; + while (tcpDisconnected) { + sleepCounter++; + try { + Thread.sleep(super.reconnectRetryMillis); + } catch (InterruptedException e) { + DTThrowable.rethrow(e); + } + if (sleepCounter % 10 == 0) { + LOG.info("Sleeping for reconnectRetryMillis waiting for TCP reconnect for Solace with milliseconds sleep per cycle = {}", super.reconnectRetryMillis); + } + LOG.info("Queued messages to process: {}", arrivedMessagesToProcess.size()); + } + + + BytesXMLMessage message; + + try { + // process messages, window is defined by emitCount or timeout of waiting for messages for 10 milliseocnds + while (emitCount < DEFAULT_BUFFER_SIZE && ((message = (BytesXMLMessage)arrivedMessagesToProcess.poll(10, TimeUnit.MILLISECONDS)) != null)) { + + + boolean goodToGo = true; + if (message != null) { + + //LOG.debug("To be processed: {} sequence number: {} Received message with ID: {} and AppID: {} redelivered: {}", arrivedMessagesToProcess.size(), message.getSequenceNumber(), message.getMessageIdLong(), message.getApplicationMessageId(), message.getRedelivered()); + //LOG.debug(" AppID = {} redelivered: {}", message.getApplicationMessageId(), message.getRedelivered()); + + + // Checking for duplicates after recovery from operator restart looking for re-delivered messages in restarted operator + if (message.getRedelivered() && inFlightRecoveryMessages.size() > 0) { + T payload = convert(message); + if (inFlightRecoveryMessages.contains(payload)) { + LOG.info("Redelivered Message Duplicate possibly due to input operator restart"); + goodToGo = false; + if (message.getDeliveryMode() == DeliveryMode.PERSISTENT || message.getDeliveryMode() == DeliveryMode.NON_PERSISTENT) { + unackedMessages.add(message); + } + + recentMessage = message; + + } + + + } else { + doneDups = true; + } + + /* + if(message.getRedelivered() ) { + LOG.debug("In FLight Size: {} Current Part Count: {} Dups: {} and {}", inFlightRecoveryMessages.size(), partitionCount, doneDupsPartitioned, donePartitionCheck); + } + */ + + //Operator was not restarted, re-delivered messages are a result of another partitioned operator restart + if (message.getRedelivered() && inFlightRecoveryMessages.size() == 0 && partitionCount > 1 && doneDupsPartitioned && donePartitionCheck == false) { + try { + doneDupsPartitioned = loadPartitionReplayCheck(); + } catch (IOException e) { + DTThrowable.rethrow(e); + } + donePartitionCheck = true; + } + + if (message.getRedelivered() && doneDupsPartitioned == false && inFlightRecoveryMessagesPartition.size() >= 0) { + T payload = convert(message); + if (inFlightRecoveryMessagesPartition.contains(payload)) { + LOG.info("Redelivered Message Duplicate possibly due to input operator restart in another partition"); + goodToGo = false; + if (message.getDeliveryMode() == DeliveryMode.PERSISTENT || message.getDeliveryMode() == DeliveryMode.NON_PERSISTENT) { + unackedMessages.add(message); + } + + recentMessage = message; + + } + } else { + doneDupsPartitioned = true; + donePartitionCheck = false; //Get ready in case another partition restarts and results in replayed messages + } + + + // Checking for duplicates after recovery from DR looking for redelivered messages + if (drFailover && !(message.getRedelivered()) && doneDupsDR && donePartitionCheck == false) { + try { + doneDupsDR = loadPartitionReplayCheck(); + } catch (IOException e) { + DTThrowable.rethrow(e); + } + donePartitionCheck = true; + drCounterSize = drCounterSize + arrivedMessagesToProcess.size(); + } + + + if (inFlightRecoveryMessagesDR.size() == 0 && drFailover) { + drFailover = false; + doneDupsDR = true; + donePartitionCheck = false; + inFlightRecoveryMessagesDR.clear(); + LOG.info("Cleared in flight recovery messages, no more possible duplicate messages detected after DR fail over"); + } + + if (!(message.getRedelivered()) && doneDupsDR == false && inFlightRecoveryMessagesDR.size() > 0 && drFailover && drCounter < drCounterSize) { + drCounter++; + T payload = convert(message); + if (inFlightRecoveryMessagesDR.contains(payload)) { + LOG.info("Message Duplicate detected after Solace DR fail over"); + goodToGo = false; + if (message.getDeliveryMode() == DeliveryMode.PERSISTENT || message.getDeliveryMode() == DeliveryMode.NON_PERSISTENT) { + unackedMessages.add(message); + } + + recentMessage = message; + + } + //Reset DR processing for duplicates after 2 windows worth of message checks + } else if (drCounter == drCounterSize) { + //Once there are no more duplicates detected there will be no more duplicates due to DR fail over + doneDupsDR = true; + donePartitionCheck = false; + inFlightRecoveryMessagesDR.clear(); + drFailover = false; + drCounter = 0; + drCounterSize = 2 * DEFAULT_BUFFER_SIZE; + LOG.info("Cleared in flight recovery messages, no more possible duplicate messages detected after DR fail over"); + } + + + if (goodToGo) { + //if the redelivery flag is no no longer on the messages we can dispose of the inFLightRecoveryMessages + if (message.getRedelivered() == false && inFlightRecoveryMessages.size() > 0 && doneDups) { + inFlightRecoveryMessages.clear(); + LOG.info("Cleared in flight recovery messages, no more redelivered or DR recovery messages"); + doneDups = false; + } + if (message.getRedelivered() == false && inFlightRecoveryMessagesPartition.size() > 0 && doneDupsPartitioned) { + inFlightRecoveryMessagesPartition.clear(); + LOG.info("Cleared in flight recovery messages, no more redelivered messages"); + doneDupsPartitioned = false; + } + + processMessage(message); + if (message.getDeliveryMode() == DeliveryMode.PERSISTENT || message.getDeliveryMode() == DeliveryMode.NON_PERSISTENT) { + unackedMessages.add(message); + } + emitCount++; + } + } + } + } catch (InterruptedException e) { + DTThrowable.rethrow(e); + } + } + + @Override + public void committed(long window) + { + if (recentMessage == null) { + return; + } + + Set windows = lastMessages.keySet(); + Iterator iterator = windows.iterator(); + while (iterator.hasNext()) { + if (iterator.next() <= window) { + iterator.remove(); + } else { + break; + } + } + + super.committed(window); + } + + private boolean ackMessages() + { + boolean processedOK = false; + + + BytesXMLMessage messageToAckUpTo = lastMessages.get(currentWindowId); + + if (messageToAckUpTo != null) { + if (unackedMessages.size() > 0) { + while (unackedMessages.peek() != messageToAckUpTo) { + try { + BytesXMLMessage taken = unackedMessages.take(); + //LOG.debug("Acking: {}", taken.getApplicationMessageId()); + taken.ackMessage(); + } catch (InterruptedException e) { + DTThrowable.rethrow(e); + } + } + if (unackedMessages.peek() == messageToAckUpTo) { + try { + BytesXMLMessage taken = unackedMessages.take(); + taken.ackMessage(); + } catch (InterruptedException e) { + DTThrowable.rethrow(e); + } + } + processedOK = true; + } else { + LOG.debug("Unacked Array is size zero"); + + } + } else { + LOG.info("messageToAckUpTo is null -- possibly due to being in recovery stage"); + } + + + return processedOK; + } + + public void setEndpointName(String endpointName) + { + this.endpointName = endpointName; + LOG.info("enpointName: {}", this.endpointName); + } + + public void calculatePartitionCount(OperatorContext context) + { + Partitioner partitioner = context.getValue(OperatorContext.PARTITIONER); + //If only one partition the attribute is null + if ((partitioner != null) && (partitioner instanceof StatelessPartitioner)) { + partitionCount = ((StatelessPartitioner)partitioner).getPartitionCount(); + } else { + partitionCount = 1; + } + LOG.debug("Current Partition Count: " + partitionCount); + } + + @SuppressWarnings("unchecked") + public boolean loadPartitionReplayCheck() throws IOException + { + if (!(drFailover)) { + LOG.info("Received redelivered message from Solace, another parition must have restarted"); + } else { + LOG.info("Received DR fail over event from Solace, the partiions are now talking to another Solace Router"); + } + FSOpsIdempotentStorageManager snapshotState = idempotentStorageManager.getIdempotentStateSnapshot(); + boolean _doneDupsPartitioned = true; + LOG.info("Largest recovery window: {}", snapshotState.getLargestRecoveryWindow()); + + LOG.info("Recovery Path: {}", snapshotState.getRecoveryPath()); + + + Set opIds = snapshotState.getOperatorIds(); + LOG.info("Received {} operatorIDs, with values:", opIds.size()); + + int[] arrOpIds = new int[opIds.size()]; + int index = 0; + for (Integer i : opIds) { + arrOpIds[index++] = i; + } + for (int x = 0; x < arrOpIds.length; x++) { + LOG.info(Integer.toString(arrOpIds[x])); + } + + + for (int i = 0; i < arrOpIds.length; i++) { + + + long[] wins = snapshotState.getOrderedWindowIds(arrOpIds[i]); + try { + //Get last two recovery windows + + LOG.info("Window to recover: {} for partition: {}", ((long)wins[wins.length - 1]), arrOpIds[i]); + Map recoveredData = (Map)snapshotState.load(arrOpIds[i], ((long)wins[wins.length - 1])); + + if (recoveredData == null) { + LOG.info("Recovered data is null for window: {}", ((long)wins[wins.length - 1])); + + } else { + for (Map.Entry recoveredEntry : recoveredData.entrySet()) { + if (!(drFailover)) { + inFlightRecoveryMessagesPartition.add(recoveredEntry.getValue()); + } else { + inFlightRecoveryMessagesDR.add(recoveredEntry.getValue()); + } + } + + + LOG.info("Recovered data is {} messages for window: {}" + recoveredData.size(), ((long)wins[wins.length - 1])); + } + + LOG.info("Window to recover: {} for partition: {}", ((long)wins[wins.length - 2]), arrOpIds[i]); + recoveredData = (Map)snapshotState.load(arrOpIds[i], ((long)wins[wins.length - 2])); + + if (recoveredData == null) { + LOG.info("Recovered data is null for window: {}", ((long)wins[wins.length - 2])); + //continue; + } else { + for (Map.Entry recoveredEntry : recoveredData.entrySet()) { + if (!(drFailover)) { + inFlightRecoveryMessagesPartition.add(recoveredEntry.getValue()); + } else { + inFlightRecoveryMessagesDR.add(recoveredEntry.getValue()); + } + } + LOG.info("Recovered data is {} messages for window: {}", recoveredData.size(), ((long)wins[wins.length - 2])); + } + _doneDupsPartitioned = false; + + } catch (IOException e) { + DTThrowable.rethrow(e); + } + + + LOG.info("Added parition data from partition: {}", arrOpIds[i]); + } + + if (!(drFailover)) { + LOG.info("Total Recovery Partition Data Records: {} ", inFlightRecoveryMessagesPartition.size()); + } else { + LOG.info("Total Recovery DR fail over Data Records: {}", inFlightRecoveryMessagesDR.size()); + } + snapshotState.teardown(); + return _doneDupsPartitioned; + } + + + // Start the actual consumption of Solace messages from the Queue + protected Consumer getConsumer() throws JCSMPException + { + ConsumerFlowProperties consumerFlowProperties = new ConsumerFlowProperties(); + + consumerFlowProperties.setAckMode(JCSMPProperties.SUPPORTED_MESSAGE_ACK_CLIENT); + consumerFlowProperties.setEndpoint(this.endpoint); + + FlowReceiver f_receiver = session.createFlow(flowHandler, consumerFlowProperties); + f_receiver.start(); + LOG.info("Flow started on queue: {}", f_receiver.getDestination()); + return f_receiver; + } + + //public void setIdempotentStorageManager(IdempotentStorageManager storageManager) + public void setIdempotentStorageManager(FSOpsIdempotentStorageManager storageManager) + { + this.idempotentStorageManager = storageManager; + } + + + public IdempotentStorageManager getIdempotentStorageManager() + { + return this.idempotentStorageManager; + } + +} diff --git a/solace/src/main/java/org/apache/apex/malhar/solace/FSOpsIdempotentStorageManager.java b/solace/src/main/java/org/apache/apex/malhar/solace/FSOpsIdempotentStorageManager.java new file mode 100644 index 0000000000..0143a51431 --- /dev/null +++ b/solace/src/main/java/org/apache/apex/malhar/solace/FSOpsIdempotentStorageManager.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.solace; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Set; + +import org.apache.hadoop.fs.FileStatus; + +import com.google.common.collect.Sets; +import com.datatorrent.api.Context; +import com.datatorrent.lib.io.IdempotentStorageManager; + +/** + * + */ +public class FSOpsIdempotentStorageManager extends IdempotentStorageManager.FSIdempotentStorageManager +{ + + protected transient Context.OperatorContext context; + + @Override + public void setup(Context.OperatorContext context) + { + super.setup(context); + this.context = context; + } + + public FSOpsIdempotentStorageManager getIdempotentStateSnapshot() + { + FSOpsIdempotentStorageManager snapshot = new FSOpsIdempotentStorageManager(); + snapshot.setup(context); + return snapshot; + } + + public Set getOperatorIds() throws IOException + { + Set ids = Sets.newLinkedHashSet(); + FileStatus[] fileStatuses = fs.listStatus(appPath); + for (FileStatus fileStatus : fileStatuses) { + ids.add(Integer.parseInt(fileStatus.getPath().getName())); + } + return ids; + } + + public long[] getOrderedWindowIds(int operatorId) throws IOException + { + long[] windowIds = getWindowIds(operatorId); + Arrays.sort(windowIds); + return windowIds; + } + +} diff --git a/solace/src/main/java/org/apache/apex/malhar/solace/SolaceGuaranteedTextStrInputOperator.java b/solace/src/main/java/org/apache/apex/malhar/solace/SolaceGuaranteedTextStrInputOperator.java new file mode 100644 index 0000000000..6e55cdb693 --- /dev/null +++ b/solace/src/main/java/org/apache/apex/malhar/solace/SolaceGuaranteedTextStrInputOperator.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.solace; + +import com.solacesystems.jcsmp.BytesMessage; +import com.solacesystems.jcsmp.BytesXMLMessage; +import com.solacesystems.jcsmp.TextMessage; +import com.datatorrent.api.DefaultOutputPort; + + +//public class SolaceGuaranteedTextStrInputOperator extends AbstractSolaceGuaranteedInputOperator +public class SolaceGuaranteedTextStrInputOperator extends AbstractSolaceGuaranteedIdempotentInputOperator +{ + public final transient DefaultOutputPort output = new DefaultOutputPort(); + + @Override + protected String convert(BytesXMLMessage message) + { + String out = null; + if (message instanceof TextMessage) { + out = ((TextMessage)message).getText(); + } else if (message instanceof BytesMessage) { + + out = new String(((BytesMessage)message).getData()); + } + + return out; + } + + @Override + protected void emitTuple(String tuple) + { + output.emit(tuple); + } + +} diff --git a/solace/src/main/java/org/apache/apex/malhar/solace/SolaceReliableTextStrInputOperator.java b/solace/src/main/java/org/apache/apex/malhar/solace/SolaceReliableTextStrInputOperator.java new file mode 100644 index 0000000000..c133448579 --- /dev/null +++ b/solace/src/main/java/org/apache/apex/malhar/solace/SolaceReliableTextStrInputOperator.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.solace; + +import com.solacesystems.jcsmp.BytesMessage; +import com.solacesystems.jcsmp.BytesXMLMessage; +import com.solacesystems.jcsmp.TextMessage; +import com.datatorrent.api.DefaultOutputPort; + +public class SolaceReliableTextStrInputOperator extends AbstractSolaceDirectInputOperator +{ + + public final transient DefaultOutputPort output = new DefaultOutputPort(); + + @Override + protected String convert(BytesXMLMessage message) + { + String out = null; + if (message instanceof TextMessage) { + out = ((TextMessage)message).getText(); + } else if (message instanceof BytesMessage) { + + out = new String(((BytesMessage)message).getData()); + } + + return out; + } + + @Override + protected void emitTuple(String tuple) + { + output.emit(tuple); + } + +} diff --git a/solace/src/test/java/org/apache/apex/malhar/solace/ApplicationTest.java b/solace/src/test/java/org/apache/apex/malhar/solace/ApplicationTest.java new file mode 100644 index 0000000000..8cea81dcf3 --- /dev/null +++ b/solace/src/test/java/org/apache/apex/malhar/solace/ApplicationTest.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.solace; + +import java.io.IOException; +import javax.validation.ConstraintViolationException; + +import org.junit.Assert; +import org.junit.Test; +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.LocalMode; + +/** + * Test the DAG declaration in local mode. + */ +public class ApplicationTest +{ + + @Test + public void testApplication() throws IOException, Exception + { + try { + LocalMode lma = LocalMode.newInstance(); + Configuration conf = new Configuration(false); + conf.addResource(this.getClass().getResourceAsStream("/SolaceTestConf.xml")); + lma.prepareDAG(new SolTestApplication(), conf); + LocalMode.Controller lc = lma.getController(); + lc.run(300000); // runs for 5 minutes and quits + } catch (ConstraintViolationException e) { + Assert.fail("constraint violations: " + e.getConstraintViolations()); + } + } + +} diff --git a/solace/src/test/java/org/apache/apex/malhar/solace/SolTestApplication.java b/solace/src/test/java/org/apache/apex/malhar/solace/SolTestApplication.java new file mode 100644 index 0000000000..a0cfe81deb --- /dev/null +++ b/solace/src/test/java/org/apache/apex/malhar/solace/SolTestApplication.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.solace; + +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hadoop.conf.Configuration; + +import com.solacesystems.jcsmp.JCSMPProperties; +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; +import com.datatorrent.lib.io.ConsoleOutputOperator; +import com.datatorrent.lib.stream.Counter; + +//import com.datatorrent.api.Context.PortContext; +//import com.datatorrent.lib.util.BaseKeyValueOperator.DefaultPartitionCodec; +//import org.apache.hadoop.fs.Path; + +/** + * + */ +@ApplicationAnnotation(name = "SolTestApp") +public class SolTestApplication implements StreamingApplication +{ + private static final Logger logger = LoggerFactory.getLogger(SolTestApplication.class); + private static final String DT_SOLACE_PROP_IDENTIFIER = "dt.solace"; + private static final String DT_SOLACE_PROP_SEARCH = DT_SOLACE_PROP_IDENTIFIER + "\\..*"; + private static final int DT_SOLACE_PROP_INDEX = DT_SOLACE_PROP_IDENTIFIER.length() + 1; + + @Override + public void populateDAG(DAG dag, Configuration conf) + { + Map props = conf.getValByRegex(DT_SOLACE_PROP_SEARCH); + logger.info("{}", props); + JCSMPProperties properties = new JCSMPProperties(); + + for (Map.Entry entry : props.entrySet()) { + + properties.setProperty(entry.getKey().substring(DT_SOLACE_PROP_INDEX), entry.getValue()); + logger.info("Property: {} Value: {}", entry.getKey().substring(DT_SOLACE_PROP_INDEX), entry.getValue()); + } + + //Used for sticky partitioning + //@SuppressWarnings("unused") + //DefaultPartitionCodec codec = new DefaultPartitionCodec(); + + dag.setAttribute(Context.OperatorContext.CHECKPOINT_WINDOW_COUNT, 20); + dag.setAttribute(Context.DAGContext.CHECKPOINT_WINDOW_COUNT, 20); + + /*====ECS processing====*/ + //Test for Guaranteed Solace consumers + //SolaceGuaranteedTextStrInputOperator ecsInput = dag.addOperator("SolaceEcsInput", SolaceGuaranteedTextStrInputOperator.class); + SolaceReliableTextStrInputOperator ecsInput = dag.addOperator("SolaceEcsInput", SolaceReliableTextStrInputOperator.class); + //Test for Reliable Solace consumers + ecsInput.setProperties(properties); + dag.setAttribute(ecsInput, Context.OperatorContext.TIMEOUT_WINDOW_COUNT, 600); + + + Counter counter = dag.addOperator("Counter", new Counter()); + + + ConsoleOutputOperator console = dag.addOperator("Console", new ConsoleOutputOperator()); + + + dag.addStream("parseEcsXml", ecsInput.output, counter.input); + dag.addStream("console", counter.output, console.input); + + } + +} diff --git a/solace/src/test/resources/SolaceTestConf.xml b/solace/src/test/resources/SolaceTestConf.xml new file mode 100644 index 0000000000..46d62645cf --- /dev/null +++ b/solace/src/test/resources/SolaceTestConf.xml @@ -0,0 +1,77 @@ + + + + + dt.attr.MASTER_MEMORY_MB + 1024 + + + + dt.operator.SolaceEcsInput.prop.unackedMessageLimit + 100000 + + + + + dt.solace.host + host1:55555,host2:55555 + + + + dt.solace.username + username + + + dt.solace.password + password + + + dt.solace.vpn_name + vpn-name + + + dt.operator.SolaceEcsInput.prop.topicName + topic-name + + + + + + dt.operator.SolaceEcsInput.prop.connectRetries + -1 + + + dt.operator.SolaceEcsInput.prop.reconnectRetries + -1 + + + + dt.operator.SolaceEcsInput.attr.PARTITIONER + com.datatorrent.common.partitioner.StatelessPartitioner:4 + + + diff --git a/solace/src/test/resources/log4j.properties b/solace/src/test/resources/log4j.properties new file mode 100644 index 0000000000..6bb22c45b4 --- /dev/null +++ b/solace/src/test/resources/log4j.properties @@ -0,0 +1,43 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +log4j.rootLogger=DEBUG,CONSOLE + +log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender +log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout +log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n +log4j.appender.CONSOLE.threshold=WARN +test.log.console.threshold=WARN + +log4j.appender.RFA=org.apache.log4j.RollingFileAppender +log4j.appender.RFA.layout=org.apache.log4j.PatternLayout +log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n +log4j.appender.RFA.File=/tmp/app.log + +# to enable, add SYSLOG to rootLogger +log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender +log4j.appender.SYSLOG.syslogHost=127.0.0.1 +log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout +log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n +log4j.appender.SYSLOG.Facility=LOCAL1 + +log4j.logger.org=info +log4j.logger.org.apache.apex=INFO + +log4j.logger.com.solacesystems.jcsmp=WARN