Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding slack sink connector #60

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ A Hazelcast Jet connector for consuming data from Twitter stream sources in Jet
Tests to check compatibility of the XA support in your JMS broker or
JDBC database with Jet's fault tolerance.

### [Slack Connector](slack)

A Hazelcast Jet slack connector to post processed messages to slack channels using slack sink in the Jet pipelines.

## Snapshot Releases

To access snapshot builds add the following `dependency` and `repository` declarations
Expand Down
1 change: 1 addition & 0 deletions settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ include 'mongodb'
include 'debezium'
include 'twitter'
include 'xa-test'
include 'slack'

project(':elasticsearch-5').projectDir = file('elasticsearch/elasticsearch-5')
project(':elasticsearch-6').projectDir = file('elasticsearch/elasticsearch-6')
Expand Down
15 changes: 15 additions & 0 deletions slack/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
plugins {
id 'java'
}


sourceCompatibility = 1.8

repositories {
mavenCentral()
}

dependencies {
compile group: 'org.apache.httpcomponents', name: 'httpclient', version: '4.5.12'
testCompile group: 'junit', name: 'junit', version: '4.12'
}
170 changes: 170 additions & 0 deletions slack/src/main/java/com/hazelcast/jet/contrib/slack/SlackSinks.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
/*
* Copyright (c) 2008-2020, Hazelcast, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hazelcast.jet.contrib.slack;

import com.hazelcast.function.SupplierEx;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import com.hazelcast.jet.pipeline.Sink;
import com.hazelcast.jet.pipeline.SinkBuilder;
import org.apache.http.HttpEntityEnclosingRequest;
import org.apache.http.HttpRequest;
import org.apache.http.NameValuePair;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.HttpRequestRetryHandler;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.conn.ConnectTimeoutException;
import org.apache.http.conn.HttpHostConnectException;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicNameValuePair;
import org.apache.http.util.EntityUtils;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;

/**
* Contains the methods to create Slack sinks.
*/
public final class SlackSinks {

private static final String URL = "https://slack.com/api/chat.postMessage";

private static final int RETRY_COUNT = 5;

private SlackSinks() {
}

/**
* Creates a sink to send messages to requested channel id.
* Here the sink will be created with the default name as slack-sink.
*
* @param accessToken String Bearer token to authenticate the slack web api requests
* @param slackChannelId String Unique channel id to send messages to the slack channel.
* @return
*/
public static Sink sink(String accessToken, String slackChannelId) {
return sink("slack-sink", accessToken, slackChannelId);
}

/**
* Creates a slack sink to send messages to the slack channel with the given slack channel name.
* Internally apache {@link org.apache.http.client.HttpClient} is used with
* custom {@link HttpRequestRetryHandler} to post the messages to slack channels.
viliam-durina marked this conversation as resolved.
Show resolved Hide resolved
* Following error causes {@link ConnectTimeoutException}, {@link HttpHostConnectException}, {@link IOException}
* are handled through custom retry handler.The max retry count has been configured to 5.
* Other error causes will lead to the breakage of the jet pipeline.
viliam-durina marked this conversation as resolved.
Show resolved Hide resolved
*
* @param accessToken String Bearer token to authenticate the slack web api requests
* @param slackChannelId String Unique channel id to send messages to the slack channel.
* @param slackChannelName String channel name to be used as sink name
* @return
*/
public static Sink sink(String accessToken, String slackChannelId, String slackChannelName) {
return SinkBuilder.sinkBuilder(slackChannelName,
viliam-durina marked this conversation as resolved.
Show resolved Hide resolved
ctx -> new SlackContext(() -> HttpClients.custom().setRetryHandler(httpRetryHandler())
.build(), accessToken, slackChannelId))
.<String>receiveFn((ctx, item) -> ctx.receiveFn(item)).destroyFn(ctx -> ctx.destroy()).build();
}

private static HttpRequestRetryHandler httpRetryHandler() {
return (exception, executionCount, context) -> {
if (executionCount >= RETRY_COUNT) {
// Do not retry if over max retry count
return false;
viliam-durina marked this conversation as resolved.
Show resolved Hide resolved
}
if (exception instanceof UnknownHostException) {
// Unknown host
return false;
viliam-durina marked this conversation as resolved.
Show resolved Hide resolved
}
if (exception instanceof ConnectTimeoutException) {
// Connection refused
return true;
}
if (exception instanceof HttpHostConnectException) {
// connection exception
return true;
}
if (exception instanceof IOException) {
// Interrupted Io exceptions
return true;
}
HttpClientContext clientContext = HttpClientContext.adapt(context);
HttpRequest request = clientContext.getRequest();
boolean idempotent = !(request instanceof HttpEntityEnclosingRequest);
if (idempotent) {
// Retry if the request is considered idempotent
return true;
}
return false;
};
}

private static final class SlackContext {

private final SupplierEx<CloseableHttpClient> httpClientSupplierEx;
private final CloseableHttpClient closeableHttpClient;
private final String accessToken;
private final String channel;

private SlackContext(SupplierEx<CloseableHttpClient> bulkRequestSupplier, String accessToken, String channel) {
this.httpClientSupplierEx = bulkRequestSupplier;
this.closeableHttpClient = bulkRequestSupplier.get();
this.accessToken = accessToken;
this.channel = channel;
}

private String receiveFn(String message) {
HttpPost request = new HttpPost(URL);
CloseableHttpResponse response = null;
String result = "";
try {
// add request headers
request.addHeader("Authorization", String.format("Bearer %s", accessToken));
List<NameValuePair> urlParameters = new ArrayList<>();
urlParameters.add(new BasicNameValuePair("channel", channel));
urlParameters.add(new BasicNameValuePair("text", message));
request.setEntity(new UrlEncodedFormEntity(urlParameters));
response = (CloseableHttpResponse) closeableHttpClient.execute(request);
result = EntityUtils.toString(response.getEntity());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We return the result, but the returned value is ignored. Is there something worthwhile to check?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added the simple to pojo class to parse the response. In error scenarios sample message will look like this { "ok": false, "error": "not_authed" } If the Response.ok false means then throwing the RuntimeException.

} catch (UnsupportedEncodingException var) {
throw ExceptionUtil.rethrow(var);
} catch (ClientProtocolException var) {
throw ExceptionUtil.rethrow(var);
} catch (IOException var) {
viliam-durina marked this conversation as resolved.
Show resolved Hide resolved
throw ExceptionUtil.rethrow(var);
} finally {
try {
request.releaseConnection();
response.close();
} catch (Exception var) {
throw ExceptionUtil.rethrow(var);
}
}
return result;
}

private void destroy() throws IOException {
closeableHttpClient.close();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright (c) 2008-2020, Hazelcast, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/**
* Connector for Hazelcast Jet to push messages to Slack sink.
*/
package com.hazelcast.jet.contrib.slack;
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
/*
* Copyright (c) 2008-2020, Hazelcast, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.hazelcast.jet.contrib.slack;

import com.hazelcast.function.Functions;
import com.hazelcast.jet.JetInstance;
import com.hazelcast.jet.Traversers;
import com.hazelcast.jet.aggregate.AggregateOperations;
import com.hazelcast.jet.core.JetTestSupport;
import com.hazelcast.jet.pipeline.BatchStage;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sources;
import org.apache.http.HttpEntityEnclosingRequest;
import org.apache.http.HttpRequest;
import org.apache.http.NameValuePair;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.HttpRequestRetryHandler;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.conn.ConnectTimeoutException;
import org.apache.http.conn.HttpHostConnectException;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicNameValuePair;
import org.apache.http.protocol.HttpContext;
import org.apache.http.util.EntityUtils;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.UnsupportedEncodingException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;

public final class SlackSInksTest extends JetTestSupport {
viliam-durina marked this conversation as resolved.
Show resolved Hide resolved

private JetInstance jet;
private Properties credentials;

@Before
public void setup() {
jet = createJetMember();
}

@Test
public void testSlackSink() {
List<String> text = jet.getList("text");
text.add("hello world hello world hazelcast");
text.add("sample message to slack channel");

Pipeline pipeline = Pipeline.create();
BatchStage<Map.Entry<String, Long>> tweets = pipeline
.readFrom(Sources.<String>list("text"))
.flatMap(line -> Traversers.traverseArray(line.toLowerCase().split("\\W+")))
.filter(word -> !word.isEmpty())
.groupingKey(Functions.wholeItem())
.aggregate(AggregateOperations.counting());

Pipeline p = Pipeline.create();
p.readFrom(Sources.<String>list("text"))
.writeTo(SlackSinks.sink(System.getenv("ACCESS_TOKEN"), System.getenv("CHANNEL_ID")));
jet.newJob(p).join();
}

@Test
public void retryTests() {
viliam-durina marked this conversation as resolved.
Show resolved Hide resolved

HttpRequestRetryHandler myRetryHandler = new HttpRequestRetryHandler() {
@Override
public boolean retryRequest(
IOException exception,
int executionCount,
HttpContext context) {
if (executionCount >= 5) {
// Do not retry if over max retry count
return false;
}
if (exception instanceof InterruptedIOException) {
// Timeout
return false;
}
if (exception instanceof UnknownHostException) {
// Unknown host
return false;
}
if (exception instanceof ConnectTimeoutException) {
// Connection refused
return true;
}
if (exception instanceof HttpHostConnectException) {
// connection exception
return true;
}
HttpClientContext clientContext = HttpClientContext.adapt(context);
HttpRequest request = clientContext.getRequest();
boolean idempotent = !(request instanceof HttpEntityEnclosingRequest);
if (idempotent) {
// Retry if the request is considered idempotent
return true;
}
return false;
}

};

CloseableHttpClient closeableHttpClient = HttpClients.custom()
.setRetryHandler(myRetryHandler)
.build();
HttpPost request = new HttpPost("http://localhost:8080/hello");
CloseableHttpResponse response = null;
String result = "";
try {
// add request headers
request.addHeader("Authorization", String.format("Bearer %s",System.getenv("ACCESS_TOKEN")));
List<NameValuePair> urlParameters = new ArrayList<>();
urlParameters.add(new BasicNameValuePair("channel", "random"));
urlParameters.add(new BasicNameValuePair("text", "message"));
request.setEntity(new UrlEncodedFormEntity(urlParameters));
response = (CloseableHttpResponse) closeableHttpClient.execute(request);
result = EntityUtils.toString(response.getEntity());
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
} catch (ClientProtocolException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
request.releaseConnection();
response.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}