Merge branch 'josh/fixes'
Josh Kuhn committed Dec 12, 2014
2 parents 844b7e8 + 640f754 commit 23291db
Showing 2 changed files with 46 additions and 17 deletions.
2 changes: 1 addition & 1 deletion pom.xml
@@ -5,7 +5,7 @@
   <modelVersion>4.0.0</modelVersion>
   <groupId>org.elasticsearch</groupId>
   <artifactId>elasticsearch-river-rethinkdb</artifactId>
-  <version>1.0.0</version>
+  <version>1.0.1</version>
   <packaging>jar</packaging>
   <name>Elasticsearch RethinkDB River plugin</name>
   <description>A River for indexing RethinkDB databases and staying synced</description>
61 changes: 45 additions & 16 deletions src/main/java/org/elasticsearch/river/rethinkdb/FeedWorker.java
@@ -4,13 +4,16 @@
 import com.rethinkdb.RethinkDB;
 import com.rethinkdb.RethinkDBConnection;
 import com.rethinkdb.RethinkDBException;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.common.logging.ESLogger;
 import org.elasticsearch.common.logging.ESLoggerFactory;
 import org.elasticsearch.common.xcontent.XContentBuilder;

 import java.io.IOException;
+import java.util.HashSet;
 import java.util.Map;

 import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
@@ -108,20 +111,36 @@ private boolean updateES(Map<String, Object> change) {
             client.prepareIndex(
                     changeRecord.targetIndex,
                     changeRecord.targetType,
-                    (String) newVal.get(primaryKey))
+                    newVal.get(primaryKey).toString())
                     .setSource(newVal)
                     .execute();
             return false;
         }else{
             client.prepareDelete(
                     changeRecord.targetIndex,
                     changeRecord.targetType,
-                    (String) oldVal.get(primaryKey))
+                    oldVal.get(primaryKey).toString())
                     .execute();
             return true;
         }
     }
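The switch from a (String) cast to toString() matters because RethinkDB primary keys are not guaranteed to be strings: numeric keys are legal, and the cast would throw a ClassCastException that kills the feed worker. A minimal standalone sketch of the failure mode (hypothetical values, not code from this repository):

    import java.util.HashMap;
    import java.util.Map;

    public class PrimaryKeyCastDemo {
        public static void main(String[] args) {
            Map<String, Object> newVal = new HashMap<>();
            newVal.put("id", 42L); // a numeric primary key, which RethinkDB allows

            Object id = newVal.get("id");
            // String key = (String) id;  // would throw ClassCastException: Long is not a String
            String key = id.toString();   // "42" -- works for any non-null key type
            System.out.println(key);
        }
    }

Elasticsearch document ids are strings either way, so toString() is the safer bridge; it still assumes the key is non-null, which holds for a table's primary key.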

+    private int synchronizeBulk(BulkRequestBuilder bulkRequest, HashSet<String> failureReasons) {
+        int failed = 0;
+        BulkResponse response = bulkRequest.get();
+        if (response.hasFailures()) {
+            logger.error("Encountered errors backfilling");
+            logger.error(response.buildFailureMessage());
+            for(BulkItemResponse ir : response.getItems()){
+                if (ir.isFailed()) {
+                    failed++;
+                    failureReasons.add(ir.getFailureMessage());
+                }
+            }
+        }
+        return failed;
+    }
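The new helper relies on bulkRequest.get(), which blocks until Elasticsearch responds and hands back the BulkResponse, whereas the old code's execute() fired the request asynchronously and discarded per-item failures. A sketch of the contrast, assuming a connected Elasticsearch 1.x Client named client (index and type names are hypothetical):

    import org.elasticsearch.action.bulk.BulkRequestBuilder;
    import org.elasticsearch.action.bulk.BulkResponse;
    import org.elasticsearch.client.Client;

    class BulkContrast {
        static void indexBatch(Client client) {
            BulkRequestBuilder bulk = client.prepareBulk();
            bulk.add(client.prepareIndex("myindex", "mytype", "1")
                    .setSource("{\"field\":\"value\"}"));

            // Old pattern: fire-and-forget; failures vanish silently.
            // bulk.execute();

            // New pattern: block for the response, then inspect failures.
            BulkResponse response = bulk.get();
            if (response.hasFailures()) {
                System.err.println(response.buildFailureMessage());
            }
        }
    }

Collecting the messages into a HashSet also deduplicates them, so a backfill that fails on thousands of documents for the same reason logs that reason once.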

     private void backfill() throws IOException {
         RethinkDBConnection backfillConnection = r.connect(river.hostname, river.port, river.authKey);
         backfillConnection.use(changeRecord.db);
@@ -131,31 +150,41 @@ private void backfill() throws IOException {
         // inserted while we're backfilling
         int totalSize = r.table(changeRecord.table).count().run(backfillConnection).intValue();
         BulkRequestBuilder bulkRequest = client.prepareBulk();
-        int i = 0;
-        int oldTenthile = 0, newTenthile;
+        int attempted = 0, failed = 0;
+        HashSet<String> failureReasons = new HashSet<>();
+        int oldDecile = 0, newDecile;
         Cursor cursor = r.table(changeRecord.table).runForCursor(backfillConnection);
         while (cursor.hasNext()){
             Map<String, Object> doc = (Map<String, Object>) cursor.next();
-            newTenthile = (i * 100) / totalSize / 10;
-            if (newTenthile != oldTenthile) {
-                logger.info("backfill {}0% complete ({} documents)", newTenthile, i);
-                oldTenthile = newTenthile;
+            newDecile = (attempted * 100) / totalSize / 10;
+            if (newDecile != oldDecile) {
+                logger.info("backfill {}0% complete ({} documents)", newDecile, attempted);
+                oldDecile = newDecile;
             }
-            if (i % 100 == 0) {
-                bulkRequest.execute();
+            if (attempted > 0 && attempted % 1000 == 0) {
+                failed += synchronizeBulk(bulkRequest, failureReasons);
                 bulkRequest = client.prepareBulk();
             }
             bulkRequest.add(client.prepareIndex(
                     changeRecord.targetIndex,
                     changeRecord.targetType,
-                    (String) doc.get(primaryKey))
+                    doc.get(primaryKey).toString())
                     .setSource(doc)
             );
-            i += 1;
+            attempted += 1;
         }
-        bulkRequest.execute();
-        logger.info("Backfilled {} items. Turning off backfill in settings", i);
-        backfillRequired = false;
+        if (bulkRequest.numberOfActions() > 0) {
+            failed += synchronizeBulk(bulkRequest, failureReasons);
+        }
+        if (failed > 0) {
+            logger.info("Attempted to backfill {} items, {} succeeded and {} failed.",
+                    attempted, attempted - failed, failed);
+            logger.info("Unique failure reasons were: {}", failureReasons.toString());
+            backfillRequired = true;
+        } else {
+            logger.info("Backfilled {} items. Turning off backfill in settings", attempted);
+            backfillRequired = false;
+        }
         XContentBuilder builder = jsonBuilder()
                 .startObject()
                 .startObject("rethinkdb")
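Two behavioral fixes hide in this hunk. The old loop flushed asynchronously every 100 documents, including a pointless flush at i == 0 before anything was queued, and never inspected results; the new code flushes synchronously every 1000 attempted documents, skips empty flushes, tallies failures, and leaves backfillRequired set to true on failure so the backfill is retried rather than declared done. Progress reporting also moves to whole deciles. A runnable sketch of just the progress arithmetic (hypothetical document count):

    class BackfillProgressDemo {
        public static void main(String[] args) {
            int totalSize = 2500; // hypothetical table size
            int oldDecile = 0;
            for (int attempted = 0; attempted < totalSize; attempted++) {
                int newDecile = (attempted * 100) / totalSize / 10;
                if (newDecile != oldDecile) {
                    // prints e.g. "backfill 40% complete (1000 documents)"
                    System.out.printf("backfill %d0%% complete (%d documents)%n",
                            newDecile, attempted);
                    oldDecile = newDecile;
                }
            }
        }
    }

One caveat worth noting: attempted * 100 is int arithmetic, so tables beyond roughly 21 million rows would overflow the progress calculation, though that is outside this commit's scope.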
@@ -181,7 +210,7 @@
     @SuppressWarnings("unchecked")
     private String getPrimaryKey() {
         Map<String, Object> tableInfo = (Map) r.db(changeRecord.db).table(changeRecord.table).info().run(connection);
-        return (String) tableInfo.get("primary_key");
+        return tableInfo.get("primary_key").toString();
     }

     private boolean isRecoverableError(RethinkDBException exc){