Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding slack sink connector #60

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ A Hazelcast Jet connector for consuming data from Twitter stream sources in Jet
Tests to check compatibility of the XA support in your JMS broker or
JDBC database with Jet's fault tolerance.

### [Slack Connector](slack)

A Hazelcast Jet slack connector to post processed messages to slack channels using slack sink in the Jet pipelines.

## Snapshot Releases

To access snapshot builds add the following `dependency` and `repository` declarations
Expand Down
4 changes: 4 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ subprojects {
dependencies {
compile group: 'com.hazelcast.jet', name: 'hazelcast-jet', version: jetVersion
compile group: 'com.google.code.findbugs', name: 'jsr305', version: '3.0.2'
compile 'com.fasterxml.jackson.core:jackson-core:2.10.3'
compile 'com.fasterxml.jackson.core:jackson-annotations:2.10.3'
compile 'com.fasterxml.jackson.core:jackson-databind:2.10.3'

testCompile "com.hazelcast.jet:hazelcast-jet-core:" + jetVersion + ":tests"
testCompile "com.hazelcast:hazelcast:" + hazelcastVersion + ":tests"
testCompile "javax.cache:cache-api:1.1.0"
Expand Down
1 change: 1 addition & 0 deletions settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ include 'mongodb'
include 'debezium'
include 'twitter'
include 'xa-test'
include 'slack'

project(':elasticsearch-5').projectDir = file('elasticsearch/elasticsearch-5')
project(':elasticsearch-6').projectDir = file('elasticsearch/elasticsearch-6')
Expand Down
15 changes: 15 additions & 0 deletions slack/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
plugins {
id 'java'
}


sourceCompatibility = 1.8

repositories {
mavenCentral()
}

dependencies {
compile group: 'org.apache.httpcomponents', name: 'httpclient', version: '4.5.12'
testCompile group: 'junit', name: 'junit', version: '4.12'
}
161 changes: 161 additions & 0 deletions slack/src/main/java/com/hazelcast/jet/contrib/slack/SlackSinks.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
/*
* Copyright (c) 2008-2020, Hazelcast, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hazelcast.jet.contrib.slack;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.hazelcast.function.SupplierEx;
import com.hazelcast.jet.contrib.slack.util.SlackResponse;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import com.hazelcast.jet.pipeline.Sink;
import com.hazelcast.jet.pipeline.SinkBuilder;
import org.apache.http.HttpEntityEnclosingRequest;
import org.apache.http.HttpRequest;
import org.apache.http.NameValuePair;
import org.apache.http.client.HttpRequestRetryHandler;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.conn.ConnectTimeoutException;
import org.apache.http.conn.HttpHostConnectException;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicNameValuePair;
import org.apache.http.util.EntityUtils;

import java.io.IOException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;

/**
* Contains the methods to create Slack sinks.
*/
public final class SlackSinks {

private static final String URL = "https://slack.com/api/chat.postMessage";

private static final int RETRY_COUNT = Integer.MAX_VALUE;

private static final ObjectMapper MAPPER = new ObjectMapper();

private SlackSinks() {
}

/**
* Creates a slack sink to send messages to the requested channelId.
* Internally Apache {@link org.apache.http.client.HttpClient} is used with
* custom {@link HttpRequestRetryHandler} to post the messages.
* Following exceptions {@link ConnectTimeoutException}, {@link HttpHostConnectException},
* {@link IOException}, {@link UnknownHostException}
* are handled through custom retry handler.
* Other error scenarios will lead to the failure of the Jet job.
*
* @param accessToken String Bearer token to authenticate the slack web api requests
* @param slackChannelId String Unique channel id to send messages to the slack channel.
* @return
*/
public static Sink sink(String accessToken, String slackChannelId) {
return SinkBuilder.sinkBuilder("slack(" + slackChannelId + ")",
ctx -> new SlackContext(() -> HttpClients.custom().setRetryHandler(httpRetryHandler())
.build(), accessToken, slackChannelId))
.<String>receiveFn((ctx, item) -> ctx.receiveFn(item)).destroyFn(ctx -> ctx.destroy()).build();
}

private static HttpRequestRetryHandler httpRetryHandler() {
return (exception, executionCount, context) -> {
if (executionCount >= RETRY_COUNT) {
// Do not retry if over max retry count
return false;
viliam-durina marked this conversation as resolved.
Show resolved Hide resolved
}
if (exception instanceof UnknownHostException) {
// Unknown host
return true;
}
if (exception instanceof ConnectTimeoutException) {
// Connection refused
return true;
}
if (exception instanceof HttpHostConnectException) {
// connection exception
return true;
}
if (exception instanceof IOException) {
// Interrupted Io exceptions
return true;
}
HttpClientContext clientContext = HttpClientContext.adapt(context);
HttpRequest request = clientContext.getRequest();
boolean idempotent = !(request instanceof HttpEntityEnclosingRequest);
if (idempotent) {
// Retry if the request is considered idempotent
return true;
}
return false;
};
}

private static final class SlackContext {

private final SupplierEx<CloseableHttpClient> httpClientSupplierEx;
private final CloseableHttpClient closeableHttpClient;
private final String accessToken;
private final String channel;

private SlackContext(SupplierEx<CloseableHttpClient> bulkRequestSupplier, String accessToken, String channel) {
this.httpClientSupplierEx = bulkRequestSupplier;
this.closeableHttpClient = bulkRequestSupplier.get();
this.accessToken = accessToken;
this.channel = channel;
}

private String receiveFn(String message) {
HttpPost request = new HttpPost(URL);
CloseableHttpResponse response = null;
String result = "";
SlackResponse slackResponse;
try {
// add request headers
request.addHeader("Authorization", String.format("Bearer %s", accessToken));
List<NameValuePair> urlParameters = new ArrayList<>();
urlParameters.add(new BasicNameValuePair("channel", channel));
urlParameters.add(new BasicNameValuePair("text", message));
request.setEntity(new UrlEncodedFormEntity(urlParameters));
response = (CloseableHttpResponse) closeableHttpClient.execute(request);
result = EntityUtils.toString(response.getEntity());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We return the result, but the returned value is ignored. Is there something worthwhile to check?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added the simple to pojo class to parse the response. In error scenarios sample message will look like this { "ok": false, "error": "not_authed" } If the Response.ok false means then throwing the RuntimeException.

slackResponse = MAPPER.readValue(result, SlackResponse.class);
if (!slackResponse.isOk()) {
throw new RuntimeException(slackResponse.getError());
}
} catch (IOException var) {
viliam-durina marked this conversation as resolved.
Show resolved Hide resolved
throw ExceptionUtil.rethrow(var);
} finally {
try {
request.releaseConnection();
response.close();
} catch (Exception var) {
throw ExceptionUtil.rethrow(var);
}
}
return result;
}

private void destroy() throws IOException {
closeableHttpClient.close();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright (c) 2008-2020, Hazelcast, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/**
* Connector for Hazelcast Jet to push messages to Slack sink.
*/
package com.hazelcast.jet.contrib.slack;
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright (c) 2008-2020, Hazelcast, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hazelcast.jet.contrib.slack.util;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;

import java.io.Serializable;

/**
*Plain object to represent the slack api response.
*/
@JsonIgnoreProperties(ignoreUnknown = true)
public class SlackResponse implements Serializable {

private boolean ok;
private String error;

/**
* Returns the response execution status.
*
* @return
*/
public boolean isOk() {
return ok;
}

/**
* Setter metthod to set the response status.
* @param ok
*/
public void setOk(boolean ok) {
this.ok = ok;
}

/**
* Get the error message.
*
* @return
*/
public String getError() {
return error;
}

/**
* Setter method to set the error value.
* @param error
*/
public void setError(String error) {
this.error = error;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright (c) 2008-2020, Hazelcast, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/**
* Common/Utility classes
*/
package com.hazelcast.jet.contrib.slack.util;
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright (c) 2008-2020, Hazelcast, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.hazelcast.jet.contrib.slack;

import com.hazelcast.function.Functions;
import com.hazelcast.jet.JetInstance;
import com.hazelcast.jet.Traversers;
import com.hazelcast.jet.aggregate.AggregateOperations;
import com.hazelcast.jet.core.JetTestSupport;
import com.hazelcast.jet.pipeline.BatchStage;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sources;
import org.junit.Before;
import org.junit.Test;

import java.util.List;
import java.util.Map;
import java.util.Properties;

public final class SlackSinksTest extends JetTestSupport {

private JetInstance jet;
private Properties credentials;

@Before
public void setup() {
jet = createJetMember();
}

@Test
public void testSlackSink() {
List<String> text = jet.getList("text");
text.add("hello world hello world hazelcast");
text.add("sample message to slack channel");

Pipeline pipeline = Pipeline.create();
BatchStage<Map.Entry<String, Long>> tweets = pipeline
.readFrom(Sources.<String>list("text"))
.flatMap(line -> Traversers.traverseArray(line.toLowerCase().split("\\W+")))
.filter(word -> !word.isEmpty())
.groupingKey(Functions.wholeItem())
.aggregate(AggregateOperations.counting());

Pipeline p = Pipeline.create();
p.readFrom(Sources.<String>list("text"))
.writeTo(SlackSinks.sink(System.getenv("ACCESS_TOKEN"), System.getenv("CHANNEL_ID")));
jet.newJob(p).join();
}
}