diff --git a/pom.xml b/pom.xml index 56168f61ef..5d473893e9 100644 --- a/pom.xml +++ b/pom.xml @@ -214,6 +214,7 @@ contrib kafka examples + solace diff --git a/solace/pom.xml b/solace/pom.xml index 7d6d10806f..189daabfbc 100755 --- a/solace/pom.xml +++ b/solace/pom.xml @@ -25,7 +25,7 @@ org.apache.apex malhar - 3.7.0-SNAPSHOT + 3.8.0-SNAPSHOT malhar-solace diff --git a/solace/src/main/java/org/apache/apex/malhar/solace/AbstractSolaceBaseInputOperator.java b/solace/src/main/java/org/apache/apex/malhar/solace/AbstractSolaceBaseInputOperator.java index 13a7b0dcb6..ce2616084a 100644 --- a/solace/src/main/java/org/apache/apex/malhar/solace/AbstractSolaceBaseInputOperator.java +++ b/solace/src/main/java/org/apache/apex/malhar/solace/AbstractSolaceBaseInputOperator.java @@ -19,9 +19,7 @@ package org.apache.apex.malhar.solace; import java.io.IOException; -import java.util.LinkedList; import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; import javax.validation.constraints.NotNull; import org.slf4j.Logger; @@ -42,26 +40,23 @@ import com.datatorrent.api.Context; import com.datatorrent.api.InputOperator; import com.datatorrent.api.Operator; -import com.datatorrent.api.Operator.CheckpointListener; import com.datatorrent.common.util.BaseOperator; import com.datatorrent.lib.io.IdempotentStorageManager; import com.datatorrent.netlet.util.DTThrowable; @SuppressWarnings("unused") public abstract class AbstractSolaceBaseInputOperator extends BaseOperator implements - InputOperator, Operator.ActivationListener, CheckpointListener + InputOperator, Operator.ActivationListener, Operator.CheckpointNotificationListener { private static final Logger logger = LoggerFactory.getLogger(AbstractSolaceBaseInputOperator.class); @NotNull protected JCSMPProperties properties = new JCSMPProperties(); - protected String connectRetries; - protected String reconnectRetries; - protected String unackedMessageLimit; + protected int connectRetries; + protected int reconnectRetries; + protected int unackedMessageLimit; - - //protected IdempotentStorageManager idempotentStorageManager = new IdempotentStorageManager.FSIdempotentStorageManager(); protected FSOpsIdempotentStorageManager idempotentStorageManager = new FSOpsIdempotentStorageManager(); protected transient JCSMPFactory factory; @@ -71,16 +66,18 @@ public abstract class AbstractSolaceBaseInputOperator extends BaseOperator im protected transient Consumer reliableConsumer; protected transient int operatorId; - protected long windowId; + protected transient long currentWindowId; protected transient long lastCompletedWId; protected transient int emitCount; - protected transient volatile boolean DRFailover = false; - protected transient volatile boolean TCPDisconnected = false; + protected transient volatile boolean drFailover = false; + protected transient volatile boolean tcpDisconnected = false; - protected transient BlockingQueue unackedMessages; // hosts the Solace messages that need to be acked when the streaming window is OK to remove - protected LinkedList inFlightMessageId = new LinkedList(); //keeps track of all in flight IDs since they are not necessarily sequential + //protected transient BlockingQueue unackedMessages; // hosts the Solace messages that need to be acked when the streaming window is OK to remove + //protected LinkedList inFlightMessageId = new LinkedList(); //keeps track of all in flight IDs since they are not necessarily sequential + // Messages are received asynchronously and collected in a queue, these are processed by the main operator thread and at that time fault tolerance + // and idempotency processing is done so this queue can remain transient protected transient ArrayBlockingQueue arrivedTopicMessagesToProcess; //protected transient com.solace.dt.operator.DTSolaceOperatorInputOutput.ArrayBlockingQueue arrivedMessagesToProcess; @@ -89,23 +86,22 @@ public abstract class AbstractSolaceBaseInputOperator extends BaseOperator im private transient CallbackMessageHandler cbHandler = new CallbackMessageHandler(); - int spinMillis; + protected transient int spinMillis; protected transient int reconnectRetryMillis = 0; @Override public void setup(Context.OperatorContext context) { - operatorId = context.getId(); - logger.info("OperatorID from Base class: {}", operatorId); + logger.debug("OperatorID: {}", operatorId); spinMillis = context.getValue(com.datatorrent.api.Context.OperatorContext.SPIN_MILLIS); factory = JCSMPFactory.onlyInstance(); //Required for HA and DR to try forever if set to "-1" JCSMPChannelProperties channelProperties = (JCSMPChannelProperties)this.properties.getProperty(JCSMPProperties.CLIENT_CHANNEL_PROPERTIES); - channelProperties.setConnectRetries(Integer.parseInt(this.connectRetries)); - channelProperties.setReconnectRetries(Integer.parseInt(this.reconnectRetries)); + channelProperties.setConnectRetries(this.connectRetries); + channelProperties.setReconnectRetries(this.reconnectRetries); reconnectRetryMillis = channelProperties.getReconnectRetryWaitInMillis(); @@ -122,16 +118,17 @@ public void setup(Context.OperatorContext context) idempotentStorageManager.setup(context); lastCompletedWId = idempotentStorageManager.getLargestRecoveryWindow(); - //logger.debug("++++++++++++++++++++Largest Completed: " + lastCompletedWId); - + //logger.debug("Largest Completed: " + lastCompletedWId); + } + @Override + public void beforeCheckpoint(long l) + { } @Override public void checkpointed(long arg0) { - // TODO Auto-generated method stub - } @Override @@ -154,7 +151,6 @@ protected T processMessage(BytesXMLMessage message) return tuple; } - @Override public void activate(Context.OperatorContext context) { @@ -163,7 +159,6 @@ public void activate(Context.OperatorContext context) reliableConsumer = session.getMessageConsumer(rcHandler, cbHandler); //consumer = getConsumer(); reliableConsumer.start(); - } catch (JCSMPException e) { DTThrowable.rethrow(e); } @@ -173,9 +168,11 @@ public void activate(Context.OperatorContext context) public void deactivate() { try { - consumer.stop(); - clearConsumer(); - consumer.close(); + if (consumer != null) { + consumer.stop(); + clearConsumer(); + consumer.close(); + } reliableConsumer.close(); } catch (JCSMPException e) { DTThrowable.rethrow(e); @@ -185,7 +182,6 @@ public void deactivate() @Override public void teardown() { - idempotentStorageManager.teardown(); session.closeSession(); } @@ -194,10 +190,9 @@ public void teardown() public void beginWindow(long windowId) { super.beginWindow(windowId); - this.windowId = windowId; + this.currentWindowId = windowId; } - protected abstract T convert(BytesXMLMessage message); protected abstract void emitTuple(T tuple); @@ -209,35 +204,38 @@ public void beginWindow(long windowId) public void setProperties(JCSMPProperties properties) { this.properties = properties; - } + public JCSMPProperties getProperties() + { + return properties; + } public IdempotentStorageManager getIdempotentStorageManager() { return idempotentStorageManager; } - public void setUnackedMessageLimit(String unackedMessageLimit) + public void setUnackedMessageLimit(int unackedMessageLimit) { this.unackedMessageLimit = unackedMessageLimit; } - public String getUnackedMessageLimit() + public int getUnackedMessageLimit() { return unackedMessageLimit; } - public void setConnectRetries(String connectRetries) + public void setConnectRetries(int connectRetries) { this.connectRetries = connectRetries; - logger.info("+++++++++++++++++++reconnectRetries: {}", this.connectRetries); + logger.debug("connectRetries: {}", this.connectRetries); } - public void setReconnectRetries(String reconnectRetries) + public void setReconnectRetries(int reconnectRetries) { this.reconnectRetries = reconnectRetries; - logger.info("+++++++++++++++++++reconnectRetries: {}", this.reconnectRetries); + logger.debug("reconnectRetries: {}", this.reconnectRetries); } public void setReapplySubscriptions(boolean state) @@ -245,7 +243,7 @@ public void setReapplySubscriptions(boolean state) this.properties.setBooleanProperty(JCSMPProperties.REAPPLY_SUBSCRIPTIONS, state); } - public void startConsumer() + protected void startConsumer() { try { consumer = getConsumer(); @@ -257,42 +255,36 @@ public void startConsumer() public class ReconnectCallbackHandler implements JCSMPReconnectEventHandler { - - @Override public void postReconnect() throws JCSMPException { - logger.info("++++++++++++++Solace client now Reconnected -- possibe Solace HA or DR fail-over +++++++++++++"); - TCPDisconnected = false; + logger.info("Solace client now Reconnected -- possibe Solace HA or DR fail-over"); + tcpDisconnected = false; } @Override public boolean preReconnect() throws JCSMPException { - DRFailover = false; - logger.info("++++++++++++++Solace client now in Pre Reconnect state -- possibe Solace HA or DR fail-over +++++++++++++"); - TCPDisconnected = true; + drFailover = false; + logger.info("Solace client now in Pre Reconnect state -- possibe Solace HA or DR fail-over"); + tcpDisconnected = true; return true; } - } public class PrintingSessionEventHandler implements SessionEventHandler { - - public void handleEvent(SessionEventArgs event) { logger.info("Received Session Event %s with info %s\n", event.getEvent(), event.getInfo()); // Received event possibly due to DR fail-ver complete if (event.getEvent() == SessionEvent.VIRTUAL_ROUTER_NAME_CHANGED) { - DRFailover = true; // may or may not need recovery - TCPDisconnected = false; + drFailover = true; // may or may not need recovery + tcpDisconnected = false; } - } } @@ -303,17 +295,18 @@ public class CallbackMessageHandler implements XMLMessageListener public void onException(JCSMPException e) { DTThrowable.rethrow(e); - } @Override public void onReceive(BytesXMLMessage msg) { - arrivedTopicMessagesToProcess.add(msg); - + try { + arrivedTopicMessagesToProcess.put(msg); + } catch (InterruptedException e) { + DTThrowable.rethrow(e); + } } } - } diff --git a/solace/src/main/java/org/apache/apex/malhar/solace/AbstractSolaceDirectInputOperator.java b/solace/src/main/java/org/apache/apex/malhar/solace/AbstractSolaceDirectInputOperator.java index a616218469..90361adf04 100644 --- a/solace/src/main/java/org/apache/apex/malhar/solace/AbstractSolaceDirectInputOperator.java +++ b/solace/src/main/java/org/apache/apex/malhar/solace/AbstractSolaceDirectInputOperator.java @@ -19,12 +19,9 @@ package org.apache.apex.malhar.solace; import java.io.IOException; -import java.util.ArrayList; -import java.util.List; import java.util.Map; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; import javax.validation.constraints.NotNull; import org.slf4j.Logger; @@ -45,47 +42,45 @@ public abstract class AbstractSolaceDirectInputOperator extends AbstractSolac private static final Logger LOG = LoggerFactory.getLogger(AbstractSolaceDirectInputOperator.class); - private final transient AtomicReference throwable; - private transient Context.OperatorContext context; - protected transient long currentWindowId; - @NotNull protected String topicName; private transient Topic topic; - private transient BytesXMLMessage recentMessage = null; + //private transient BytesXMLMessage recentMessage = null; - private transient List messages = new ArrayList(); + //private transient List messages = new ArrayList(); protected final transient Map currentWindowRecoveryState; - protected static final transient int DEFAULT_BUFFER_SIZE = 500; + protected static final int DEFAULT_BUFFER_SIZE = 500; + //private final transient AtomicReference throwable; @Override public void setup(Context.OperatorContext context) { - arrivedTopicMessagesToProcess = new ArrayBlockingQueue(Integer.parseInt(this.unackedMessageLimit)); - - this.context = context; + arrivedTopicMessagesToProcess = new ArrayBlockingQueue(this.unackedMessageLimit); //setup info for HA and DR at the transport level - super.setConnectRetries(this.connectRetries); - super.setReconnectRetries(this.reconnectRetries); super.setReapplySubscriptions(true); super.setup(context); - topic = factory.createTopic(topicName); - addSubscription(topic); } public AbstractSolaceDirectInputOperator() { - throwable = new AtomicReference(); + //throwable = new AtomicReference(); currentWindowRecoveryState = Maps.newLinkedHashMap(); } + @Override + public void activate(Context.OperatorContext context) + { + super.activate(context); + topic = factory.createTopic(topicName); + addSubscription(topic); + } + @Override public void beginWindow(long windowId) { - currentWindowId = windowId; super.beginWindow(windowId); if (windowId <= lastCompletedWId) { handleRecovery(currentWindowId); @@ -97,8 +92,8 @@ public void endWindow() { super.endWindow(); try { - if (windowId > lastCompletedWId) { - idempotentStorageManager.save(currentWindowRecoveryState, operatorId, windowId); + if (currentWindowId > lastCompletedWId) { + idempotentStorageManager.save(currentWindowRecoveryState, operatorId, currentWindowId); } currentWindowRecoveryState.clear(); } catch (IOException e) { @@ -123,39 +118,19 @@ public void emitTuples() } catch (InterruptedException e) { DTThrowable.rethrow(e); } - emitCount++; - - } - - /* - @Override - protected T convert(BytesXMLMessage message) { - // TODO Auto-generated method stub - return null; - } - */ - - /* - @Override - protected void emitTuple(T tuple) { - // TODO Auto-generated method stub - + //emitCount++; } - */ - @Override protected Consumer getConsumer() throws JCSMPException { - // TODO Auto-generated method stub return null; } /* @Override - protected void clearConsumer() throws JCSMPException { - // TODO Auto-generated method stub - + protected void clearConsumer() throws JCSMPException + { } */ @@ -194,19 +169,18 @@ protected T processMessage(BytesXMLMessage message) @Override protected T processMessage(BytesXMLMessage message) { - T payload = super.processMessage(message); if (payload != null) { currentWindowRecoveryState.put(message.getMessageIdLong(), payload); } - recentMessage = message; + //recentMessage = message; return payload; } @SuppressWarnings("unchecked") protected void handleRecovery(long windowId) { - LOG.info("++++++++++++++++++ Handle Recovery called"); + LOG.info("Handle Recovery called"); Map recoveredData; try { @@ -222,7 +196,6 @@ protected void handleRecovery(long windowId) } catch (IOException e) { DTThrowable.rethrow(e); } - } public String getTopicName() diff --git a/solace/src/main/java/org/apache/apex/malhar/solace/AbstractSolaceGuaranteedIdempotentInputOperator.java b/solace/src/main/java/org/apache/apex/malhar/solace/AbstractSolaceGuaranteedIdempotentInputOperator.java index 8f2f70f023..9a1a1c2fd8 100644 --- a/solace/src/main/java/org/apache/apex/malhar/solace/AbstractSolaceGuaranteedIdempotentInputOperator.java +++ b/solace/src/main/java/org/apache/apex/malhar/solace/AbstractSolaceGuaranteedIdempotentInputOperator.java @@ -66,11 +66,11 @@ public abstract class AbstractSolaceGuaranteedIdempotentInputOperator extends protected String endpointName; - @NotNull - protected transient EndpointType endpointType = EndpointType.QUEUE; + //@NotNull + //protected transient EndpointType endpointType = EndpointType.QUEUE; private transient Endpoint endpoint; @NotNull - public transient FlowCallbackMessageHandler FlowHandler = new FlowCallbackMessageHandler(); + public transient FlowCallbackMessageHandler flowHandler = new FlowCallbackMessageHandler(); @NotNull protected transient EndpointProperties endpointProperties = new EndpointProperties(); private transient BytesXMLMessage recentMessage = null; @@ -94,8 +94,8 @@ public abstract class AbstractSolaceGuaranteedIdempotentInputOperator extends private transient int partitionCount = 0; protected static final transient int DEFAULT_BUFFER_SIZE = 500; - protected static transient int DR_COUNTER_SIZE = 2 * DEFAULT_BUFFER_SIZE; - protected transient int DR_COUNTER = 0; + protected transient int drCounterSize = 2 * DEFAULT_BUFFER_SIZE; + protected transient int drCounter = 0; protected transient volatile boolean doneDups = false; protected transient volatile boolean doneDupsPartitioned = true; @@ -105,10 +105,8 @@ public abstract class AbstractSolaceGuaranteedIdempotentInputOperator extends public AbstractSolaceGuaranteedIdempotentInputOperator() { - throwable = new AtomicReference(); currentWindowRecoveryState = Maps.newLinkedHashMap(); - } @@ -135,8 +133,8 @@ public void setup(Context.OperatorContext context) //super.setUnackedMessageLimit(this.unackedMessageLimit); //setup info for HA and DR at the transport level - super.setConnectRetries(this.connectRetries); - super.setReconnectRetries(this.reconnectRetries); + //super.setConnectRetries(this.connectRetries); + //super.setReconnectRetries(this.reconnectRetries); super.setup(context); @@ -145,15 +143,17 @@ public void setup(Context.OperatorContext context) windowTime = context.getValue(OperatorContext.APPLICATION_WINDOW_COUNT) * context.getValue(DAGContext.STREAMING_WINDOW_SIZE_MILLIS); - arrivedMessagesToProcess = new ArrayBlockingQueue(Integer.parseInt(this.unackedMessageLimit)); + arrivedMessagesToProcess = new ArrayBlockingQueue(unackedMessageLimit); - unackedMessages = new ArrayBlockingQueue((Integer.parseInt(this.unackedMessageLimit)) * (context.getValue(OperatorContext.APPLICATION_WINDOW_COUNT)) * 2); + unackedMessages = new ArrayBlockingQueue(unackedMessageLimit * (context.getValue(OperatorContext.APPLICATION_WINDOW_COUNT)) * 2); endpoint = factory.createQueue(this.endpointName); - if (windowId > idempotentStorageManager.getLargestRecoveryWindow()) { + /* + if (currentWindowId > idempotentStorageManager.getLargestRecoveryWindow()) { super.startConsumer(); } + */ try { operatorRecoveredWindows = idempotentStorageManager.getWindowIds(context.getId()); @@ -163,14 +163,11 @@ public void setup(Context.OperatorContext context) } catch (IOException e) { DTThrowable.rethrow(e); } - - } @Override protected T processMessage(BytesXMLMessage message) { - T payload = super.processMessage(message); if (payload != null) { currentWindowRecoveryState.put(message.getMessageIdLong(), payload); @@ -183,7 +180,7 @@ protected T processMessage(BytesXMLMessage message) public void beginWindow(long windowId) { //LOG.debug("Largest Recovery Wndow is : {} for current window: {}", idempotentStorageManager.getLargestRecoveryWindow(), windowId); - currentWindowId = windowId; + super.beginWindow(windowId); if (windowId <= idempotentStorageManager.getLargestRecoveryWindow()) { //LOG.debug("About to handle recovery, current windowID is: {} largested recovered ID is: {}" + currentWindowId, idempotentStorageManager.getLargestRecoveryWindow()); handleRecovery(windowId); @@ -199,7 +196,7 @@ public void beginWindow(long windowId) @SuppressWarnings("unchecked") protected void handleRecovery(long windowId) { - LOG.info("++++++++++++++++++ Handle Recovery called"); + LOG.info("Handle Recovery called"); Map recoveredData; try { @@ -260,7 +257,6 @@ public void endWindow() ackCompleted = ackMessages(); LOG.debug("acking messages completed successfully: " + ackCompleted); } - } @@ -272,7 +268,7 @@ public void emitTuples() } //If in HA or DR fail-over, block until Solace TCP connection is reestablished so recovery windows are not lost as empty windows int sleepCounter = 0; - while (super.TCPDisconnected == true) { + while (tcpDisconnected) { sleepCounter++; try { Thread.sleep(super.reconnectRetryMillis); @@ -326,7 +322,7 @@ public void emitTuples() */ //Operator was not restarted, re-delivered messages are a result of another partitioned operator restart - if (message.getRedelivered() && inFlightRecoveryMessages.size() == 0 && currentpartitionCount() > 1 && doneDupsPartitioned == true && donePartitionCheck == false) { + if (message.getRedelivered() && inFlightRecoveryMessages.size() == 0 && currentpartitionCount() > 1 && doneDupsPartitioned && donePartitionCheck == false) { try { doneDupsPartitioned = loadPartitionReplayCheck(); } catch (IOException e) { @@ -354,27 +350,27 @@ public void emitTuples() // Checking for duplicates after recovery from DR looking for redelivered messages - if (super.DRFailover == true && !(message.getRedelivered()) && doneDupsDR == true && donePartitionCheck == false) { + if (drFailover && !(message.getRedelivered()) && doneDupsDR && donePartitionCheck == false) { try { doneDupsDR = loadPartitionReplayCheck(); } catch (IOException e) { DTThrowable.rethrow(e); } donePartitionCheck = true; - DR_COUNTER_SIZE = DR_COUNTER_SIZE + arrivedMessagesToProcess.size(); + drCounterSize = drCounterSize + arrivedMessagesToProcess.size(); } - if (inFlightRecoveryMessagesDR.size() == 0 && super.DRFailover == true) { - super.DRFailover = false; + if (inFlightRecoveryMessagesDR.size() == 0 && drFailover) { + drFailover = false; doneDupsDR = true; donePartitionCheck = false; inFlightRecoveryMessagesDR.clear(); LOG.info("Cleared in flight recovery messages, no more possible duplicate messages detected after DR fail over"); } - if (!(message.getRedelivered()) && doneDupsDR == false && inFlightRecoveryMessagesDR.size() > 0 && super.DRFailover == true && DR_COUNTER < DR_COUNTER_SIZE) { - DR_COUNTER++; + if (!(message.getRedelivered()) && doneDupsDR == false && inFlightRecoveryMessagesDR.size() > 0 && drFailover && drCounter < drCounterSize) { + drCounter++; T payload = convert(message); if (inFlightRecoveryMessagesDR.contains(payload)) { LOG.info("Message Duplicate detected after Solace DR fail over"); @@ -387,26 +383,26 @@ public void emitTuples() } //Reset DR processing for duplicates after 2 windows worth of message checks - } else if (DR_COUNTER == DR_COUNTER_SIZE) { + } else if (drCounter == drCounterSize) { //Once there are no more duplicates detected there will be no more duplicates due to DR fail over doneDupsDR = true; donePartitionCheck = false; inFlightRecoveryMessagesDR.clear(); - super.DRFailover = false; - DR_COUNTER = 0; - DR_COUNTER_SIZE = 2 * DEFAULT_BUFFER_SIZE; + drFailover = false; + drCounter = 0; + drCounterSize = 2 * DEFAULT_BUFFER_SIZE; LOG.info("Cleared in flight recovery messages, no more possible duplicate messages detected after DR fail over"); } if (goodToGo) { //if the redelivery flag is no no longer on the messages we can dispose of the inFLightRecoveryMessages - if (message.getRedelivered() == false && inFlightRecoveryMessages.size() > 0 && doneDups == true) { + if (message.getRedelivered() == false && inFlightRecoveryMessages.size() > 0 && doneDups) { inFlightRecoveryMessages.clear(); LOG.info("Cleared in flight recovery messages, no more redelivered or DR recovery messages"); doneDups = false; } - if (message.getRedelivered() == false && inFlightRecoveryMessagesPartition.size() > 0 && doneDupsPartitioned == true) { + if (message.getRedelivered() == false && inFlightRecoveryMessagesPartition.size() > 0 && doneDupsPartitioned) { inFlightRecoveryMessagesPartition.clear(); LOG.info("Cleared in flight recovery messages, no more redelivered messages"); doneDupsPartitioned = false; @@ -423,7 +419,6 @@ public void emitTuples() } catch (InterruptedException e) { DTThrowable.rethrow(e); } - } @Override @@ -483,13 +478,12 @@ private boolean ackMessages() return processedOK; - } public void setEndpointName(String endpointName) { this.endpointName = endpointName; - LOG.info("+++++++++++++++++++enpointName: {}", this.endpointName); + LOG.info("enpointName: {}", this.endpointName); } public int currentpartitionCount() @@ -512,7 +506,7 @@ public int currentpartitionCount() @SuppressWarnings("unchecked") public boolean loadPartitionReplayCheck() throws IOException { - if (!(super.DRFailover)) { + if (!(drFailover)) { LOG.info("Received redelivered message from Solace, another parition must have restarted"); } else { LOG.info("Received DR fail over event from Solace, the partiions are now talking to another Solace Router"); @@ -552,7 +546,7 @@ public boolean loadPartitionReplayCheck() throws IOException } else { for (Map.Entry recoveredEntry : recoveredData.entrySet()) { - if (!(super.DRFailover)) { + if (!(drFailover)) { inFlightRecoveryMessagesPartition.add(recoveredEntry.getValue()); } else { inFlightRecoveryMessagesDR.add(recoveredEntry.getValue()); @@ -571,7 +565,7 @@ public boolean loadPartitionReplayCheck() throws IOException //continue; } else { for (Map.Entry recoveredEntry : recoveredData.entrySet()) { - if (!(super.DRFailover)) { + if (!(drFailover)) { inFlightRecoveryMessagesPartition.add(recoveredEntry.getValue()); } else { inFlightRecoveryMessagesDR.add(recoveredEntry.getValue()); @@ -589,7 +583,7 @@ public boolean loadPartitionReplayCheck() throws IOException LOG.info("Added parition data from partition: {}", arrOpIds[i]); } - if (!(super.DRFailover)) { + if (!(drFailover)) { LOG.info("Total Recovery Partition Data Records: {} ", inFlightRecoveryMessagesPartition.size()); } else { LOG.info("Total Recovery DR fail over Data Records: {}", inFlightRecoveryMessagesDR.size()); @@ -602,13 +596,12 @@ public boolean loadPartitionReplayCheck() throws IOException // Start the actual consumption of Solace messages from the Queue protected Consumer getConsumer() throws JCSMPException { - ConsumerFlowProperties consumerFlowProperties = new ConsumerFlowProperties(); consumerFlowProperties.setAckMode(JCSMPProperties.SUPPORTED_MESSAGE_ACK_CLIENT); consumerFlowProperties.setEndpoint(this.endpoint); - FlowReceiver f_receiver = session.createFlow(FlowHandler, consumerFlowProperties); + FlowReceiver f_receiver = session.createFlow(flowHandler, consumerFlowProperties); f_receiver.start(); LOG.info("Flow started on queue: {}", f_receiver.getDestination()); return f_receiver; @@ -629,19 +622,20 @@ public IdempotentStorageManager getIdempotentStorageManager() public class FlowCallbackMessageHandler implements XMLMessageListener { - - @Override public void onException(JCSMPException e) { DTThrowable.rethrow(e); - } @Override public void onReceive(BytesXMLMessage message) { - arrivedMessagesToProcess.add(message); + try { + arrivedMessagesToProcess.put(message); + } catch (InterruptedException e) { + DTThrowable.rethrow(e); + } } } diff --git a/solace/src/main/java/org/apache/apex/malhar/solace/SolaceGuaranteedTextStrInputOperator.java b/solace/src/main/java/org/apache/apex/malhar/solace/SolaceGuaranteedTextStrInputOperator.java index 7c4ceb104f..1b9c32e810 100644 --- a/solace/src/main/java/org/apache/apex/malhar/solace/SolaceGuaranteedTextStrInputOperator.java +++ b/solace/src/main/java/org/apache/apex/malhar/solace/SolaceGuaranteedTextStrInputOperator.java @@ -42,7 +42,6 @@ protected String convert(BytesXMLMessage message) } return out; - } @Override @@ -56,7 +55,6 @@ protected void emitTuple(String tuple) protected void clearConsumer() throws JCSMPException { // TODO Auto-generated method stub - } } diff --git a/solace/src/main/java/org/apache/apex/malhar/solace/SolaceReliableTextStrInputOperator.java b/solace/src/main/java/org/apache/apex/malhar/solace/SolaceReliableTextStrInputOperator.java index 52748da1e8..a11a288652 100644 --- a/solace/src/main/java/org/apache/apex/malhar/solace/SolaceReliableTextStrInputOperator.java +++ b/solace/src/main/java/org/apache/apex/malhar/solace/SolaceReliableTextStrInputOperator.java @@ -41,7 +41,6 @@ protected String convert(BytesXMLMessage message) } return out; - } @Override @@ -55,7 +54,6 @@ protected void emitTuple(String tuple) protected void clearConsumer() throws JCSMPException { // TODO Auto-generated method stub - } }