Skip to content

Commit

Permalink
Initial support for Elasticsearch 5.x via REST client
Browse files Browse the repository at this point in the history
  • Loading branch information
aecio committed Apr 18, 2017
1 parent 391dbb8 commit 176a6cd
Show file tree
Hide file tree
Showing 5 changed files with 263 additions and 5 deletions.
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ dependencies {
compile 'nz.ac.waikato.cms.weka:weka-stable:3.6.13'
compile 'org.apache.lucene:lucene-core:4.10.4'
compile 'org.elasticsearch:elasticsearch:1.4.4'
compile 'org.elasticsearch.client:rest:5.3.0'
compile 'io.dropwizard.metrics:metrics-core:3.1.3'
compile 'io.dropwizard.metrics:metrics-json:3.1.3'
compile 'io.dropwizard.metrics:metrics-jvm:3.1.3'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import java.util.List;
import java.util.TreeSet;

import org.apache.http.annotation.NotThreadSafe;
//import org.apache.http.annotation.NotThreadSafe;
import org.apache.http.client.CookieStore;
import org.apache.http.cookie.Cookie;
import org.apache.http.cookie.CookieIdentityComparator;
Expand All @@ -33,7 +33,7 @@
* HttpComponents Changes: removed synchronization
*
*/
@NotThreadSafe
//@NotThreadSafe
public class LocalCookieStore implements CookieStore, Serializable {

private static final long serialVersionUID = -7581093305228232025L;
Expand Down
12 changes: 9 additions & 3 deletions src/main/java/focusedCrawler/target/TargetStorage.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import focusedCrawler.target.classifier.TargetClassifierException;
import focusedCrawler.target.classifier.TargetClassifierFactory;
import focusedCrawler.target.model.Page;
import focusedCrawler.target.repository.ElasticSearchRestTargetRepository;
import focusedCrawler.target.repository.ElasticSearchTargetRepository;
import focusedCrawler.target.repository.FileSystemTargetRepository;
import focusedCrawler.target.repository.FileSystemTargetRepository.DataFormat;
Expand Down Expand Up @@ -186,9 +187,14 @@ else if(dataFormat.equals("ELASTICSEARCH")) {
if(indexName == null) {
throw new IllegalArgumentException("ElasticSearch index name not provided!");
}
ElasticSearchConfig esconfig = config.getElasticSearchConfig();
targetRepository = new ElasticSearchTargetRepository(esconfig, indexName, "target");
negativeRepository = new ElasticSearchTargetRepository(esconfig, indexName, "negative");
ElasticSearchConfig esconfig = config.getElasticSearchConfig();
if (esconfig.getRestApiHosts() == null) {
targetRepository = new ElasticSearchTargetRepository(esconfig, indexName, "target");
negativeRepository = new ElasticSearchTargetRepository(esconfig, indexName, "negative");
} else {
targetRepository = new ElasticSearchRestTargetRepository(esconfig, indexName, "target");
negativeRepository = new ElasticSearchRestTargetRepository(esconfig, indexName, "negative");
}
}
else {
throw new IllegalArgumentException("Invalid data format provided: "+dataFormat);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
package focusedCrawler.target.repository;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.http.HttpHost;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.entity.AbstractHttpEntity;
import org.apache.http.entity.ContentType;
import org.apache.http.nio.entity.NStringEntity;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;

import focusedCrawler.target.model.Page;
import focusedCrawler.target.model.TargetModelElasticSearch;
import focusedCrawler.target.repository.elasticsearch.ElasticSearchConfig;

/**
 * A {@link TargetRepository} that stores crawled pages in Elasticsearch using the
 * low-level REST client, supporting both Elasticsearch 1.x and 5.x mapping syntax.
 * Documents are indexed under the configured index/type using the URL-encoded page
 * URL as document id, so re-crawls of the same URL overwrite the previous version.
 */
public class ElasticSearchRestTargetRepository implements TargetRepository {

    /** Reusable empty query-parameter map for REST requests. */
    private static final Map<String, String> EMPTY_MAP = Collections.<String, String>emptyMap();
    private static final Logger logger = LoggerFactory.getLogger(ElasticSearchRestTargetRepository.class);
    /** Shared, thread-safe JSON serializer; cached as static per best practice. */
    private static final ObjectMapper mapper = new ObjectMapper();

    static {
        // Serialize dates as ISO-8601 strings so they match the "dateOptionalTime"
        // date format declared in the index mappings below.
        mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
    }

    private RestClient client;
    private String typeName;
    private String indexName;

    /**
     * Creates the repository and ensures the target index (with mappings) exists.
     *
     * @param config    connection settings (REST hosts, timeouts)
     * @param indexName name of the Elasticsearch index; created if it does not exist
     * @param typeName  mapping type under which pages are indexed (e.g. "target" or "negative")
     */
    public ElasticSearchRestTargetRepository(ElasticSearchConfig config,
                                             String indexName,
                                             String typeName) {
        this.indexName = indexName;
        this.typeName = typeName;
        this.client = createRestClient(config);
        this.createIndexMapping(indexName);
    }

    /**
     * Checks whether {@code indexName} exists and, if not, creates it with mappings
     * appropriate for the detected Elasticsearch version (1.x vs 5.x field types).
     *
     * @throws RuntimeException if the existence check or index creation fails
     */
    private void createIndexMapping(String indexName) {

        String indexEndpoint = "/" + indexName;
        boolean exists = false;
        // Fallback version if the cluster does not report one; assume modern syntax.
        String esVersion = "5.x.x";
        try {
            // NOTE: the REST client treats a 404 response to a HEAD request as a
            // regular response (not an exception), so this check is safe when the
            // index is missing.
            Response existsResponse = client.performRequest("HEAD", indexEndpoint);
            exists = (existsResponse.getStatusLine().getStatusCode() == 200);

            // Query the cluster root to discover the server version, which decides
            // which mapping dialect ("string"/not_analyzed vs "keyword"/"text") to use.
            Response rootResponse = client.performRequest("GET", "/");
            String json = EntityUtils.toString(rootResponse.getEntity());
            String versionNumber = mapper.readTree(json).path("version").path("number").asText();
            if (versionNumber != null && !versionNumber.isEmpty()) {
                esVersion = versionNumber;
            }
            logger.info("Elasticsearch version: {}", esVersion);
        } catch (IOException e) {
            throw new RuntimeException(
                    "Failed to check whether index already exists in Elasticsearch.", e);
        }

        if (!exists) {
            // Mapping for Elasticsearch 1.x: "string" type with not_analyzed index option.
            final String targetMapping1x = ""
                + "{"
                + "  \"properties\": {"
                + "    \"domain\":           {\"type\": \"string\",\"index\": \"not_analyzed\"},"
                + "    \"words\":            {\"type\": \"string\",\"index\": \"not_analyzed\"},"
                + "    \"wordsMeta\":        {\"type\": \"string\",\"index\": \"not_analyzed\"},"
                + "    \"retrieved\":        {\"type\": \"date\",\"format\": \"dateOptionalTime\"},"
                + "    \"text\":             {\"type\": \"string\"},"
                + "    \"title\":            {\"type\": \"string\"},"
                + "    \"url\":              {\"type\": \"string\",\"index\": \"not_analyzed\"},"
                + "    \"topPrivateDomain\": {\"type\": \"string\",\"index\": \"not_analyzed\"}"
                + "  }"
                + "}";

            // Mapping for Elasticsearch 5.x: "keyword"/"text" replace "string".
            final String pageMapping5x = ""
                + "{"
                + "  \"properties\": {"
                + "    \"domain\":           {\"type\": \"keyword\",\"index\": true},"
                + "    \"words\":            {\"type\": \"keyword\",\"index\": true},"
                + "    \"wordsMeta\":        {\"type\": \"keyword\",\"index\": true},"
                + "    \"retrieved\":        {\"type\": \"date\",\"format\": \"dateOptionalTime\"},"
                + "    \"text\":             {\"type\": \"text\"},"
                + "    \"title\":            {\"type\": \"text\"},"
                + "    \"url\":              {\"type\": \"keyword\",\"index\":true},"
                + "    \"topPrivateDomain\": {\"type\": \"keyword\",\"index\": true}"
                + "  }"
                + "}";

            String pageProperties = esVersion.startsWith("5.") ? pageMapping5x : targetMapping1x;

            // Both the "target" and "negative" types share the same page mapping.
            String mapping =
                  "{"
                + "  \"mappings\": {"
                + "    \"target\": " + pageProperties + ","
                + "    \"negative\": " + pageProperties
                + "  }"
                + "}";

            try {
                AbstractHttpEntity entity = createJsonEntity(mapping);
                Response response = client.performRequest("PUT", indexEndpoint, EMPTY_MAP, entity);
                if (response.getStatusLine().getStatusCode() != 200) {
                    throw new RuntimeException(
                            "Failed to create index in Elasticsearch." + response.toString());
                }
            } catch (IOException e) {
                throw new RuntimeException("Failed to create index in Elasticsearch.", e);
            }
        }
    }

    /** Wraps a JSON string in an HTTP entity with an explicit JSON content type. */
    private AbstractHttpEntity createJsonEntity(String mapping) {
        return new NStringEntity(mapping, ContentType.APPLICATION_JSON);
    }

    /**
     * Stores the given page in Elasticsearch.
     *
     * @return true if the document was created or updated successfully
     */
    public boolean insert(Page target) {
        return index(target);
    }

    /**
     * Serializes the page and PUTs it under a deterministic document id.
     *
     * @throws RuntimeException if the HTTP request fails
     */
    private boolean index(Page page) {

        TargetModelElasticSearch data = new TargetModelElasticSearch(page);

        // The URL-encoded page URL is used as document id, so indexing the same
        // URL twice updates the existing document rather than duplicating it.
        String docId = encodeUrl(page.getURL().toString());
        String endpoint = "/" + indexName + "/" + typeName + "/" + docId;
        AbstractHttpEntity entity = createJsonEntity(serializeAsJson(data));
        try {
            Response response = client.performRequest("PUT", endpoint, EMPTY_MAP, entity);
            int statusCode = response.getStatusLine().getStatusCode();
            // FIX: Elasticsearch returns 201 (Created) for a new document but
            // 200 (OK) when an existing document is overwritten; both indicate
            // the page was stored successfully. Checking only for 201 made
            // re-indexing of already-crawled URLs report a spurious failure.
            return statusCode == 200 || statusCode == 201;
        } catch (IOException e) {
            throw new RuntimeException("Failed to index page.", e);
        }
    }

    /** URL-encodes a string as UTF-8 so it is safe to embed in a REST endpoint path. */
    private String encodeUrl(String url) {
        try {
            return URLEncoder.encode(url, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is guaranteed by the JVM spec; reaching here indicates a broken runtime.
            throw new IllegalStateException("Failed to URL encode string: " + url, e);
        }
    }

    /** Serializes any model object to its JSON representation. */
    private String serializeAsJson(Object model) {
        String targetAsJson;
        try {
            targetAsJson = mapper.writeValueAsString(model);
        } catch (JsonProcessingException e) {
            throw new RuntimeException("Failed to serialize TargetModel to JSON.", e);
        }
        return targetAsJson;
    }

    /**
     * Builds (and stores) a REST client for the hosts listed in the configuration.
     *
     * @param config source of host URLs and timeout settings
     * @return the initialized client (also retained in {@code this.client})
     * @throws RuntimeException if any configured host URL is malformed
     */
    public RestClient createRestClient(ElasticSearchConfig config) {

        List<String> esHosts = config.getRestApiHosts();
        List<HttpHost> hosts = new ArrayList<>();
        for (String host : esHosts) {
            try {
                URL url = new URL(host);
                hosts.add(new HttpHost(url.getHost(), url.getPort()));
            } catch (MalformedURLException e) {
                throw new RuntimeException("Failed to initialize Elasticsearch REST client. "
                        + "Invalid host: " + host, e);
            }
        }

        HttpHost[] httpHostsArray = hosts.toArray(new HttpHost[hosts.size()]);

        client = RestClient.builder(httpHostsArray)
            .setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {
                @Override
                public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder requestConfigBuilder) {
                    return requestConfigBuilder
                            .setConnectTimeout(config.getRestConnectTimeout())
                            .setSocketTimeout(config.getRestSocketTimeout());
                }
            })
            .setMaxRetryTimeoutMillis(config.getRestMaxRetryTimeoutMillis())
            .build();

        logger.info("Initialized Elasticsearch REST client for hosts: " + Arrays.toString(httpHostsArray));
        return client;
    }

    /** Closes the underlying REST client, releasing its connection pool. */
    @Override
    public void close() {
        try {
            if (client != null) {
                client.close();
            }
        } catch (IOException e) {
            throw new RuntimeException("Failed to close Elasticsearch REST client", e);
        }
    }

}
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
package focusedCrawler.target.repository.elasticsearch;

import java.util.List;

import com.fasterxml.jackson.annotation.JsonProperty;

public class ElasticSearchConfig {

//
// Elasticsearch Transport Client parameters
//
@JsonProperty("target_storage.data_format.elasticsearch.host")
private String host = "localhost";

Expand All @@ -13,6 +18,21 @@ public class ElasticSearchConfig {
@JsonProperty("target_storage.data_format.elasticsearch.cluster_name")
private String clusterName = "elasticsearch";

//
// Elasticsearch REST API parameters
//
@JsonProperty("target_storage.data_format.elasticsearch.rest.hosts")
private List<String> restApiHosts = null;

@JsonProperty("target_storage.data_format.elasticsearch.rest.connect_timeout")
private int restConnectTimeout = 30000;

@JsonProperty("target_storage.data_format.elasticsearch.rest.socket_timeout")
private int restSocketTimeout = 30000;

@JsonProperty("target_storage.data_format.elasticsearch.rest.max_retry_timeout_millis")
private int restMaxRetryTimeoutMillis = 60000;

/** Default constructor used by the Jackson/YAML configuration loader; field defaults apply. */
public ElasticSearchConfig() { }

public ElasticSearchConfig(String hostname, int port, String clusterName) {
Expand All @@ -32,4 +52,21 @@ public int getPort() {
/** Returns the Elasticsearch cluster name used by the transport client (default "elasticsearch"). */
public String getClusterName() {
return clusterName;
}

/**
 * Returns the REST API host URLs, or {@code null} when not configured —
 * callers use a null result to fall back to the transport-client repository.
 */
public List<String> getRestApiHosts() {
return restApiHosts;
}

/** Returns the REST client connect timeout in milliseconds (default 30000). */
public int getRestConnectTimeout() {
return restConnectTimeout;
}

/** Returns the REST client socket (read) timeout in milliseconds (default 30000). */
public int getRestSocketTimeout() {
return restSocketTimeout;
}

/** Returns the maximum total time in milliseconds the REST client retries a request (default 60000). */
public int getRestMaxRetryTimeoutMillis() {
return restMaxRetryTimeoutMillis;
}

}

0 comments on commit 176a6cd

Please sign in to comment.