Skip to content
This repository has been archived by the owner on May 12, 2021. It is now read-only.

Commit

Permalink
Addressed comments
Browse files Browse the repository at this point in the history
  • Loading branch information
pramodin committed Aug 31, 2017
1 parent 5dd66c9 commit 508b205
Show file tree
Hide file tree
Showing 7 changed files with 111 additions and 154 deletions.
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@
<module>contrib</module>
<module>kafka</module>
<module>examples</module>
<module>solace</module>
</modules>

<dependencies>
Expand Down
2 changes: 1 addition & 1 deletion solace/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
<parent>
<groupId>org.apache.apex</groupId>
<artifactId>malhar</artifactId>
<version>3.7.0-SNAPSHOT</version>
<version>3.8.0-SNAPSHOT</version>
</parent>

<artifactId>malhar-solace</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,7 @@
package org.apache.apex.malhar.solace;

import java.io.IOException;
import java.util.LinkedList;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import javax.validation.constraints.NotNull;

import org.slf4j.Logger;
Expand All @@ -42,26 +40,23 @@
import com.datatorrent.api.Context;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.Operator;
import com.datatorrent.api.Operator.CheckpointListener;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.lib.io.IdempotentStorageManager;
import com.datatorrent.netlet.util.DTThrowable;

@SuppressWarnings("unused")
public abstract class AbstractSolaceBaseInputOperator<T> extends BaseOperator implements
InputOperator, Operator.ActivationListener<Context.OperatorContext>, CheckpointListener
InputOperator, Operator.ActivationListener<Context.OperatorContext>, Operator.CheckpointNotificationListener
{

private static final Logger logger = LoggerFactory.getLogger(AbstractSolaceBaseInputOperator.class);

@NotNull
protected JCSMPProperties properties = new JCSMPProperties();
protected String connectRetries;
protected String reconnectRetries;
protected String unackedMessageLimit;
protected int connectRetries;
protected int reconnectRetries;
protected int unackedMessageLimit;


//protected IdempotentStorageManager idempotentStorageManager = new IdempotentStorageManager.FSIdempotentStorageManager();
protected FSOpsIdempotentStorageManager idempotentStorageManager = new FSOpsIdempotentStorageManager();

protected transient JCSMPFactory factory;
Expand All @@ -71,16 +66,18 @@ public abstract class AbstractSolaceBaseInputOperator<T> extends BaseOperator im
protected transient Consumer reliableConsumer;

protected transient int operatorId;
protected long windowId;
protected transient long currentWindowId;
protected transient long lastCompletedWId;

protected transient int emitCount;

protected transient volatile boolean DRFailover = false;
protected transient volatile boolean TCPDisconnected = false;
protected transient volatile boolean drFailover = false;
protected transient volatile boolean tcpDisconnected = false;

protected transient BlockingQueue<BytesXMLMessage> unackedMessages; // hosts the Solace messages that need to be acked when the streaming window is OK to remove
protected LinkedList<Long> inFlightMessageId = new LinkedList<Long>(); //keeps track of all in flight IDs since they are not necessarily sequential
//protected transient BlockingQueue<BytesXMLMessage> unackedMessages; // hosts the Solace messages that need to be acked when the streaming window is OK to remove
//protected LinkedList<Long> inFlightMessageId = new LinkedList<Long>(); //keeps track of all in flight IDs since they are not necessarily sequential
// Messages are received asynchronously and collected in a queue, these are processed by the main operator thread and at that time fault tolerance
// and idempotency processing is done so this queue can remain transient
protected transient ArrayBlockingQueue<BytesXMLMessage> arrivedTopicMessagesToProcess;

//protected transient com.solace.dt.operator.DTSolaceOperatorInputOutput.ArrayBlockingQueue<BytesXMLMessage> arrivedMessagesToProcess;
Expand All @@ -89,23 +86,22 @@ public abstract class AbstractSolaceBaseInputOperator<T> extends BaseOperator im

private transient CallbackMessageHandler cbHandler = new CallbackMessageHandler();

int spinMillis;
protected transient int spinMillis;

protected transient int reconnectRetryMillis = 0;

@Override
public void setup(Context.OperatorContext context)
{

operatorId = context.getId();
logger.info("OperatorID from Base class: {}", operatorId);
logger.debug("OperatorID: {}", operatorId);
spinMillis = context.getValue(com.datatorrent.api.Context.OperatorContext.SPIN_MILLIS);
factory = JCSMPFactory.onlyInstance();

//Required for HA and DR to try forever if set to "-1"
JCSMPChannelProperties channelProperties = (JCSMPChannelProperties)this.properties.getProperty(JCSMPProperties.CLIENT_CHANNEL_PROPERTIES);
channelProperties.setConnectRetries(Integer.parseInt(this.connectRetries));
channelProperties.setReconnectRetries(Integer.parseInt(this.reconnectRetries));
channelProperties.setConnectRetries(this.connectRetries);
channelProperties.setReconnectRetries(this.reconnectRetries);

reconnectRetryMillis = channelProperties.getReconnectRetryWaitInMillis();

Expand All @@ -122,16 +118,17 @@ public void setup(Context.OperatorContext context)

idempotentStorageManager.setup(context);
lastCompletedWId = idempotentStorageManager.getLargestRecoveryWindow();
//logger.debug("++++++++++++++++++++Largest Completed: " + lastCompletedWId);

//logger.debug("Largest Completed: " + lastCompletedWId);
}

@Override
public void beforeCheckpoint(long l)
{
}

@Override
public void checkpointed(long arg0)
{
// TODO Auto-generated method stub

}

@Override
Expand All @@ -154,7 +151,6 @@ protected T processMessage(BytesXMLMessage message)
return tuple;
}


@Override
public void activate(Context.OperatorContext context)
{
Expand All @@ -163,7 +159,6 @@ public void activate(Context.OperatorContext context)
reliableConsumer = session.getMessageConsumer(rcHandler, cbHandler);
//consumer = getConsumer();
reliableConsumer.start();

} catch (JCSMPException e) {
DTThrowable.rethrow(e);
}
Expand All @@ -173,9 +168,11 @@ public void activate(Context.OperatorContext context)
public void deactivate()
{
try {
consumer.stop();
clearConsumer();
consumer.close();
if (consumer != null) {
consumer.stop();
clearConsumer();
consumer.close();
}
reliableConsumer.close();
} catch (JCSMPException e) {
DTThrowable.rethrow(e);
Expand All @@ -185,7 +182,6 @@ public void deactivate()
@Override
public void teardown()
{

idempotentStorageManager.teardown();
session.closeSession();
}
Expand All @@ -194,10 +190,9 @@ public void teardown()
public void beginWindow(long windowId)
{
super.beginWindow(windowId);
this.windowId = windowId;
this.currentWindowId = windowId;
}


protected abstract T convert(BytesXMLMessage message);

protected abstract void emitTuple(T tuple);
Expand All @@ -209,43 +204,46 @@ public void beginWindow(long windowId)
public void setProperties(JCSMPProperties properties)
{
this.properties = properties;

}

public JCSMPProperties getProperties()
{
return properties;
}

public IdempotentStorageManager getIdempotentStorageManager()
{
return idempotentStorageManager;
}

public void setUnackedMessageLimit(String unackedMessageLimit)
public void setUnackedMessageLimit(int unackedMessageLimit)
{
this.unackedMessageLimit = unackedMessageLimit;
}

public String getUnackedMessageLimit()
public int getUnackedMessageLimit()
{
return unackedMessageLimit;
}

public void setConnectRetries(String connectRetries)
public void setConnectRetries(int connectRetries)
{
this.connectRetries = connectRetries;
logger.info("+++++++++++++++++++reconnectRetries: {}", this.connectRetries);
logger.debug("connectRetries: {}", this.connectRetries);
}

public void setReconnectRetries(String reconnectRetries)
public void setReconnectRetries(int reconnectRetries)
{
this.reconnectRetries = reconnectRetries;
logger.info("+++++++++++++++++++reconnectRetries: {}", this.reconnectRetries);
logger.debug("reconnectRetries: {}", this.reconnectRetries);
}

public void setReapplySubscriptions(boolean state)
{
this.properties.setBooleanProperty(JCSMPProperties.REAPPLY_SUBSCRIPTIONS, state);
}

public void startConsumer()
protected void startConsumer()
{
try {
consumer = getConsumer();
Expand All @@ -257,42 +255,36 @@ public void startConsumer()

public class ReconnectCallbackHandler implements JCSMPReconnectEventHandler
{


@Override
public void postReconnect() throws JCSMPException
{

logger.info("++++++++++++++Solace client now Reconnected -- possibe Solace HA or DR fail-over +++++++++++++");
TCPDisconnected = false;
logger.info("Solace client now Reconnected -- possibe Solace HA or DR fail-over");
tcpDisconnected = false;

}

@Override
public boolean preReconnect() throws JCSMPException
{
DRFailover = false;
logger.info("++++++++++++++Solace client now in Pre Reconnect state -- possibe Solace HA or DR fail-over +++++++++++++");
TCPDisconnected = true;
drFailover = false;
logger.info("Solace client now in Pre Reconnect state -- possibe Solace HA or DR fail-over");
tcpDisconnected = true;
return true;
}

}

public class PrintingSessionEventHandler implements SessionEventHandler
{


public void handleEvent(SessionEventArgs event)
{
logger.info("Received Session Event %s with info %s\n", event.getEvent(), event.getInfo());

// Received event possibly due to DR fail-ver complete
if (event.getEvent() == SessionEvent.VIRTUAL_ROUTER_NAME_CHANGED) {
DRFailover = true; // may or may not need recovery
TCPDisconnected = false;
drFailover = true; // may or may not need recovery
tcpDisconnected = false;
}

}
}

Expand All @@ -303,17 +295,18 @@ public class CallbackMessageHandler implements XMLMessageListener
public void onException(JCSMPException e)
{
DTThrowable.rethrow(e);

}

@Override
public void onReceive(BytesXMLMessage msg)
{
arrivedTopicMessagesToProcess.add(msg);

try {
arrivedTopicMessagesToProcess.put(msg);
} catch (InterruptedException e) {
DTThrowable.rethrow(e);
}
}

}


}
Loading

0 comments on commit 508b205

Please sign in to comment.