diff --git a/pom.xml b/pom.xml index 557dd77..1f64570 100644 --- a/pom.xml +++ b/pom.xml @@ -1,5 +1,6 @@ - + 4.0.0 com.zanox.vertx.mods @@ -175,7 +176,6 @@ - io.vertx vertx-maven-plugin ${maven.vertx.plugin.version} diff --git a/src/main/README.txt b/src/main/README.txt deleted file mode 100644 index 0447763..0000000 --- a/src/main/README.txt +++ /dev/null @@ -1,6 +0,0 @@ -Put any Java or Groovy classes used in your module in the java or groovy directories. - -Put any other resources that you want included in your module in the resources directory, this includes any -JavaScript, Ruby, Python, Groovy or CoffeeScript scripts or any other stuff you want in your module. - -The mod.json file also goes in the resources directory so it's copied over too. \ No newline at end of file diff --git a/src/main/assembly/mod.xml b/src/main/assembly/mod.xml index 941934a..9a09401 100644 --- a/src/main/assembly/mod.xml +++ b/src/main/assembly/mod.xml @@ -1,6 +1,6 @@ - mod diff --git a/src/main/java/com/zanox/vertx/mods/KinesisMessageProcessor.java b/src/main/java/com/zanox/vertx/mods/KinesisMessageProcessor.java index 50b0cd3..7248c9a 100644 --- a/src/main/java/com/zanox/vertx/mods/KinesisMessageProcessor.java +++ b/src/main/java/com/zanox/vertx/mods/KinesisMessageProcessor.java @@ -17,7 +17,8 @@ package com.zanox.vertx.mods; import com.amazonaws.ClientConfiguration; -import com.amazonaws.auth.*; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.auth.ClasspathPropertiesFileCredentialsProvider; import com.amazonaws.regions.Region; import com.amazonaws.regions.RegionUtils; import com.amazonaws.retry.RetryPolicy; @@ -44,128 +45,123 @@ */ public class KinesisMessageProcessor extends BusModBase implements Handler> { - private AmazonKinesisAsyncClient kinesisAsyncClient; - private String streamName, partitionKey, region; + private AmazonKinesisAsyncClient kinesisAsyncClient; + private String streamName, partitionKey, region; - @Override - public void handle(Message jsonObjectMessage) { - try { - sendMessageToKinesis(jsonObjectMessage); - } + @Override + public void handle(Message jsonObjectMessage) { + try { + sendMessageToKinesis(jsonObjectMessage); + } catch (KinesisException exc) { + logger.error(exc); + } + } - catch (KinesisException exc) { - logger.error(exc); - } - } + @Override + public void start() { + super.start(); - @Override - public void start() { - super.start(); + kinesisAsyncClient = createClient(); - kinesisAsyncClient = createClient(); + // Get the address of EventBus where the message was published + final String address = getMandatoryStringConfig("address"); - // Get the address of EventBus where the message was published - final String address = getMandatoryStringConfig("address"); + vertx.eventBus().registerHandler(address, this); + } - vertx.eventBus().registerHandler(address, this); - } + @Override + public void stop() { + if (kinesisAsyncClient != null) { + kinesisAsyncClient.shutdown(); + } + } - @Override - public void stop() { - if (kinesisAsyncClient != null) { - kinesisAsyncClient.shutdown(); - } - } + private AmazonKinesisAsyncClient createClient() { - private AmazonKinesisAsyncClient createClient() { + // Building Kinesis configuration + int connectionTimeout = getOptionalIntConfig(CONNECTION_TIMEOUT, ClientConfiguration.DEFAULT_CONNECTION_TIMEOUT); + int maxConnection = getOptionalIntConfig(MAX_CONNECTION, ClientConfiguration.DEFAULT_MAX_CONNECTIONS); - // Building Kinesis configuration - int connectionTimeout = getOptionalIntConfig(CONNECTION_TIMEOUT, ClientConfiguration.DEFAULT_CONNECTION_TIMEOUT); - int maxConnection = getOptionalIntConfig(MAX_CONNECTION, ClientConfiguration.DEFAULT_MAX_CONNECTIONS); + // TODO: replace default retry policy + RetryPolicy retryPolicy = ClientConfiguration.DEFAULT_RETRY_POLICY; + int socketTimeout = getOptionalIntConfig(SOCKET_TIMEOUT, ClientConfiguration.DEFAULT_SOCKET_TIMEOUT); + boolean useReaper = getOptionalBooleanConfig(USE_REAPER, ClientConfiguration.DEFAULT_USE_REAPER); + String userAgent = getOptionalStringConfig(USER_AGENT, ClientConfiguration.DEFAULT_USER_AGENT); - // TODO: replace default retry policy - RetryPolicy retryPolicy = ClientConfiguration.DEFAULT_RETRY_POLICY; - int socketTimeout = getOptionalIntConfig(SOCKET_TIMEOUT, ClientConfiguration.DEFAULT_SOCKET_TIMEOUT); - boolean useReaper = getOptionalBooleanConfig(USE_REAPER, ClientConfiguration.DEFAULT_USE_REAPER); - String userAgent = getOptionalStringConfig(USER_AGENT, ClientConfiguration.DEFAULT_USER_AGENT); + streamName = getMandatoryStringConfig(STREAM_NAME); + partitionKey = getMandatoryStringConfig(PARTITION_KEY); + region = getMandatoryStringConfig(REGION); - streamName = getMandatoryStringConfig(STREAM_NAME); - partitionKey = getMandatoryStringConfig(PARTITION_KEY); - region = getMandatoryStringConfig(REGION); + logger.info(" --- Stream name: " + streamName); + logger.info(" --- Partition key: " + partitionKey); + logger.info(" --- Region: " + region); - logger.info(" --- Stream name: " + streamName); - logger.info(" --- Partition key: " + partitionKey); - logger.info(" --- Region: " + region); + ClientConfiguration clientConfiguration = new ClientConfiguration(); + clientConfiguration.setConnectionTimeout(connectionTimeout); + clientConfiguration.setMaxConnections(maxConnection); + clientConfiguration.setRetryPolicy(retryPolicy); + clientConfiguration.setSocketTimeout(socketTimeout); + clientConfiguration.setUseReaper(useReaper); + clientConfiguration.setUserAgent(userAgent); - ClientConfiguration clientConfiguration = new ClientConfiguration(); - clientConfiguration.setConnectionTimeout(connectionTimeout); - clientConfiguration.setMaxConnections(maxConnection); - clientConfiguration.setRetryPolicy(retryPolicy); - clientConfiguration.setSocketTimeout(socketTimeout); - clientConfiguration.setUseReaper(useReaper); - clientConfiguration.setUserAgent(userAgent); - - // Reading credentials from Classpath + // Reading credentials from Classpath // the file is called AwsCredentials.properties - // Properties are: - // - accessKey - // - secretKey - AWSCredentialsProvider awsCredentialsProvider = new ClasspathPropertiesFileCredentialsProvider(); - - // Configuring Kinesis-client with configuration - AmazonKinesisAsyncClient kinesisAsyncClient = new AmazonKinesisAsyncClient(awsCredentialsProvider, clientConfiguration); - Region awsRegion = RegionUtils.getRegion(region); - kinesisAsyncClient.setRegion(awsRegion); - - return kinesisAsyncClient; - } - - protected void sendMessageToKinesis(Message event) throws KinesisException { - if (kinesisAsyncClient == null) { - throw new KinesisException("AmazonKinesisAsyncClient is not initialized"); - } - - if(!isValid(event.body().getString(PAYLOAD))) { - logger.error("Invalid message provided."); - return; - } - - JsonObject object = event.body(); - logger.debug(" --- Got event " + event.toString()); - logger.debug(" --- Got body + " + object.toString()); - - byte [] payload = object.getBinary(PAYLOAD); - - if (payload == null) { - logger.debug(" --- Payload is null, trying to get the payload as String"); - payload = object.getString(PAYLOAD).getBytes(); - } - logger.debug("Binary payload size: " + payload.length); - - PutRecordRequest putRecordRequest = new PutRecordRequest(); - putRecordRequest.setStreamName(streamName); - putRecordRequest.setPartitionKey(partitionKey); - - logger.info("Writing to streamName " + streamName + " using partitionkey " + partitionKey); - - putRecordRequest.setData(ByteBuffer.wrap(payload)); - - Future futureResult = kinesisAsyncClient.putRecordAsync(putRecordRequest); - try - { - PutRecordResult recordResult = futureResult.get(); - logger.info("Sent message to Kinesis: " + recordResult.toString()); - sendOK(event); - } - - catch (InterruptedException | ExecutionException iexc) { - logger.error(iexc); - sendError(event, "Failed sending message to Kinesis", iexc); - } - } - - private boolean isValid(String str) { - return str != null && !str.isEmpty(); - } + // Properties are: + // - accessKey + // - secretKey + AWSCredentialsProvider awsCredentialsProvider = new ClasspathPropertiesFileCredentialsProvider(); + + // Configuring Kinesis-client with configuration + AmazonKinesisAsyncClient kinesisAsyncClient = new AmazonKinesisAsyncClient(awsCredentialsProvider, clientConfiguration); + Region awsRegion = RegionUtils.getRegion(region); + kinesisAsyncClient.setRegion(awsRegion); + + return kinesisAsyncClient; + } + + protected void sendMessageToKinesis(Message event) throws KinesisException { + if (kinesisAsyncClient == null) { + throw new KinesisException("AmazonKinesisAsyncClient is not initialized"); + } + + if (!isValid(event.body().getString(PAYLOAD))) { + logger.error("Invalid message provided."); + return; + } + + JsonObject object = event.body(); + logger.debug(" --- Got event " + event.toString()); + logger.debug(" --- Got body + " + object.toString()); + + byte[] payload = object.getBinary(PAYLOAD); + + if (payload == null) { + logger.debug(" --- Payload is null, trying to get the payload as String"); + payload = object.getString(PAYLOAD).getBytes(); + } + logger.debug("Binary payload size: " + payload.length); + + PutRecordRequest putRecordRequest = new PutRecordRequest(); + putRecordRequest.setStreamName(streamName); + putRecordRequest.setPartitionKey(partitionKey); + + logger.info("Writing to streamName " + streamName + " using partitionkey " + partitionKey); + + putRecordRequest.setData(ByteBuffer.wrap(payload)); + + Future futureResult = kinesisAsyncClient.putRecordAsync(putRecordRequest); + try { + PutRecordResult recordResult = futureResult.get(); + logger.info("Sent message to Kinesis: " + recordResult.toString()); + sendOK(event); + } catch (InterruptedException | ExecutionException iexc) { + logger.error(iexc); + sendError(event, "Failed sending message to Kinesis", iexc); + } + } + + private boolean isValid(String str) { + return str != null && !str.isEmpty(); + } } diff --git a/src/main/java/com/zanox/vertx/mods/exception/KinesisException.java b/src/main/java/com/zanox/vertx/mods/exception/KinesisException.java index 034201d..a5c2679 100644 --- a/src/main/java/com/zanox/vertx/mods/exception/KinesisException.java +++ b/src/main/java/com/zanox/vertx/mods/exception/KinesisException.java @@ -18,23 +18,23 @@ public class KinesisException extends Exception { - public KinesisException() { - super(); //To change body of overridden methods use File | Settings | File Templates. - } + public KinesisException() { + super(); //To change body of overridden methods use File | Settings | File Templates. + } - public KinesisException(String message) { - super(message); //To change body of overridden methods use File | Settings | File Templates. - } + public KinesisException(String message) { + super(message); //To change body of overridden methods use File | Settings | File Templates. + } - public KinesisException(String message, Throwable cause) { - super(message, cause); //To change body of overridden methods use File | Settings | File Templates. - } + public KinesisException(String message, Throwable cause) { + super(message, cause); //To change body of overridden methods use File | Settings | File Templates. + } - public KinesisException(Throwable cause) { - super(cause); //To change body of overridden methods use File | Settings | File Templates. - } + public KinesisException(Throwable cause) { + super(cause); //To change body of overridden methods use File | Settings | File Templates. + } - protected KinesisException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { - super(message, cause, enableSuppression, writableStackTrace); //To change body of overridden methods use File | Settings | File Templates. - } + protected KinesisException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); //To change body of overridden methods use File | Settings | File Templates. + } } diff --git a/src/main/java/com/zanox/vertx/mods/internal/EventProperties.java b/src/main/java/com/zanox/vertx/mods/internal/EventProperties.java index d009b9b..b2cd8ac 100644 --- a/src/main/java/com/zanox/vertx/mods/internal/EventProperties.java +++ b/src/main/java/com/zanox/vertx/mods/internal/EventProperties.java @@ -18,10 +18,10 @@ public class EventProperties { - /* Non-instantiable class */ - private EventProperties() {} - - public static final String HEADER = "header"; - public static final String PAYLOAD = "payload"; + public static final String HEADER = "header"; + public static final String PAYLOAD = "payload"; + /* Non-instantiable class */ + private EventProperties() { + } } diff --git a/src/main/java/com/zanox/vertx/mods/internal/KinesisProperties.java b/src/main/java/com/zanox/vertx/mods/internal/KinesisProperties.java index 6dfffd8..61a8f05 100644 --- a/src/main/java/com/zanox/vertx/mods/internal/KinesisProperties.java +++ b/src/main/java/com/zanox/vertx/mods/internal/KinesisProperties.java @@ -18,17 +18,17 @@ public class KinesisProperties { - private KinesisProperties() {} - - public static final String CONNECTION_TIMEOUT = "connectionTimeout"; - public static final String MAX_CONNECTION = "maxConnection"; - public static final String RETRY_POLICY = "retryPolicy"; - public static final String SOCKET_TIMEOUT = "socketTimeout"; - public static final String USE_REAPER = "useReaper"; - public static final String USER_AGENT = "userAgent"; - public static final String STREAM_NAME = "streamName"; - public static final String PARTITION_KEY = "partitionKey"; + public static final String CONNECTION_TIMEOUT = "connectionTimeout"; + public static final String MAX_CONNECTION = "maxConnection"; + public static final String RETRY_POLICY = "retryPolicy"; + public static final String SOCKET_TIMEOUT = "socketTimeout"; + public static final String USE_REAPER = "useReaper"; + public static final String USER_AGENT = "userAgent"; + public static final String STREAM_NAME = "streamName"; + public static final String PARTITION_KEY = "partitionKey"; public static final String REGION = "region"; public static final String AWS_ACCESS_KEY = "awsAccessKey"; public static final String AWS_SECRET_KEY = "awsSecretKey"; + private KinesisProperties() { + } } diff --git a/src/test/java/com/zanox/vertx/mods/ByteArraySerializerIT.java b/src/test/java/com/zanox/vertx/mods/ByteArraySerializerIT.java index e370b9a..5ececa4 100644 --- a/src/test/java/com/zanox/vertx/mods/ByteArraySerializerIT.java +++ b/src/test/java/com/zanox/vertx/mods/ByteArraySerializerIT.java @@ -1,16 +1,12 @@ package com.zanox.vertx.mods; -import com.zanox.vertx.mods.internal.EventProperties; -import org.junit.Test; import org.vertx.java.core.AsyncResult; import org.vertx.java.core.AsyncResultHandler; -import org.vertx.java.core.Handler; -import org.vertx.java.core.eventbus.Message; import org.vertx.java.core.json.JsonObject; import org.vertx.testtools.TestVerticle; -import static org.junit.Assert.*; -import static org.vertx.testtools.VertxAssert.testComplete; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; public class ByteArraySerializerIT extends TestVerticle { diff --git a/src/test/java/com/zanox/vertx/mods/KinesisEventProcessorTest.java b/src/test/java/com/zanox/vertx/mods/KinesisEventProcessorTest.java index f7e069c..2bcdea0 100644 --- a/src/test/java/com/zanox/vertx/mods/KinesisEventProcessorTest.java +++ b/src/test/java/com/zanox/vertx/mods/KinesisEventProcessorTest.java @@ -25,7 +25,6 @@ import org.vertx.java.core.json.JsonObject; import org.vertx.java.core.logging.Logger; -import static org.mockito.Matchers.any; import static org.mockito.Mockito.*; @RunWith(MockitoJUnitRunner.class) @@ -41,7 +40,7 @@ public class KinesisEventProcessorTest { private KinesisMessageProcessor kinesisMessageProcessor; @Test - public void sendMessageToKinesis() throws Exception{ + public void sendMessageToKinesis() throws Exception { KinesisMessageProcessor kinesisMessageProcessorSpy = spy(kinesisMessageProcessor); JsonObject jsonObjectMock = mock(JsonObject.class);