Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Java pub-sub Implementation #135

Open
wants to merge 23 commits into
base: single-machine
Choose a base branch
from

Conversation

rudy2steiner
Copy link

@rudy2steiner rudy2steiner commented Jan 27, 2019

What changes were proposed in this pull request?

  1. provide a producer and consumer implement for confluo as a pub/sub system in java
  2. also include a simple produce/consume benchmark test

How was this patch tested?

TestPubSub class provide unit tests

related issue #123 ,#125

@AmplabJenkins
Copy link
Collaborator

Can one of the admins verify this patch?

@anuragkh
Copy link
Contributor

Jenkins test this please.

@AmplabJenkins
Copy link
Collaborator

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/confluo-prb/360/
Test FAILed.

@rudy2steiner
Copy link
Author

Jenkins test this please.

1 similar comment
@anuragkh
Copy link
Contributor

Jenkins test this please.

@anuragkh anuragkh changed the title Pub sub system benchmark Java pub-sub Implementation Jan 28, 2019
@AmplabJenkins
Copy link
Collaborator

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/confluo-prb/361/
Test PASSed.

<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<!-- not append assembly id in release file name -->
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason why you switched from jar-with-dependencies to separate assembly?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i used to package all the dependency into lib,but it's ok to use jar-with-dependencies

@@ -0,0 +1,42 @@
#!/bin/sh
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This file should be under javaclient/sbin instead.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i decided to remove the shell

@@ -0,0 +1,42 @@
#!/bin/sh
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above.

* @return filtered properties by domain
**/
public static Properties parse(String domain, String file) throws IOException{
URL url= PropertiesParser.class.getClassLoader().getResource(file);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code formatting seems to be off (e.g., 2 spaces indentation, spaces before =, etc.). We use Google style for code formatting, please see here: https://github.com/google/google-java-format

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

have reformatted my code by google-java-format

* @return multilog id
* @throws TException Cannot get the atomic multilog
*/
public long getAtomicMultilog(String name) throws TException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps rename this to getAtomicMultiLogId()

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

renamed

public class PropertiesParser {

/**
* parse unique domain properties
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We use sentence case for documentation, i.e., capitalize first letter of each sentence. Please use this consistently across all documentation.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@anuragkh anuragkh requested a review from ujvl January 28, 2019 19:37

/**
*
* A builder for a batch of message,which different from
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still don't understand the need for a separate MessageBatchBuilder?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a test case(testBatchWriteReadInMemory) in TestRpcClient was added,which try to batch 3 records into MessageBatchBuilder,but can't read all the record from confluod, which may cause by MessageBatchBuilder.so i try to don't care the record timestamp,batch record seperately in MessageBatchBuilder
you may check this?

@@ -237,14 +261,9 @@ public void removeTrigger(String triggerName) throws TException {
client.removeTrigger(curMultilogId, triggerName);
}

/**
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did we remove this?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

recovery

/**
*
**/
public void start() throws TException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this be in the constructor?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

* stop and close client
*
**/
public void stop() throws TException{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A cleaner approach would be to extend Closeable, and implement this as close().

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

@rudy2steiner
Copy link
Author

thanks for your code review, i'll try to optimize my implementation following your advices in next few days @anuragkh

@anuragkh
Copy link
Contributor

Thanks for submitting the PR! Look forward to your changes.

@rudy2steiner
Copy link
Author

@anuragkh you may review this PR again

@@ -27,6 +26,7 @@
RecordBatchBuilder(Schema schema) {
this.numRecords = 0;
this.schema = schema;
this.batch=new TreeMap();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Formatting issue

public MessageBatchBuilder(Schema schema) {
this.numRecords = 0;
this.schema = schema;
this.batch= new rpc_record_batch();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Formatting issue

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry for missing formate this class

public List<Record> readBatch(long offset,int batchSize) throws TException {
ByteBuffer batchBuffer=readBatchRaw(offset,batchSize);
List<Record> batchResult=new ArrayList<>(batchSize);
int remaining=batchBuffer.remaining();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Formatting issues in multiple locations in this file.

* @param rec The record.
* @return The packed ByteBuffer.
*/
public ByteBuffer pack(boolean direct,String... rec) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Formatting issue


public ConfluoConsumer(Properties properties) throws TException {

host = properties.getProperty(ConsumerConfig.BOOTSTRAP_ADDRESS, ConsumerConfig.BOOTSTRAP_ADDRESS_DEFAULT);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See comments on ProducerConfig

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

return curSchema;
}


Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unnecessary newlines

@@ -0,0 +1,10 @@

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be better to move this readme to the confluo/streaming/ directory, since it does not apply to the entire confluo/ directory. Also, it would make the readme more readable with the following changes:
(1) There should be a heading; maybe Confluo Pub-sub?
(2) Use sentence case instead of using lower-case for the entire document.

*/
public static Properties parse(String domain, String file) throws IOException {
URL url = PropertiesParser.class.getClassLoader().getResource(file);
Properties pro = new Properties();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pro -> properties

batched++;
if (batched >= produceBatchSize) {
long offset = client.appendBatch(batchBuilder.getBatch());
// logger.info("write offset:"+offset);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove unused comment

@anuragkh
Copy link
Contributor

Most of the changes look great! I left some minor comments on formatting, it will make the code much more readable if we can get those in.

batchSize = records.size();
messageListener.accept(records);
} else {
List<Record> messages=new ArrayList<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we init messages with client.read(offset) (like in prefetch case)?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry, i don't understand how to do

}
long actualDurationMs = System.currentTimeMillis() - start;
long qps = totalConsumedMsgNum * 1000 / actualDurationMs;
logger.error(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this info or error?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should be info ,thanks for you comment

"%d consumer finished,total msg:%d, elapsed:%d ms, qps:%d/s",
consumeNum, totalConsumedMsgNum, actualDurationMs, qps));
} catch (InterruptedException e) {
logger.info("interrupted", e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this error logger.error ?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i mean to log the exception in INFO level

"%d producer finished,total msg:%d, elapsed:%d ms, qps:%d/s",
producerNum, totalProducedMsgNum, actualDurationMs, qps));
} catch (InterruptedException e) {
logger.info("interrupted", e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logger.error?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as above

@rudy2steiner
Copy link
Author

add ConfluoProducer/Cosumer pub-sub test,hope my changes make my PR more readable

@anuragkh
Copy link
Contributor

anuragkh commented Mar 7, 2019

Hi @rudy2steiner, sorry I was traveling for the past few days. I think this patch still needs better tests -- please see the relevant comments on the test.

@rudy2steiner
Copy link
Author

removed non-unit test

@anuragkh
Copy link
Contributor

Hi @rudy2steiner, thanks for that change. What I meant was we need to add tests that assert the correctness of the implementation. Currently the existing tests do not assert anything. We should update the tests to make sure we are checking for correctness using asserts.

@rudy2steiner
Copy link
Author

Have you notice the major change in the class TestPubSub in the commit: 7f9e0cb;
And the newest TestPubSub.java contains the testConfluoPubSub function, in that i have 4 steps to assert the correctness of the implements(produce/consume) as following:

  • record the current offset,named mq_offset_mark, in target topic,eg mq_001
  • produce some of messages(pMeesages) to mq_001,
  • starting consume from the old offset mq_offset_mark
  • assert consume messages(cMessages) with pMessages in order

https://github.com/ucbrise/confluo/blob/7f9e0cbd41f625a9b37e6b0b7df035ccc74627f9/javaclient/src/test/java/TestPubSub.java

@AmplabJenkins
Copy link
Collaborator

Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/confluo-prb/387/

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants