-
Notifications
You must be signed in to change notification settings - Fork 201
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Java pub-sub Implementation #135
base: single-machine
Are you sure you want to change the base?
Conversation
Can one of the admins verify this patch? |
Jenkins test this please. |
Test FAILed. |
Jenkins test this please. |
1 similar comment
Jenkins test this please. |
Test PASSed. |
javaclient/pom.xml
Outdated
<descriptorRefs> | ||
<descriptorRef>jar-with-dependencies</descriptorRef> | ||
</descriptorRefs> | ||
<!-- not append assembly id in release file name --> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a reason why you switched from jar-with-dependencies to separate assembly?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i used to package all the dependency into lib,but it's ok to use jar-with-dependencies
@@ -0,0 +1,42 @@ | |||
#!/bin/sh |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This file should be under javaclient/sbin
instead.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i decided to remove the shell
@@ -0,0 +1,42 @@ | |||
#!/bin/sh |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as above.
* @return filtered properties by domain | ||
**/ | ||
public static Properties parse(String domain, String file) throws IOException{ | ||
URL url= PropertiesParser.class.getClassLoader().getResource(file); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The code formatting seems to be off (e.g., 2 spaces indentation, spaces before =
, etc.). We use Google style for code formatting, please see here: https://github.com/google/google-java-format
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
have reformatted my code by google-java-format
* @return multilog id | ||
* @throws TException Cannot get the atomic multilog | ||
*/ | ||
public long getAtomicMultilog(String name) throws TException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps rename this to getAtomicMultiLogId()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
renamed
public class PropertiesParser { | ||
|
||
/** | ||
* parse unique domain properties |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We use sentence case for documentation, i.e., capitalize first letter of each sentence. Please use this consistently across all documentation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
|
||
/** | ||
* | ||
* A builder for a batch of message,which different from |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I still don't understand the need for a separate MessageBatchBuilder
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
a test case(testBatchWriteReadInMemory) in TestRpcClient was added,which try to batch 3 records into MessageBatchBuilder,but can't read all the record from confluod, which may cause by MessageBatchBuilder.so i try to don't care the record timestamp,batch record seperately in MessageBatchBuilder
you may check this?
@@ -237,14 +261,9 @@ public void removeTrigger(String triggerName) throws TException { | |||
client.removeTrigger(curMultilogId, triggerName); | |||
} | |||
|
|||
/** |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why did we remove this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
recovery
/** | ||
* | ||
**/ | ||
public void start() throws TException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't this be in the constructor?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
* stop and close client | ||
* | ||
**/ | ||
public void stop() throws TException{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A cleaner approach would be to extend Closeable, and implement this as close().
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
thanks for your code review, i'll try to optimize my implementation following your advices in next few days @anuragkh |
Thanks for submitting the PR! Look forward to your changes. |
@anuragkh you may review this PR again |
@@ -27,6 +26,7 @@ | |||
RecordBatchBuilder(Schema schema) { | |||
this.numRecords = 0; | |||
this.schema = schema; | |||
this.batch=new TreeMap(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Formatting issue
public MessageBatchBuilder(Schema schema) { | ||
this.numRecords = 0; | ||
this.schema = schema; | ||
this.batch= new rpc_record_batch(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Formatting issue
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sorry for missing formate this class
public List<Record> readBatch(long offset,int batchSize) throws TException { | ||
ByteBuffer batchBuffer=readBatchRaw(offset,batchSize); | ||
List<Record> batchResult=new ArrayList<>(batchSize); | ||
int remaining=batchBuffer.remaining(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Formatting issues in multiple locations in this file.
* @param rec The record. | ||
* @return The packed ByteBuffer. | ||
*/ | ||
public ByteBuffer pack(boolean direct,String... rec) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Formatting issue
|
||
public ConfluoConsumer(Properties properties) throws TException { | ||
|
||
host = properties.getProperty(ConsumerConfig.BOOTSTRAP_ADDRESS, ConsumerConfig.BOOTSTRAP_ADDRESS_DEFAULT); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See comments on ProducerConfig
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
return curSchema; | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unnecessary newlines
@@ -0,0 +1,10 @@ | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it would be better to move this readme to the confluo/streaming/
directory, since it does not apply to the entire confluo/
directory. Also, it would make the readme more readable with the following changes:
(1) There should be a heading; maybe Confluo Pub-sub
?
(2) Use sentence case instead of using lower-case for the entire document.
*/ | ||
public static Properties parse(String domain, String file) throws IOException { | ||
URL url = PropertiesParser.class.getClassLoader().getResource(file); | ||
Properties pro = new Properties(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pro
-> properties
batched++; | ||
if (batched >= produceBatchSize) { | ||
long offset = client.appendBatch(batchBuilder.getBatch()); | ||
// logger.info("write offset:"+offset); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove unused comment
Most of the changes look great! I left some minor comments on formatting, it will make the code much more readable if we can get those in. |
batchSize = records.size(); | ||
messageListener.accept(records); | ||
} else { | ||
List<Record> messages=new ArrayList<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we init messages with client.read(offset)
(like in prefetch case)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sorry, i don't understand how to do
} | ||
long actualDurationMs = System.currentTimeMillis() - start; | ||
long qps = totalConsumedMsgNum * 1000 / actualDurationMs; | ||
logger.error( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this info or error?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should be info ,thanks for you comment
"%d consumer finished,total msg:%d, elapsed:%d ms, qps:%d/s", | ||
consumeNum, totalConsumedMsgNum, actualDurationMs, qps)); | ||
} catch (InterruptedException e) { | ||
logger.info("interrupted", e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this error logger.error
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i mean to log the exception in INFO level
"%d producer finished,total msg:%d, elapsed:%d ms, qps:%d/s", | ||
producerNum, totalProducedMsgNum, actualDurationMs, qps)); | ||
} catch (InterruptedException e) { | ||
logger.info("interrupted", e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
logger.error
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same as above
add ConfluoProducer/Cosumer pub-sub test,hope my changes make my PR more readable |
Hi @rudy2steiner, sorry I was traveling for the past few days. I think this patch still needs better tests -- please see the relevant comments on the test. |
removed non-unit test |
Hi @rudy2steiner, thanks for that change. What I meant was we need to add tests that assert the correctness of the implementation. Currently the existing tests do not assert anything. We should update the tests to make sure we are checking for correctness using asserts. |
Have you notice the major change in the class TestPubSub in the commit: 7f9e0cb;
|
Refer to this link for build results (access rights to CI server needed): |
What changes were proposed in this pull request?
How was this patch tested?
TestPubSub class provide unit tests
related issue #123 ,#125