-
Notifications
You must be signed in to change notification settings - Fork 148
APEXMALHAR-2539 Solace input operators #571
base: master
Are you sure you want to change the base?
Conversation
@sanjaypujare please see |
|
||
<dependencies> | ||
<dependency> | ||
<groupId>com.solacesystems</groupId> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's the license situation?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. The license info is linked here
https://mvnrepository.com/artifact/com.solacesystems/sol-jcsmp/10.0.1
and available here
http://dev.solace.com/solace-license.txt
I am going through it now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I looked at the license which is "solely for the Licensee’s internal business purposes". But we are just referencing the software as a dependency in this pom file which does not subject us to any redistribution or bundling terms. The source code here itself has Apache license so I think we are okay.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please provide the references for "I think we are okay".
One potential issue is that users that are not aware of licensing restrictions unknowingly create restrictions for their own product, as it would be the case with xGPL.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reference http://dev.solace.com/solace-license.txt and section 2.2:
"2.2 License of APIs. Provided Licensee complies with this Agreement and any terms that SOLACE provides, SOLACE grants to Licensee a non-exclusive, royalty free license, during the term of this Agreement, to download, install and use, the applicable application programming interfaces that may be made available by SOLACE with the Software (“APIs”) solely to create interfaces between the Software and the Licensee’s software or third party software on Licensee’s systems."
We are creating the interface between Solace and our software/3rd party software.
Also the Solace binary jars are made available on the maven repository and that combined with the "internal business use" license described in 1.1 the users of these operators are allowed to use the jars. Only for the redistribution scenario the developers may need to be aware of this license IMO.
solace/pom.xml
Outdated
<parent> | ||
<groupId>org.apache.apex</groupId> | ||
<artifactId>malhar</artifactId> | ||
<version>3.7.0-SNAPSHOT</version> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
=> 3.8.0-SNAPSHOT
import com.datatorrent.api.Context; | ||
import com.datatorrent.api.InputOperator; | ||
import com.datatorrent.api.Operator; | ||
import com.datatorrent.api.Operator.CheckpointListener; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is deprecated, so can we use CheckpointNotificationListener instead? Why use deprecated class in new code?
import com.datatorrent.api.Operator; | ||
import com.datatorrent.api.Operator.CheckpointListener; | ||
import com.datatorrent.common.util.BaseOperator; | ||
import com.datatorrent.lib.io.IdempotentStorageManager; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This too is deprecated so why not use WindowDataManager instead?
private static final Logger logger = LoggerFactory.getLogger(AbstractSolaceBaseInputOperator.class); | ||
|
||
@NotNull | ||
protected JCSMPProperties properties = new JCSMPProperties(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should take a look at this properties variable and its NotNull constraint and where all this variable is set. My concerns are:
- here it is initialized with an empty JCSMProperties object only to address the NotNull constraint
- the setProperties() method just overwrites the existing variable with a new value
- there is a place where a property is set in the existing JCSMProperties object JCSMPProperties.REAPPLY_SUBSCRIPTIONS.
- if setProperties() is called after setReapplySubscriptions() won't it clobber the earlier setting?
To solve this can we change setProperties() to copy properties from the argument to the member variable as my comment there says?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Having the properties variable initialized helps in configuring the operator from properties from a property file, for example operator.prop.properties.properties['name']...
|
||
public void setProperties(JCSMPProperties properties) | ||
{ | ||
this.properties = properties; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we change this to:
this.properties.fromProperties(properties.toProperties());
Or a better way to copy properties from the argument to the properties data member?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is like down converting and up converting back again. Any specific advantages to this?
{ | ||
|
||
operatorId = context.getId(); | ||
logger.info("OperatorID from Base class: {}", operatorId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why logger.info() and not logger.debug()? Also the "from Base class" comment is not clear. Do you mean AbstractSolaceBaseInputOperator class? Is this logger line useful?
protected JCSMPProperties properties = new JCSMPProperties(); | ||
protected String connectRetries; | ||
protected String reconnectRetries; | ||
protected String unackedMessageLimit; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All 3 variables (connectRetries, reconnectRetries and unackedMessageLimit) sound like a number and they are indeed used as integers ultimately to set integral values on other objects. I would like these to be treated as int's here as well instead of String's. I can't think of a reason why they are String's now.
|
||
//setup info for HA and DR at the transport level | ||
super.setConnectRetries(this.connectRetries); | ||
super.setReconnectRetries(this.reconnectRetries); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The above 2 set* calls don't make any sense other than the fact that the call generates a logger.info line. Note that connectRetries and reconnectRetries are defined only in AbstractSolaceBaseInputOperator so
super.setConnectRetries(this.connectRetries)
assigns a variable value to itself via a function call. Need a reason for this code.
protected String unackedMessageLimit; | ||
|
||
|
||
//protected IdempotentStorageManager idempotentStorageManager = new IdempotentStorageManager.FSIdempotentStorageManager(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any reason to have this commented line around since it declares the same idempotentStorageManager variable? Best to remove this line
|
||
|
||
try { | ||
session = factory.createSession(this.properties, null, new PrintingSessionEventHandler()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it more appropriate to move this to activate() where you createSession just before connect()?
Also the 2nd argument is null, but is it better to pass factory.getDefaultContext() ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is an offline call so I think it is ok here. It should also be ok to leave the second argument as null because internally it makes that assignment when it is null.
protected transient JCSMPSession session; | ||
|
||
protected transient Consumer consumer; | ||
protected transient Consumer reliableConsumer; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should have code comments to describe the purpose of having 2 consumers with one being called reliableConsumer.
} catch (InterruptedException e) { | ||
DTThrowable.rethrow(e); | ||
} | ||
emitCount++; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why this extra increment of emitCount? Is this correct? If yes, can we have a comment about why this is required?
//super.setUnackedMessageLimit(this.unackedMessageLimit); | ||
//setup info for HA and DR at the transport level | ||
super.setConnectRetries(this.connectRetries); | ||
super.setReconnectRetries(this.reconnectRetries); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similar to above, why have these redundant calls?
|
||
|
||
@Override | ||
public void emitTuples() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we break this 160 line function into smaller functions? My suggestions follow. Also please get rid of blank lines that serve no purpose
|
||
|
||
// Checking for duplicates after recovery from DR looking for redelivered messages | ||
if (super.DRFailover == true && !(message.getRedelivered()) && doneDupsDR == true && donePartitionCheck == false) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove "== true" everywhere .
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also is the reference to super. necessary?
@Override | ||
public void checkpointed(long arg0) | ||
{ | ||
// TODO Auto-generated method stub |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pls remove the autogenerated comment,
|
||
} | ||
|
||
protected T processMessage(BytesXMLMessage message) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This processMessage is overridden in subclasses AbstractSolaceDirectInputOperator and AbstractSolaceGuaranteedIdempotentInputOperator and those methods also call this one as super.processMessage(..). Unless this processMessage(..) is going to be used in future by other subclasses without overriding it can we call this method something else (say processMessageHelper) so it is clearer
try { | ||
session.connect(); | ||
reliableConsumer = session.getMessageConsumer(rcHandler, cbHandler); | ||
//consumer = getConsumer(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove the commented code
LOG.info("Sleeping for another 30 seconds waiting for TCP reconnect for Solace with milliseconds sleep per cycle = {}", super.reconnectRetryMillis); | ||
} | ||
LOG.info("Queued messages to process: {}", arrivedMessagesToProcess.size()); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The above block of 14 lines can be a separate method?
|
||
|
||
if (goodToGo) { | ||
//if the redelivery flag is no no longer on the messages we can dispose of the inFLightRecoveryMessages |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this whole block up to line 419 can be refactored into a separate function.
|
||
protected static final transient int DEFAULT_BUFFER_SIZE = 500; | ||
protected static transient int DR_COUNTER_SIZE = 2 * DEFAULT_BUFFER_SIZE; | ||
protected transient int DR_COUNTER = 0; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why are the above 2 variables defined in upper case when they are not constants? Can we change to camel-case? Also DR_COUNTER_SIZE is class static but I am not sure if that is intended.
protected transient volatile boolean DRFailover = false; | ||
protected transient volatile boolean TCPDisconnected = false; | ||
|
||
protected transient BlockingQueue<BytesXMLMessage> unackedMessages; // hosts the Solace messages that need to be acked when the streaming window is OK to remove |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
unackedMessages is not used anywhere in the project. What is the purpose? Can we remove it? If it needs to be kept can we have some comments about why it is there and why it is transient?
protected transient volatile boolean TCPDisconnected = false; | ||
|
||
protected transient BlockingQueue<BytesXMLMessage> unackedMessages; // hosts the Solace messages that need to be acked when the streaming window is OK to remove | ||
protected LinkedList<Long> inFlightMessageId = new LinkedList<Long>(); //keeps track of all in flight IDs since they are not necessarily sequential |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This too is not used anywhere. Can we remove it? Also note this is not transient...
|
||
protected transient BlockingQueue<BytesXMLMessage> unackedMessages; // hosts the Solace messages that need to be acked when the streaming window is OK to remove | ||
protected LinkedList<Long> inFlightMessageId = new LinkedList<Long>(); //keeps track of all in flight IDs since they are not necessarily sequential | ||
protected transient ArrayBlockingQueue<BytesXMLMessage> arrivedTopicMessagesToProcess; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is declared transient. It will be good to describe (in a code comment) how you don't lose messages even if this is transient (e.g. similar to Kafka's EARLIEST or APPLICATION setting for the offset).
|
||
private transient CallbackMessageHandler cbHandler = new CallbackMessageHandler(); | ||
|
||
int spinMillis; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this not transient? It is initialized in setup()
@Override | ||
public void onReceive(BytesXMLMessage msg) | ||
{ | ||
arrivedTopicMessagesToProcess.add(msg); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Couple of things:
-
would put(msg) be better than add() since put is blocking and that is what you want when the queue is full (depends on the contract of XMLMessageListener)
-
even with add() if it throws IllegalStateException (when the queue is full) who is going to handle it? Won't there be a loss of messages otherwise?
-
don't we need to synchronize on the queue?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is a synchronized queue
} | ||
//If in HA or DR fail-over, block until Solace TCP connection is reestablished so recovery windows are not lost as empty windows | ||
int sleepCounter = 0; | ||
while (super.TCPDisconnected == true) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
=> while (TCPDisconnected)
unless there is some advantage in using this idiom that I am not aware of
import com.datatorrent.netlet.util.DTThrowable; | ||
|
||
@OperatorAnnotation(checkpointableWithinAppWindow = false) | ||
public abstract class AbstractSolaceGuaranteedIdempotentInputOperator<T> extends AbstractSolaceBaseInputOperator<T> implements InputOperator, Operator.ActivationListener<Context.OperatorContext> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Javadocs for this class to explain what this class does and how it is different from other input operators?
|
||
} | ||
|
||
/* |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we remove all this commented code? If there is some other value in this commented code can we instead use code documentation to achieve that result?
return null; | ||
} | ||
|
||
/* |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove commented code?
} | ||
} | ||
|
||
/* |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove commented code?
public void setConnectRetries(String connectRetries) | ||
{ | ||
this.connectRetries = connectRetries; | ||
logger.info("+++++++++++++++++++reconnectRetries: {}", this.connectRetries); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we want to keep this logger.info in production code? Why is showing this more important than other properties?
public void setReconnectRetries(String reconnectRetries) | ||
{ | ||
this.reconnectRetries = reconnectRetries; | ||
logger.info("+++++++++++++++++++reconnectRetries: {}", this.reconnectRetries); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ditto for this logger.info.
public void postReconnect() throws JCSMPException | ||
{ | ||
|
||
logger.info("++++++++++++++Solace client now Reconnected -- possibe Solace HA or DR fail-over +++++++++++++"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The leading and trailing "++++" in the message is not the standard format in other log messages in malhar. We should standardize on log messages
{ | ||
try { | ||
consumer.stop(); | ||
clearConsumer(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
clearConsumer() is a no-op everywhere. Can we remove the definition and call unless there is a reason for it in the future?
consumer.stop(); | ||
clearConsumer(); | ||
consumer.close(); | ||
reliableConsumer.close(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is there no
reliableConsumer.stop();
?
Also the deactivate() and activate() are not symmetric in terms of how consumer is initialized and closed so it is difficult to prove the correctness of the logic. Is there any way we can make it symmetric?
|
||
@Override | ||
public void beginWindow(long windowId) | ||
{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Call to super.beginWindow() is missing which is highly suspicious. Note that assignment to super.windowId doesn't then happen but super.windowId is used elsewhere in the class. Also see my comment about possible duplication between super.windowId and this.currentWindowId.
private transient BytesXMLMessage recentMessage = null; | ||
|
||
private transient long[] operatorRecoveredWindows; | ||
protected transient long currentWindowId; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is a "windowId" variable in AbstractSolaceBaseInputOperator (super class). So this currentWindowId serves the same purpose it looks like so this seems to be duplication which is also error prone.
|
||
private final transient AtomicReference<Throwable> throwable; | ||
private transient Context.OperatorContext context; | ||
protected transient long currentWindowId; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does it serve the same purpose as windowId in AbstractSolaceBaseInputOperator (super class)? If there is duplication can we eliminate it?
private static final Logger LOG = LoggerFactory.getLogger(AbstractSolaceDirectInputOperator.class); | ||
|
||
private final transient AtomicReference<Throwable> throwable; | ||
private transient Context.OperatorContext context; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is set but not used anywhere. Can we remove it?
|
||
private static final Logger LOG = LoggerFactory.getLogger(AbstractSolaceDirectInputOperator.class); | ||
|
||
private final transient AtomicReference<Throwable> throwable; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not used anywhere. Can we remove?
@NotNull | ||
protected String topicName; | ||
private transient Topic topic; | ||
private transient BytesXMLMessage recentMessage = null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
recentMessage is assigned to but never used. Can we remove?
private transient Topic topic; | ||
private transient BytesXMLMessage recentMessage = null; | ||
|
||
private transient List<T> messages = new ArrayList<T>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Never used anywhere. Can we remove?
private transient BytesXMLMessage recentMessage = null; | ||
|
||
private transient List<T> messages = new ArrayList<T>(); | ||
protected final transient Map<Long, T> currentWindowRecoveryState; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is marked transient but created in the constructor (not setup). Looks highly suspicious.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It holds the idempotent state for the current window before it is saved.
|
||
private transient List<T> messages = new ArrayList<T>(); | ||
protected final transient Map<Long, T> currentWindowRecoveryState; | ||
protected static final transient int DEFAULT_BUFFER_SIZE = 500; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
transient not necessary for a static final integer constant. @PramodSSImmaneni ?
super.setReconnectRetries(this.reconnectRetries); | ||
super.setReapplySubscriptions(true); | ||
|
||
super.setup(context); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ideally super.setup(..) should be at the top unless there is a logical reason for it to be where it is now
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Additional property reapply subscription is being set
|
||
super.setup(context); | ||
topic = factory.createTopic(topicName); | ||
addSubscription(topic); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can move this to activate() (esp if session creation happens in super.activate())
public void beginWindow(long windowId) | ||
{ | ||
currentWindowId = windowId; | ||
super.beginWindow(windowId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we move this to the top unless logic requires it?
|
||
|
||
@NotNull | ||
protected transient EndpointType endpointType = EndpointType.QUEUE; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
endpointType not used anywhere. Pls remove
Commit should have a single author so that the contributor stats are not messed up. Work can be split into multiple commits if that is necessary. |
453d5f2
to
508b205
Compare
508b205
to
76b372c
Compare
@tweise to your comment, in this case unfortunately individual contributions weren't clearly kept track of and hence the single commit. I don't think contributor stats should be the reason to not allow a collaborative commit, that stats information and graph generated by other tools, that is mainly tracking information and not central to the working or contents of the project, being a little inaccurate should be of concern. These will be very few and far between and will be a small minority and would probably appear at the bottom of most of those stats. |
76b372c
to
08d9d96
Compare
Went through one more round of refactoring and changes. |
Solace input operators to read data from solace messaging system.
This was built as a team effort between Solace and DataTorrent, hence the multiple authors in the commit. The authors have been listed in the order of the size of the contribution.