Skip to content

Commit

Permalink
* Added additional method with slackChannelName to name the sink.
Browse files Browse the repository at this point in the history
* Added retry http handler to handle some of the error causes.
* Variable names are updated as per review comments.
* Addressed other review comments.

Signed-off-by: Rajasekhar Kanugula <[email protected]>
  • Loading branch information
Kanugula committed Mar 30, 2020
1 parent 4687154 commit c545228
Show file tree
Hide file tree
Showing 3 changed files with 194 additions and 21 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ A Hazelcast Jet connector for consuming data from Twitter stream sources in Jet
Tests to check compatibility of the XA support in your JMS broker or
JDBC database with Jet's fault tolerance.

### [Slack Connector](slack)
### [Slack Connector](slack)

A Hazelcast Jet slack connector to post processed messages to slack channels using slack sink in the Jet pipelines.

Expand Down
120 changes: 100 additions & 20 deletions slack/src/main/java/com/hazelcast/jet/contrib/slack/SlackSinks.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,28 @@
package com.hazelcast.jet.contrib.slack;

import com.hazelcast.function.SupplierEx;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import com.hazelcast.jet.pipeline.Sink;
import com.hazelcast.jet.pipeline.SinkBuilder;
import org.apache.http.HttpEntityEnclosingRequest;
import org.apache.http.HttpRequest;
import org.apache.http.NameValuePair;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.HttpRequestRetryHandler;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.conn.ConnectTimeoutException;
import org.apache.http.conn.HttpHostConnectException;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicNameValuePair;
import org.apache.http.util.EntityUtils;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;

Expand All @@ -39,28 +49,82 @@ public final class SlackSinks {

private static final String URL = "https://slack.com/api/chat.postMessage";

private static final int RETRY_COUNT = 5;

private SlackSinks() {
}

/**
* Creates a sink to send messages to the slack channel>
* Creates a sink to send messages to requested channel id.
* Here the sink will be created with the default name as slack-sink.
*
* @param accessToken String Bearer token to authenticate the slack web api requests
* @param slackChannelId String Unique channel id to send messages to the slack channel.
* @return
*/
public static Sink sink(String accessToken, String slackChannelId) {
return sink("slack-sink", accessToken, slackChannelId);
}

/**
* Creates a slack sink to send messages to the slack channel with the given slack channel name.
* Internally apache {@link org.apache.http.client.HttpClient} is used with
* custom {@link HttpRequestRetryHandler} to post the messages to slack channels.
* Following error causes {@link ConnectTimeoutException}, {@link HttpHostConnectException}, {@link IOException}
* are handled through custom retry handler.The max retry count has been configured to 5.
* Other error causes will lead to the breakage of the jet pipeline.
*
* @param accessToken String Bearer token to authenticate the slack web api requests
* @param channel String Unique channel id to send messages to the slack channel.
* @param slackChannelId String Unique channel id to send messages to the slack channel.
* @param slackChannelName String channel name to be used as sink name
* @return
*/
public static Sink sink(String accessToken, String channel) {
return SinkBuilder.sinkBuilder("slack-sink",
ctx -> new SlackContext(() -> HttpClients.createDefault(), accessToken, channel))
public static Sink sink(String accessToken, String slackChannelId, String slackChannelName) {
return SinkBuilder.sinkBuilder(slackChannelName,
ctx -> new SlackContext(() -> HttpClients.custom().setRetryHandler(httpRetryHandler())
.build(), accessToken, slackChannelId))
.<String>receiveFn((ctx, item) -> ctx.receiveFn(item)).destroyFn(ctx -> ctx.destroy()).build();
}

private static HttpRequestRetryHandler httpRetryHandler() {
return (exception, executionCount, context) -> {
if (executionCount >= RETRY_COUNT) {
// Do not retry if over max retry count
return false;
}
if (exception instanceof UnknownHostException) {
// Unknown host
return false;
}
if (exception instanceof ConnectTimeoutException) {
// Connection refused
return true;
}
if (exception instanceof HttpHostConnectException) {
// connection exception
return true;
}
if (exception instanceof IOException) {
// Interrupted Io exceptions
return true;
}
HttpClientContext clientContext = HttpClientContext.adapt(context);
HttpRequest request = clientContext.getRequest();
boolean idempotent = !(request instanceof HttpEntityEnclosingRequest);
if (idempotent) {
// Retry if the request is considered idempotent
return true;
}
return false;
};
}

private static final class SlackContext {

private final SupplierEx<CloseableHttpClient> httpClientSupplierEx;
private final CloseableHttpClient closeableHttpClient;
private final String accessToken;
private final String channel;
private final SupplierEx<CloseableHttpClient> httpClientSupplierEx;
private final CloseableHttpClient closeableHttpClient;
private final String accessToken;
private final String channel;

private SlackContext(SupplierEx<CloseableHttpClient> bulkRequestSupplier, String accessToken, String channel) {
this.httpClientSupplierEx = bulkRequestSupplier;
Expand All @@ -69,17 +133,33 @@ private SlackContext(SupplierEx<CloseableHttpClient> bulkRequestSupplier, String
this.channel = channel;
}

private String receiveFn(String message) throws IOException {
HttpPost request = new HttpPost(URL);
// add request headers
request.addHeader("Authorization", String.format("Bearer %s", accessToken));
List<NameValuePair> urlParameters = new ArrayList<>();
urlParameters.add(new BasicNameValuePair("channel", channel));
urlParameters.add(new BasicNameValuePair("text", message));
request.setEntity(new UrlEncodedFormEntity(urlParameters));
CloseableHttpResponse response = (CloseableHttpResponse) closeableHttpClient.execute(request);
String result = EntityUtils.toString(response.getEntity());
request.releaseConnection();
private String receiveFn(String message) {
HttpPost request = new HttpPost(URL);
CloseableHttpResponse response = null;
String result = "";
try {
// add request headers
request.addHeader("Authorization", String.format("Bearer %s", accessToken));
List<NameValuePair> urlParameters = new ArrayList<>();
urlParameters.add(new BasicNameValuePair("channel", channel));
urlParameters.add(new BasicNameValuePair("text", message));
request.setEntity(new UrlEncodedFormEntity(urlParameters));
response = (CloseableHttpResponse) closeableHttpClient.execute(request);
result = EntityUtils.toString(response.getEntity());
} catch (UnsupportedEncodingException var) {
throw ExceptionUtil.rethrow(var);
} catch (ClientProtocolException var) {
throw ExceptionUtil.rethrow(var);
} catch (IOException var) {
throw ExceptionUtil.rethrow(var);
} finally {
try {
request.releaseConnection();
response.close();
} catch (Exception var) {
throw ExceptionUtil.rethrow(var);
}
}
return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,30 @@
import com.hazelcast.jet.pipeline.BatchStage;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sources;
import org.apache.http.HttpEntityEnclosingRequest;
import org.apache.http.HttpRequest;
import org.apache.http.NameValuePair;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.HttpRequestRetryHandler;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.conn.ConnectTimeoutException;
import org.apache.http.conn.HttpHostConnectException;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicNameValuePair;
import org.apache.http.protocol.HttpContext;
import org.apache.http.util.EntityUtils;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.UnsupportedEncodingException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
Expand Down Expand Up @@ -59,4 +80,76 @@ public void testSlackSink() {
.writeTo(SlackSinks.sink(System.getenv("ACCESS_TOKEN"), System.getenv("CHANNEL_ID")));
jet.newJob(p).join();
}

@Test
public void retryTests() {

HttpRequestRetryHandler myRetryHandler = new HttpRequestRetryHandler() {
@Override
public boolean retryRequest(
IOException exception,
int executionCount,
HttpContext context) {
if (executionCount >= 5) {
// Do not retry if over max retry count
return false;
}
if (exception instanceof InterruptedIOException) {
// Timeout
return false;
}
if (exception instanceof UnknownHostException) {
// Unknown host
return false;
}
if (exception instanceof ConnectTimeoutException) {
// Connection refused
return true;
}
if (exception instanceof HttpHostConnectException) {
// connection exception
return true;
}
HttpClientContext clientContext = HttpClientContext.adapt(context);
HttpRequest request = clientContext.getRequest();
boolean idempotent = !(request instanceof HttpEntityEnclosingRequest);
if (idempotent) {
// Retry if the request is considered idempotent
return true;
}
return false;
}

};

CloseableHttpClient closeableHttpClient = HttpClients.custom()
.setRetryHandler(myRetryHandler)
.build();
HttpPost request = new HttpPost("http://localhost:8080/hello");
CloseableHttpResponse response = null;
String result = "";
try {
// add request headers
request.addHeader("Authorization", String.format("Bearer %s",System.getenv("ACCESS_TOKEN")));
List<NameValuePair> urlParameters = new ArrayList<>();
urlParameters.add(new BasicNameValuePair("channel", "random"));
urlParameters.add(new BasicNameValuePair("text", "message"));
request.setEntity(new UrlEncodedFormEntity(urlParameters));
response = (CloseableHttpResponse) closeableHttpClient.execute(request);
result = EntityUtils.toString(response.getEntity());
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
} catch (ClientProtocolException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
request.releaseConnection();
response.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}

0 comments on commit c545228

Please sign in to comment.