Skip to content

Commit

Permalink
Code formatting
Browse files Browse the repository at this point in the history
  • Loading branch information
SaschaMoellering committed Aug 28, 2014
1 parent a448fc4 commit 9878e4a
Show file tree
Hide file tree
Showing 9 changed files with 143 additions and 158 deletions.
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.zanox.vertx.mods</groupId>
Expand Down Expand Up @@ -175,7 +176,6 @@
<plugin>



<groupId>io.vertx</groupId>
<artifactId>vertx-maven-plugin</artifactId>
<version>${maven.vertx.plugin.version}</version>
Expand Down
6 changes: 0 additions & 6 deletions src/main/README.txt

This file was deleted.

4 changes: 2 additions & 2 deletions src/main/assembly/mod.xml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
<assembly xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">

<id>mod</id>
Expand Down
216 changes: 106 additions & 110 deletions src/main/java/com/zanox/vertx/mods/KinesisMessageProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
package com.zanox.vertx.mods;

import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.*;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.ClasspathPropertiesFileCredentialsProvider;
import com.amazonaws.regions.Region;
import com.amazonaws.regions.RegionUtils;
import com.amazonaws.retry.RetryPolicy;
Expand All @@ -44,128 +45,123 @@
*/
public class KinesisMessageProcessor extends BusModBase implements Handler<Message<JsonObject>> {

private AmazonKinesisAsyncClient kinesisAsyncClient;
private String streamName, partitionKey, region;
private AmazonKinesisAsyncClient kinesisAsyncClient;
private String streamName, partitionKey, region;

@Override
public void handle(Message<JsonObject> jsonObjectMessage) {
try {
sendMessageToKinesis(jsonObjectMessage);
}
@Override
public void handle(Message<JsonObject> jsonObjectMessage) {
try {
sendMessageToKinesis(jsonObjectMessage);
} catch (KinesisException exc) {
logger.error(exc);
}
}

catch (KinesisException exc) {
logger.error(exc);
}
}
@Override
public void start() {
super.start();

@Override
public void start() {
super.start();
kinesisAsyncClient = createClient();

kinesisAsyncClient = createClient();
// Get the address of EventBus where the message was published
final String address = getMandatoryStringConfig("address");

// Get the address of EventBus where the message was published
final String address = getMandatoryStringConfig("address");
vertx.eventBus().registerHandler(address, this);
}

vertx.eventBus().registerHandler(address, this);
}
@Override
public void stop() {
if (kinesisAsyncClient != null) {
kinesisAsyncClient.shutdown();
}
}

@Override
public void stop() {
if (kinesisAsyncClient != null) {
kinesisAsyncClient.shutdown();
}
}
private AmazonKinesisAsyncClient createClient() {

private AmazonKinesisAsyncClient createClient() {
// Building Kinesis configuration
int connectionTimeout = getOptionalIntConfig(CONNECTION_TIMEOUT, ClientConfiguration.DEFAULT_CONNECTION_TIMEOUT);
int maxConnection = getOptionalIntConfig(MAX_CONNECTION, ClientConfiguration.DEFAULT_MAX_CONNECTIONS);

// Building Kinesis configuration
int connectionTimeout = getOptionalIntConfig(CONNECTION_TIMEOUT, ClientConfiguration.DEFAULT_CONNECTION_TIMEOUT);
int maxConnection = getOptionalIntConfig(MAX_CONNECTION, ClientConfiguration.DEFAULT_MAX_CONNECTIONS);
// TODO: replace default retry policy
RetryPolicy retryPolicy = ClientConfiguration.DEFAULT_RETRY_POLICY;
int socketTimeout = getOptionalIntConfig(SOCKET_TIMEOUT, ClientConfiguration.DEFAULT_SOCKET_TIMEOUT);
boolean useReaper = getOptionalBooleanConfig(USE_REAPER, ClientConfiguration.DEFAULT_USE_REAPER);
String userAgent = getOptionalStringConfig(USER_AGENT, ClientConfiguration.DEFAULT_USER_AGENT);

// TODO: replace default retry policy
RetryPolicy retryPolicy = ClientConfiguration.DEFAULT_RETRY_POLICY;
int socketTimeout = getOptionalIntConfig(SOCKET_TIMEOUT, ClientConfiguration.DEFAULT_SOCKET_TIMEOUT);
boolean useReaper = getOptionalBooleanConfig(USE_REAPER, ClientConfiguration.DEFAULT_USE_REAPER);
String userAgent = getOptionalStringConfig(USER_AGENT, ClientConfiguration.DEFAULT_USER_AGENT);

streamName = getMandatoryStringConfig(STREAM_NAME);
partitionKey = getMandatoryStringConfig(PARTITION_KEY);
region = getMandatoryStringConfig(REGION);

streamName = getMandatoryStringConfig(STREAM_NAME);
partitionKey = getMandatoryStringConfig(PARTITION_KEY);
region = getMandatoryStringConfig(REGION);
logger.info(" --- Stream name: " + streamName);
logger.info(" --- Partition key: " + partitionKey);
logger.info(" --- Region: " + region);

logger.info(" --- Stream name: " + streamName);
logger.info(" --- Partition key: " + partitionKey);
logger.info(" --- Region: " + region);
ClientConfiguration clientConfiguration = new ClientConfiguration();
clientConfiguration.setConnectionTimeout(connectionTimeout);
clientConfiguration.setMaxConnections(maxConnection);
clientConfiguration.setRetryPolicy(retryPolicy);
clientConfiguration.setSocketTimeout(socketTimeout);
clientConfiguration.setUseReaper(useReaper);
clientConfiguration.setUserAgent(userAgent);

ClientConfiguration clientConfiguration = new ClientConfiguration();
clientConfiguration.setConnectionTimeout(connectionTimeout);
clientConfiguration.setMaxConnections(maxConnection);
clientConfiguration.setRetryPolicy(retryPolicy);
clientConfiguration.setSocketTimeout(socketTimeout);
clientConfiguration.setUseReaper(useReaper);
clientConfiguration.setUserAgent(userAgent);

// Reading credentials from Classpath
// Reading credentials from Classpath
// the file is called AwsCredentials.properties
// Properties are:
// - accessKey
// - secretKey
AWSCredentialsProvider awsCredentialsProvider = new ClasspathPropertiesFileCredentialsProvider();

// Configuring Kinesis-client with configuration
AmazonKinesisAsyncClient kinesisAsyncClient = new AmazonKinesisAsyncClient(awsCredentialsProvider, clientConfiguration);
Region awsRegion = RegionUtils.getRegion(region);
kinesisAsyncClient.setRegion(awsRegion);

return kinesisAsyncClient;
}

protected void sendMessageToKinesis(Message<JsonObject> event) throws KinesisException {
if (kinesisAsyncClient == null) {
throw new KinesisException("AmazonKinesisAsyncClient is not initialized");
}

if(!isValid(event.body().getString(PAYLOAD))) {
logger.error("Invalid message provided.");
return;
}

JsonObject object = event.body();
logger.debug(" --- Got event " + event.toString());
logger.debug(" --- Got body + " + object.toString());

byte [] payload = object.getBinary(PAYLOAD);

if (payload == null) {
logger.debug(" --- Payload is null, trying to get the payload as String");
payload = object.getString(PAYLOAD).getBytes();
}
logger.debug("Binary payload size: " + payload.length);

PutRecordRequest putRecordRequest = new PutRecordRequest();
putRecordRequest.setStreamName(streamName);
putRecordRequest.setPartitionKey(partitionKey);

logger.info("Writing to streamName " + streamName + " using partitionkey " + partitionKey);

putRecordRequest.setData(ByteBuffer.wrap(payload));

Future<PutRecordResult> futureResult = kinesisAsyncClient.putRecordAsync(putRecordRequest);
try
{
PutRecordResult recordResult = futureResult.get();
logger.info("Sent message to Kinesis: " + recordResult.toString());
sendOK(event);
}

catch (InterruptedException | ExecutionException iexc) {
logger.error(iexc);
sendError(event, "Failed sending message to Kinesis", iexc);
}
}

private boolean isValid(String str) {
return str != null && !str.isEmpty();
}
// Properties are:
// - accessKey
// - secretKey
AWSCredentialsProvider awsCredentialsProvider = new ClasspathPropertiesFileCredentialsProvider();

// Configuring Kinesis-client with configuration
AmazonKinesisAsyncClient kinesisAsyncClient = new AmazonKinesisAsyncClient(awsCredentialsProvider, clientConfiguration);
Region awsRegion = RegionUtils.getRegion(region);
kinesisAsyncClient.setRegion(awsRegion);

return kinesisAsyncClient;
}

protected void sendMessageToKinesis(Message<JsonObject> event) throws KinesisException {
if (kinesisAsyncClient == null) {
throw new KinesisException("AmazonKinesisAsyncClient is not initialized");
}

if (!isValid(event.body().getString(PAYLOAD))) {
logger.error("Invalid message provided.");
return;
}

JsonObject object = event.body();
logger.debug(" --- Got event " + event.toString());
logger.debug(" --- Got body + " + object.toString());

byte[] payload = object.getBinary(PAYLOAD);

if (payload == null) {
logger.debug(" --- Payload is null, trying to get the payload as String");
payload = object.getString(PAYLOAD).getBytes();
}
logger.debug("Binary payload size: " + payload.length);

PutRecordRequest putRecordRequest = new PutRecordRequest();
putRecordRequest.setStreamName(streamName);
putRecordRequest.setPartitionKey(partitionKey);

logger.info("Writing to streamName " + streamName + " using partitionkey " + partitionKey);

putRecordRequest.setData(ByteBuffer.wrap(payload));

Future<PutRecordResult> futureResult = kinesisAsyncClient.putRecordAsync(putRecordRequest);
try {
PutRecordResult recordResult = futureResult.get();
logger.info("Sent message to Kinesis: " + recordResult.toString());
sendOK(event);
} catch (InterruptedException | ExecutionException iexc) {
logger.error(iexc);
sendError(event, "Failed sending message to Kinesis", iexc);
}
}

private boolean isValid(String str) {
return str != null && !str.isEmpty();
}
}
30 changes: 15 additions & 15 deletions src/main/java/com/zanox/vertx/mods/exception/KinesisException.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,23 @@

public class KinesisException extends Exception {

public KinesisException() {
super(); //To change body of overridden methods use File | Settings | File Templates.
}
public KinesisException() {
super(); //To change body of overridden methods use File | Settings | File Templates.
}

public KinesisException(String message) {
super(message); //To change body of overridden methods use File | Settings | File Templates.
}
public KinesisException(String message) {
super(message); //To change body of overridden methods use File | Settings | File Templates.
}

public KinesisException(String message, Throwable cause) {
super(message, cause); //To change body of overridden methods use File | Settings | File Templates.
}
public KinesisException(String message, Throwable cause) {
super(message, cause); //To change body of overridden methods use File | Settings | File Templates.
}

public KinesisException(Throwable cause) {
super(cause); //To change body of overridden methods use File | Settings | File Templates.
}
public KinesisException(Throwable cause) {
super(cause); //To change body of overridden methods use File | Settings | File Templates.
}

protected KinesisException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace); //To change body of overridden methods use File | Settings | File Templates.
}
protected KinesisException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace); //To change body of overridden methods use File | Settings | File Templates.
}
}
10 changes: 5 additions & 5 deletions src/main/java/com/zanox/vertx/mods/internal/EventProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@

public class EventProperties {

/* Non-instantiable class */
private EventProperties() {}

public static final String HEADER = "header";
public static final String PAYLOAD = "payload";
public static final String HEADER = "header";
public static final String PAYLOAD = "payload";
/* Non-instantiable class */
private EventProperties() {
}

}
20 changes: 10 additions & 10 deletions src/main/java/com/zanox/vertx/mods/internal/KinesisProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,17 @@

public class KinesisProperties {

private KinesisProperties() {}

public static final String CONNECTION_TIMEOUT = "connectionTimeout";
public static final String MAX_CONNECTION = "maxConnection";
public static final String RETRY_POLICY = "retryPolicy";
public static final String SOCKET_TIMEOUT = "socketTimeout";
public static final String USE_REAPER = "useReaper";
public static final String USER_AGENT = "userAgent";
public static final String STREAM_NAME = "streamName";
public static final String PARTITION_KEY = "partitionKey";
public static final String CONNECTION_TIMEOUT = "connectionTimeout";
public static final String MAX_CONNECTION = "maxConnection";
public static final String RETRY_POLICY = "retryPolicy";
public static final String SOCKET_TIMEOUT = "socketTimeout";
public static final String USE_REAPER = "useReaper";
public static final String USER_AGENT = "userAgent";
public static final String STREAM_NAME = "streamName";
public static final String PARTITION_KEY = "partitionKey";
public static final String REGION = "region";
public static final String AWS_ACCESS_KEY = "awsAccessKey";
public static final String AWS_SECRET_KEY = "awsSecretKey";
private KinesisProperties() {
}
}
8 changes: 2 additions & 6 deletions src/test/java/com/zanox/vertx/mods/ByteArraySerializerIT.java
Original file line number Diff line number Diff line change
@@ -1,16 +1,12 @@
package com.zanox.vertx.mods;

import com.zanox.vertx.mods.internal.EventProperties;
import org.junit.Test;
import org.vertx.java.core.AsyncResult;
import org.vertx.java.core.AsyncResultHandler;
import org.vertx.java.core.Handler;
import org.vertx.java.core.eventbus.Message;
import org.vertx.java.core.json.JsonObject;
import org.vertx.testtools.TestVerticle;

import static org.junit.Assert.*;
import static org.vertx.testtools.VertxAssert.testComplete;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

public class ByteArraySerializerIT extends TestVerticle {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.vertx.java.core.json.JsonObject;
import org.vertx.java.core.logging.Logger;

import static org.mockito.Matchers.any;
import static org.mockito.Mockito.*;

@RunWith(MockitoJUnitRunner.class)
Expand All @@ -41,7 +40,7 @@ public class KinesisEventProcessorTest {
private KinesisMessageProcessor kinesisMessageProcessor;

@Test
public void sendMessageToKinesis() throws Exception{
public void sendMessageToKinesis() throws Exception {
KinesisMessageProcessor kinesisMessageProcessorSpy = spy(kinesisMessageProcessor);

JsonObject jsonObjectMock = mock(JsonObject.class);
Expand Down

0 comments on commit 9878e4a

Please sign in to comment.