This repository has been archived by the owner on May 12, 2021. It is now read-only.

APEXMALHAR-2539 Solace input operators #571

Open
wants to merge 1 commit into
base: master
from

Conversation

pramodin
Contributor

@pramodin pramodin commented Mar 6, 2017

Solace input operators to read data from the Solace messaging system.

This was built as a team effort between Solace and DataTorrent, hence the multiple authors in the commit. The authors have been listed in the order of the size of the contribution.

@pramodin
Contributor Author

pramodin commented Mar 7, 2017

@sanjaypujare please see


<dependencies>
<dependency>
<groupId>com.solacesystems</groupId>
Contributor

What's the license situation?

Contributor Author

Good point. The license info is linked here
https://mvnrepository.com/artifact/com.solacesystems/sol-jcsmp/10.0.1
and available here
http://dev.solace.com/solace-license.txt

I am going through it now.

Contributor

I looked at the license which is "solely for the Licensee’s internal business purposes". But we are just referencing the software as a dependency in this pom file which does not subject us to any redistribution or bundling terms. The source code here itself has Apache license so I think we are okay.

Contributor

@tweise tweise Mar 15, 2017

Please provide the references for "I think we are okay".

One potential issue is that users that are not aware of licensing restrictions unknowingly create restrictions for their own product, as it would be the case with xGPL.

Contributor

Reference http://dev.solace.com/solace-license.txt and section 2.2:

"2.2 License of APIs. Provided Licensee complies with this Agreement and any terms that SOLACE provides, SOLACE grants to Licensee a non-exclusive, royalty free license, during the term of this Agreement, to download, install and use, the applicable application programming interfaces that may be made available by SOLACE with the Software (“APIs”) solely to create interfaces between the Software and the Licensee’s software or third party software on Licensee’s systems."

We are creating the interface between Solace and our software/3rd party software.

Also, the Solace binary jars are available on the Maven repository. Combined with the "internal business use" license described in section 1.1, this means users of these operators are allowed to use the jars. IMO developers only need to be aware of this license in the redistribution scenario.

solace/pom.xml Outdated
<parent>
<groupId>org.apache.apex</groupId>
<artifactId>malhar</artifactId>
<version>3.7.0-SNAPSHOT</version>
Contributor

=> 3.8.0-SNAPSHOT

import com.datatorrent.api.Context;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.Operator;
import com.datatorrent.api.Operator.CheckpointListener;
Contributor

This is deprecated, so can we use CheckpointNotificationListener instead? Why use deprecated class in new code?

import com.datatorrent.api.Operator;
import com.datatorrent.api.Operator.CheckpointListener;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.lib.io.IdempotentStorageManager;
Contributor

This too is deprecated so why not use WindowDataManager instead?

private static final Logger logger = LoggerFactory.getLogger(AbstractSolaceBaseInputOperator.class);

@NotNull
protected JCSMPProperties properties = new JCSMPProperties();
Contributor

We should take a look at this properties variable and its NotNull constraint and where all this variable is set. My concerns are:

  • here it is initialized with an empty JCSMPProperties object only to address the NotNull constraint
  • the setProperties() method just overwrites the existing variable with a new value
  • there is a place where a property (JCSMPProperties.REAPPLY_SUBSCRIPTIONS) is set in the existing JCSMPProperties object
  • if setProperties() is called after setReapplySubscriptions() won't it clobber the earlier setting?

To solve this can we change setProperties() to copy properties from the argument to the member variable as my comment there says?
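
The clobbering concern can be illustrated with a minimal sketch. It uses `java.util.Properties` as a stand-in for `JCSMPProperties`; the class `PropsHolder`, its method names, and the keys `reapplySubscriptions` and `host` are hypothetical and only mirror the shape of the operator code under review.

```java
import java.util.Properties;

// Hypothetical stand-in for the operator; java.util.Properties replaces JCSMPProperties.
class PropsHolder {
    private Properties properties = new Properties();

    // Sets a single key on the existing properties object.
    void setReapplySubscriptions(boolean value) {
        properties.setProperty("reapplySubscriptions", String.valueOf(value));
    }

    // Behavior under review: the whole object is replaced.
    void setPropertiesByOverwrite(Properties incoming) {
        this.properties = incoming;
    }

    // Suggested alternative: copy the entries into the existing object.
    void setPropertiesByCopy(Properties incoming) {
        this.properties.putAll(incoming);
    }

    String get(String key) {
        return properties.getProperty(key);
    }
}

class PropsClobberDemo {
    public static void main(String[] args) {
        Properties incoming = new Properties();
        incoming.setProperty("host", "example-host");

        PropsHolder overwrite = new PropsHolder();
        overwrite.setReapplySubscriptions(true);
        overwrite.setPropertiesByOverwrite(incoming);
        // The earlier setting is lost because the object was replaced.
        System.out.println("overwrite: " + overwrite.get("reapplySubscriptions"));

        PropsHolder copy = new PropsHolder();
        copy.setReapplySubscriptions(true);
        copy.setPropertiesByCopy(incoming);
        // The earlier setting survives because entries were merged.
        System.out.println("copy: " + copy.get("reapplySubscriptions"));
    }
}
```

With a copying setter the result no longer depends on the order in which the two setters are called, unless the incoming properties carry the same key.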

Contributor Author

Having the properties variable initialized helps in configuring the operator from properties from a property file, for example operator.prop.properties.properties['name']...


public void setProperties(JCSMPProperties properties)
{
this.properties = properties;
Contributor

Can we change this to:

this.properties.fromProperties(properties.toProperties());

Or a better way to copy properties from the argument to the properties data member?

Contributor Author

It is like down converting and up converting back again. Any specific advantages to this?

{

operatorId = context.getId();
logger.info("OperatorID from Base class: {}", operatorId);
Contributor

Why logger.info() and not logger.debug()? Also the "from Base class" comment is not clear. Do you mean AbstractSolaceBaseInputOperator class? Is this logger line useful?

protected JCSMPProperties properties = new JCSMPProperties();
protected String connectRetries;
protected String reconnectRetries;
protected String unackedMessageLimit;
Contributor

All 3 variables (connectRetries, reconnectRetries and unackedMessageLimit) sound like numbers, and they are indeed ultimately used as integers to set integral values on other objects. I would like these to be treated as ints here as well instead of Strings. I can't think of a reason why they are Strings now.


//setup info for HA and DR at the transport level
super.setConnectRetries(this.connectRetries);
super.setReconnectRetries(this.reconnectRetries);
Contributor

The above 2 set* calls don't make any sense other than the fact that the call generates a logger.info line. Note that connectRetries and reconnectRetries are defined only in AbstractSolaceBaseInputOperator so

super.setConnectRetries(this.connectRetries)

assigns a variable value to itself via a function call. Need a reason for this code.
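
A minimal sketch of why such a call is a no-op apart from its side effects. The classes `BaseRetryOp` and `DerivedRetryOp` are hypothetical stand-ins, and the `setterCalls` counter takes the place of the logger.info line.

```java
// Hypothetical base class: the field and its setter are both defined here.
class BaseRetryOp {
    protected String connectRetries;
    int setterCalls = 0; // stands in for the log line the real setter emits

    public void setConnectRetries(String connectRetries) {
        this.connectRetries = connectRetries;
        setterCalls++;
    }

    public String getConnectRetries() {
        return connectRetries;
    }
}

// Hypothetical subclass: it declares no connectRetries field of its own,
// so this.connectRetries is the very field the setter writes to.
class DerivedRetryOp extends BaseRetryOp {
    void setup() {
        super.setConnectRetries(this.connectRetries);
    }
}

class SelfAssignDemo {
    public static void main(String[] args) {
        DerivedRetryOp op = new DerivedRetryOp();
        op.setConnectRetries("5");
        op.setup();
        // The value is unchanged; only the side effect ran a second time.
        System.out.println(op.getConnectRetries() + " after " + op.setterCalls + " setter calls");
    }
}
```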

protected String unackedMessageLimit;


//protected IdempotentStorageManager idempotentStorageManager = new IdempotentStorageManager.FSIdempotentStorageManager();
Contributor

Any reason to have this commented line around since it declares the same idempotentStorageManager variable? Best to remove this line



try {
session = factory.createSession(this.properties, null, new PrintingSessionEventHandler());
Contributor

@sanjaypujare sanjaypujare May 9, 2017

Is it more appropriate to move this to activate() where you createSession just before connect()?

Also the 2nd argument is null, but is it better to pass factory.getDefaultContext() ?

Contributor Author

This is an offline call so I think it is ok here. It should also be ok to leave the second argument as null because internally it makes that assignment when it is null.

protected transient JCSMPSession session;

protected transient Consumer consumer;
protected transient Consumer reliableConsumer;
Contributor

Should have code comments to describe the purpose of having 2 consumers with one being called reliableConsumer.

} catch (InterruptedException e) {
DTThrowable.rethrow(e);
}
emitCount++;
Contributor

Why this extra increment of emitCount? Is this correct? If yes, can we have a comment about why this is required?

//super.setUnackedMessageLimit(this.unackedMessageLimit);
//setup info for HA and DR at the transport level
super.setConnectRetries(this.connectRetries);
super.setReconnectRetries(this.reconnectRetries);
Contributor

Similar to above, why have these redundant calls?



@Override
public void emitTuples()
Contributor

@sanjaypujare sanjaypujare May 9, 2017

Can we break this 160 line function into smaller functions? My suggestions follow. Also please get rid of blank lines that serve no purpose



// Checking for duplicates after recovery from DR looking for redelivered messages
if (super.DRFailover == true && !(message.getRedelivered()) && doneDupsDR == true && donePartitionCheck == false) {
Contributor

Remove "== true" everywhere.

Contributor

Also is the reference to super. necessary?
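
As a rough sketch of the two suggestions on this condition: comparing against boolean literals adds nothing, and an inherited field can be read without `super.` as long as the subclass does not declare a field of the same name. The class `DrFlagCheck` and its parameters are hypothetical and only mirror the flags in the condition above.

```java
// Hypothetical helper mirroring the flags used in the condition under review.
class DrFlagCheck {
    // Original style: explicit comparisons against boolean literals.
    static boolean verbose(boolean drFailover, boolean redelivered,
                           boolean doneDupsDR, boolean donePartitionCheck) {
        return drFailover == true && !(redelivered) && doneDupsDR == true && donePartitionCheck == false;
    }

    // Suggested style: use the boolean values directly.
    static boolean concise(boolean drFailover, boolean redelivered,
                           boolean doneDupsDR, boolean donePartitionCheck) {
        return drFailover && !redelivered && doneDupsDR && !donePartitionCheck;
    }

    public static void main(String[] args) {
        // Compare both forms across all 16 input combinations.
        boolean allEqual = true;
        for (int bits = 0; bits < 16; bits++) {
            boolean a = (bits & 1) != 0;
            boolean b = (bits & 2) != 0;
            boolean c = (bits & 4) != 0;
            boolean d = (bits & 8) != 0;
            allEqual &= verbose(a, b, c, d) == concise(a, b, c, d);
        }
        System.out.println("equivalent: " + allEqual);
    }
}
```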

@Override
public void checkpointed(long arg0)
{
// TODO Auto-generated method stub
Contributor

Please remove the autogenerated comment.


}

protected T processMessage(BytesXMLMessage message)
Contributor

This processMessage is overridden in the subclasses AbstractSolaceDirectInputOperator and AbstractSolaceGuaranteedIdempotentInputOperator, and those overrides also call this one as super.processMessage(..). Unless this processMessage(..) is going to be used by future subclasses without being overridden, can we rename it (say, processMessageHelper) so its role is clearer?

try {
session.connect();
reliableConsumer = session.getMessageConsumer(rcHandler, cbHandler);
//consumer = getConsumer();
Contributor

remove the commented code

LOG.info("Sleeping for another 30 seconds waiting for TCP reconnect for Solace with milliseconds sleep per cycle = {}", super.reconnectRetryMillis);
}
LOG.info("Queued messages to process: {}", arrivedMessagesToProcess.size());
}
Contributor

The above block of 14 lines can be a separate method?



if (goodToGo) {
//if the redelivery flag is no no longer on the messages we can dispose of the inFLightRecoveryMessages
Contributor

this whole block up to line 419 can be refactored into a separate function.


protected static final transient int DEFAULT_BUFFER_SIZE = 500;
protected static transient int DR_COUNTER_SIZE = 2 * DEFAULT_BUFFER_SIZE;
protected transient int DR_COUNTER = 0;
Contributor

Why are the above 2 variables defined in upper case when they are not constants? Can we change to camel-case? Also DR_COUNTER_SIZE is class static but I am not sure if that is intended.
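
A small sketch of the distinction being raised, using hypothetical names (`DrCounterHolder`, `drCounterSize`, `drCounter`): a `static final` value is a true constant, a non-final `static` field is one mutable value shared by every instance in the JVM, and an instance field is per object.

```java
// Hypothetical holder mirroring the three declarations under review.
class DrCounterHolder {
    // True constant: upper case follows the usual Java convention.
    static final int DEFAULT_BUFFER_SIZE = 500;
    // Mutable and shared by all instances: camel case, and static may be unintended.
    static int drCounterSize = 2 * DEFAULT_BUFFER_SIZE;
    // Mutable and per instance: camel case.
    int drCounter = 0;
}

class StaticSharingDemo {
    public static void main(String[] args) {
        DrCounterHolder first = new DrCounterHolder();
        DrCounterHolder second = new DrCounterHolder();

        first.drCounter = 3;                 // affects only "first"
        DrCounterHolder.drCounterSize = 10;  // affects every instance

        System.out.println("second.drCounter = " + second.drCounter);
        System.out.println("shared drCounterSize = " + DrCounterHolder.drCounterSize);
    }
}
```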

protected transient volatile boolean DRFailover = false;
protected transient volatile boolean TCPDisconnected = false;

protected transient BlockingQueue<BytesXMLMessage> unackedMessages; // hosts the Solace messages that need to be acked when the streaming window is OK to remove
Contributor

unackedMessages is not used anywhere in the project. What is the purpose? Can we remove it? If it needs to be kept can we have some comments about why it is there and why it is transient?

protected transient volatile boolean TCPDisconnected = false;

protected transient BlockingQueue<BytesXMLMessage> unackedMessages; // hosts the Solace messages that need to be acked when the streaming window is OK to remove
protected LinkedList<Long> inFlightMessageId = new LinkedList<Long>(); //keeps track of all in flight IDs since they are not necessarily sequential
Contributor

This too is not used anywhere. Can we remove it? Also note this is not transient...


protected transient BlockingQueue<BytesXMLMessage> unackedMessages; // hosts the Solace messages that need to be acked when the streaming window is OK to remove
protected LinkedList<Long> inFlightMessageId = new LinkedList<Long>(); //keeps track of all in flight IDs since they are not necessarily sequential
protected transient ArrayBlockingQueue<BytesXMLMessage> arrivedTopicMessagesToProcess;
Contributor

This is declared transient. It will be good to describe (in a code comment) how you don't lose messages even if this is transient (e.g. similar to Kafka's EARLIEST or APPLICATION setting for the offset).


private transient CallbackMessageHandler cbHandler = new CallbackMessageHandler();

int spinMillis;
Contributor

Why is this not transient? It is initialized in setup()

@Override
public void onReceive(BytesXMLMessage msg)
{
arrivedTopicMessagesToProcess.add(msg);
Contributor

Couple of things:

  • would put(msg) be better than add() since put is blocking and that is what you want when the queue is full (depends on the contract of XMLMessageListener)

  • even with add() if it throws IllegalStateException (when the queue is full) who is going to handle it? Won't there be a loss of messages otherwise?

  • don't we need to synchronize on the queue?
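
The difference between the insertion methods can be sketched with the JDK's `ArrayBlockingQueue` alone. The class `FullQueueDemo`, its method names, the capacity of 1 and the sleep durations are illustrative assumptions; whether blocking inside the Solace callback is acceptable depends on the listener contract, which this sketch does not model. `ArrayBlockingQueue` is itself thread-safe, so callers do not need to synchronize on it externally.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

class FullQueueDemo {
    // add() on a full queue throws IllegalStateException.
    static boolean addThrowsWhenFull() {
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        queue.add("first");
        try {
            queue.add("second");
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    // offer() with a timeout reports failure by returning false.
    static boolean offerAcceptedWhenFull() {
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        queue.add("first");
        try {
            return queue.offer("second", 10, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // put() blocks until a consumer frees a slot, so the message is not dropped.
    static int sizeAfterBlockingPut() {
        final ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        queue.add("first");
        Thread consumer = new Thread(() -> {
            try {
                Thread.sleep(50);
                queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        try {
            queue.put("second"); // waits for the consumer's take()
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return queue.size();
    }

    public static void main(String[] args) {
        System.out.println("add throws when full: " + addThrowsWhenFull());
        System.out.println("offer accepted when full: " + offerAcceptedWhenFull());
        System.out.println("size after blocking put: " + sizeAfterBlockingPut());
    }
}
```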

Contributor Author

It is a synchronized queue

}
//If in HA or DR fail-over, block until Solace TCP connection is reestablished so recovery windows are not lost as empty windows
int sleepCounter = 0;
while (super.TCPDisconnected == true) {
Contributor

=> while (TCPDisconnected)

unless there is some advantage in using this idiom that I am not aware of

import com.datatorrent.netlet.util.DTThrowable;

@OperatorAnnotation(checkpointableWithinAppWindow = false)
public abstract class AbstractSolaceGuaranteedIdempotentInputOperator<T> extends AbstractSolaceBaseInputOperator<T> implements InputOperator, Operator.ActivationListener<Context.OperatorContext>
Contributor

Javadocs for this class to explain what this class does and how it is different from other input operators?


}

/*
Contributor

Can we remove all this commented code? If there is some other value in this commented code can we instead use code documentation to achieve that result?

return null;
}

/*
Contributor

remove commented code?

}
}

/*
Contributor

Remove commented code?

public void setConnectRetries(String connectRetries)
{
this.connectRetries = connectRetries;
logger.info("+++++++++++++++++++reconnectRetries: {}", this.connectRetries);
Contributor

Do we want to keep this logger.info in production code? Why is showing this more important than other properties?

public void setReconnectRetries(String reconnectRetries)
{
this.reconnectRetries = reconnectRetries;
logger.info("+++++++++++++++++++reconnectRetries: {}", this.reconnectRetries);
Contributor

Ditto for this logger.info.

public void postReconnect() throws JCSMPException
{

logger.info("++++++++++++++Solace client now Reconnected -- possibe Solace HA or DR fail-over +++++++++++++");
Contributor

The leading and trailing "++++" in the message is not the standard format in other log messages in malhar. We should standardize on log messages

{
try {
consumer.stop();
clearConsumer();
Contributor

clearConsumer() is a no-op everywhere. Can we remove the definition and call unless there is a reason for it in the future?

consumer.stop();
clearConsumer();
consumer.close();
reliableConsumer.close();
Contributor

Why is there no

reliableConsumer.stop();

?

Also the deactivate() and activate() are not symmetric in terms of how consumer is initialized and closed so it is difficult to prove the correctness of the logic. Is there any way we can make it symmetric?


@Override
public void beginWindow(long windowId)
{
Contributor

Call to super.beginWindow() is missing which is highly suspicious. Note that assignment to super.windowId doesn't then happen but super.windowId is used elsewhere in the class. Also see my comment about possible duplication between super.windowId and this.currentWindowId.
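
A minimal sketch of the failure mode, assuming (as the comment states) that the base class assigns `windowId` inside its own `beginWindow()`. The classes `BaseWindowOp`, `SkipsSuperOp` and `CallsSuperOp` are hypothetical stand-ins for the operators in this PR.

```java
// Hypothetical base class: records the window ID in beginWindow().
class BaseWindowOp {
    protected long windowId;

    public void beginWindow(long windowId) {
        this.windowId = windowId;
    }

    public long getWindowId() {
        return windowId;
    }
}

// Override that forgets super.beginWindow(): the base field is never updated.
class SkipsSuperOp extends BaseWindowOp {
    protected long currentWindowId;

    @Override
    public void beginWindow(long windowId) {
        currentWindowId = windowId;
    }
}

// Override that delegates first: the base field stays in sync.
class CallsSuperOp extends BaseWindowOp {
    @Override
    public void beginWindow(long windowId) {
        super.beginWindow(windowId);
    }
}

class BeginWindowDemo {
    public static void main(String[] args) {
        SkipsSuperOp skips = new SkipsSuperOp();
        skips.beginWindow(42L);
        CallsSuperOp calls = new CallsSuperOp();
        calls.beginWindow(42L);
        // Any base-class code reading windowId sees a stale value in the first case.
        System.out.println("skips super: " + skips.getWindowId());
        System.out.println("calls super: " + calls.getWindowId());
    }
}
```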

private transient BytesXMLMessage recentMessage = null;

private transient long[] operatorRecoveredWindows;
protected transient long currentWindowId;
Contributor

There is a "windowId" variable in AbstractSolaceBaseInputOperator (the super class). This currentWindowId appears to serve the same purpose, so it looks like duplication, which is also error-prone.


private final transient AtomicReference<Throwable> throwable;
private transient Context.OperatorContext context;
protected transient long currentWindowId;
Contributor

Does it serve the same purpose as windowId in AbstractSolaceBaseInputOperator (super class)? If there is duplication can we eliminate it?

private static final Logger LOG = LoggerFactory.getLogger(AbstractSolaceDirectInputOperator.class);

private final transient AtomicReference<Throwable> throwable;
private transient Context.OperatorContext context;
Contributor

This is set but not used anywhere. Can we remove it?


private static final Logger LOG = LoggerFactory.getLogger(AbstractSolaceDirectInputOperator.class);

private final transient AtomicReference<Throwable> throwable;
Contributor

Not used anywhere. Can we remove?

@NotNull
protected String topicName;
private transient Topic topic;
private transient BytesXMLMessage recentMessage = null;
Contributor

recentMessage is assigned to but never used. Can we remove?

private transient Topic topic;
private transient BytesXMLMessage recentMessage = null;

private transient List<T> messages = new ArrayList<T>();
Contributor

Never used anywhere. Can we remove?

private transient BytesXMLMessage recentMessage = null;

private transient List<T> messages = new ArrayList<T>();
protected final transient Map<Long, T> currentWindowRecoveryState;
Contributor

This is marked transient but created in the constructor (not setup). Looks highly suspicious.

Contributor Author

It holds the idempotent state for the current window before it is saved.


private transient List<T> messages = new ArrayList<T>();
protected final transient Map<Long, T> currentWindowRecoveryState;
protected static final transient int DEFAULT_BUFFER_SIZE = 500;
Contributor

transient not necessary for a static final integer constant. @PramodSSImmaneni ?

super.setReconnectRetries(this.reconnectRetries);
super.setReapplySubscriptions(true);

super.setup(context);
Contributor

ideally super.setup(..) should be at the top unless there is a logical reason for it to be where it is now

Contributor Author

Additional property reapply subscription is being set


super.setup(context);
topic = factory.createTopic(topicName);
addSubscription(topic);
Contributor

We can move this to activate() (esp if session creation happens in super.activate())

public void beginWindow(long windowId)
{
currentWindowId = windowId;
super.beginWindow(windowId);
Contributor

Can we move this to the top unless logic requires it?



@NotNull
protected transient EndpointType endpointType = EndpointType.QUEUE;
Contributor

endpointType is not used anywhere. Please remove it.

@tweise
Contributor

tweise commented May 22, 2017

Commit should have a single author so that the contributor stats are not messed up. Work can be split into multiple commits if that is necessary.

@pramodin pramodin changed the title from "Solace input operators" to "APEXMALHAR-2539 Solace input operators" on Aug 31, 2017
@pramodin
Contributor Author

@tweise to your comment: in this case, unfortunately, individual contributions weren't clearly kept track of, hence the single commit. I don't think contributor stats should be a reason to disallow a collaborative commit. The stats information and the graphs generated by other tools are mainly tracking information and are not central to the working or contents of the project, so their being a little inaccurate should not be of concern. Such commits will be few and far between, a small minority, and would probably appear at the bottom of most of those stats.

@pramodin
Contributor Author

Went through one more round of refactoring and changes.

3 participants