Skip to content

Commit

Permalink
Addressed review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Krishna Kondaka <[email protected]>
  • Loading branch information
Krishna Kondaka committed Oct 4, 2023
1 parent 7260d58 commit 71fc541
Show file tree
Hide file tree
Showing 7 changed files with 206 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import org.opensearch.client.opensearch.core.bulk.BulkOperation;
import org.opensearch.dataprepper.model.event.EventHandle;
import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.SerializedJson;

import com.fasterxml.jackson.databind.JsonNode;

Expand Down Expand Up @@ -45,15 +46,15 @@ public class BulkOperationWrapper {

private final EventHandle eventHandle;
private final BulkOperation bulkOperation;
private final JsonNode jsonNode;
private final SerializedJson jsonNode;

public BulkOperationWrapper(final BulkOperation bulkOperation) {
this.bulkOperation = bulkOperation;
this.eventHandle = null;
this.jsonNode = null;
}

public BulkOperationWrapper(final BulkOperation bulkOperation, final EventHandle eventHandle, final JsonNode jsonNode) {
public BulkOperationWrapper(final BulkOperation bulkOperation, final EventHandle eventHandle, final SerializedJson jsonNode) {
checkNotNull(bulkOperation);
this.bulkOperation = bulkOperation;
this.eventHandle = eventHandle;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.dataprepper.plugins.sink.opensearch;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.JsonNode;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.Timer;
Expand Down Expand Up @@ -258,6 +259,61 @@ public boolean isReady() {
return initialized;
}

/**
 * Builds the {@link BulkOperation} that corresponds to the requested bulk action.
 *
 * <p>{@code create} and the default {@code index} action send the serialized {@code document};
 * {@code update} and {@code upsert} send {@code jsonNode} as the partial document, and
 * {@code upsert} additionally sends it as the upsert payload. {@code delete} carries no document.
 * The document id and routing value, when present on {@code document}, are applied to every
 * operation type.
 *
 * @param action    the bulk action name; compared case-sensitively against the
 *                  {@code BulkAction} string values, anything unrecognized falls back to "index"
 * @param document  the serialized document, also the source of the optional id and routing
 * @param indexName the target index
 * @param jsonNode  the JSON tree used as the body of update/upsert operations
 * @return the bulk operation for the given action
 */
private BulkOperation getBulkOperationForAction(final String action, final SerializedJson document, final String indexName, final JsonNode jsonNode) {
    final Optional<String> docId = document.getDocumentId();
    final Optional<String> routing = document.getRoutingField();

    if (StringUtils.equals(action, BulkAction.CREATE.toString())) {
        final CreateOperation.Builder<Object> createOperationBuilder =
                new CreateOperation.Builder<>()
                        .index(indexName)
                        .document(document);
        docId.ifPresent(createOperationBuilder::id);
        routing.ifPresent(createOperationBuilder::routing);
        return new BulkOperation.Builder()
                .create(createOperationBuilder.build())
                .build();
    }

    // Strings must be compared by value: a reference comparison (==) against the result of
    // toLowerCase() is not guaranteed to be true even for equal text, which would silently
    // drop the upsert payload.
    final boolean isUpsert = StringUtils.equals(action, BulkAction.UPSERT.toString());
    if (isUpsert || StringUtils.equals(action, BulkAction.UPDATE.toString())) {
        final UpdateOperation.Builder<Object> updateOperationBuilder =
                new UpdateOperation.Builder<>()
                        .index(indexName)
                        .document(jsonNode);
        if (isUpsert) {
            updateOperationBuilder.upsert(jsonNode);
        }
        docId.ifPresent(updateOperationBuilder::id);
        routing.ifPresent(updateOperationBuilder::routing);
        return new BulkOperation.Builder()
                .update(updateOperationBuilder.build())
                .build();
    }

    if (StringUtils.equals(action, BulkAction.DELETE.toString())) {
        final DeleteOperation.Builder deleteOperationBuilder =
                new DeleteOperation.Builder().index(indexName);
        docId.ifPresent(deleteOperationBuilder::id);
        routing.ifPresent(deleteOperationBuilder::routing);
        return new BulkOperation.Builder()
                .delete(deleteOperationBuilder.build())
                .build();
    }

    // Default to "index"
    final IndexOperation.Builder<Object> indexOperationBuilder =
            new IndexOperation.Builder<>().index(indexName).document(document);
    docId.ifPresent(indexOperationBuilder::id);
    routing.ifPresent(indexOperationBuilder::routing);
    return new BulkOperation.Builder()
            .index(indexOperationBuilder.build())
            .build();
}

@Override
public void doOutput(final Collection<Record<Event>> records) {
final long threadId = Thread.currentThread().getId();
Expand All @@ -274,8 +330,6 @@ public void doOutput(final Collection<Record<Event>> records) {
for (final Record<Event> record : records) {
final Event event = record.getData();
final SerializedJson document = getDocument(event);
final Optional<String> docId = document.getDocumentId();
final Optional<String> routing = document.getRoutingField();
String indexName = configuredIndexAlias;
try {
indexName = indexManager.getIndexName(event.formatString(indexName, expressionEvaluator));
Expand All @@ -292,8 +346,6 @@ public void doOutput(final Collection<Record<Event>> records) {
continue;
}

BulkOperation bulkOperation;

String eventAction = action;
if (actions != null) {
for (final Map<String, Object> actionEntry: actions) {
Expand All @@ -314,55 +366,15 @@ public void doOutput(final Collection<Record<Event>> records) {
continue;
}

if (StringUtils.equalsIgnoreCase(eventAction, BulkAction.CREATE.toString())) {
final CreateOperation.Builder<Object> createOperationBuilder =
new CreateOperation.Builder<>()
.index(indexName)
.document(document);
docId.ifPresent(createOperationBuilder::id);
routing.ifPresent(createOperationBuilder::routing);
bulkOperation = new BulkOperation.Builder()
.create(createOperationBuilder.build())
.build();
} else if (StringUtils.equalsIgnoreCase(eventAction, BulkAction.UPDATE.toString()) || StringUtils.equalsIgnoreCase(eventAction, BulkAction.UPSERT.toString())) {
try {
final UpdateOperation.Builder<Object> updateOperationBuilder = (eventAction.toLowerCase() == BulkAction.UPSERT.toString()) ?
new UpdateOperation.Builder<>()
.index(indexName)
.document(event.getJsonNode())
.upsert(event.getJsonNode()) :
new UpdateOperation.Builder<>()
.index(indexName)
.document(event.getJsonNode());
docId.ifPresent(updateOperationBuilder::id);
routing.ifPresent(updateOperationBuilder::routing);
bulkOperation = new BulkOperation.Builder()
.update(updateOperationBuilder.build())
.build();
} catch (Exception e){
bulkOperation = null;
}
} else if (StringUtils.equalsIgnoreCase(eventAction, BulkAction.DELETE.toString())) {
final DeleteOperation.Builder deleteOperationBuilder = new DeleteOperation.Builder()
.index(indexName);
docId.ifPresent(deleteOperationBuilder::id);
routing.ifPresent(deleteOperationBuilder::routing);
bulkOperation = new BulkOperation.Builder()
.delete(deleteOperationBuilder.build())
.build();
} else {
// Default to "index"
final IndexOperation.Builder<Object> indexOperationBuilder = new IndexOperation.Builder<>()
.index(indexName)
.document(document);
docId.ifPresent(indexOperationBuilder::id);
routing.ifPresent(indexOperationBuilder::routing);
bulkOperation = new BulkOperation.Builder()
.index(indexOperationBuilder.build())
.build();
SerializedJson serializedJsonNode = null;
if (StringUtils.equals(action, BulkAction.UPDATE.toString()) ||
StringUtils.equals(action, BulkAction.UPSERT.toString()) ||
StringUtils.equals(action, BulkAction.DELETE.toString())) {
serializedJsonNode = SerializedJson.fromJsonNode(event.getJsonNode(), document);
}
BulkOperation bulkOperation = getBulkOperationForAction(eventAction, document, indexName, event.getJsonNode());

BulkOperationWrapper bulkOperationWrapper = new BulkOperationWrapper(bulkOperation, event.getEventHandle(), event.getJsonNode());
BulkOperationWrapper bulkOperationWrapper = new BulkOperationWrapper(bulkOperation, event.getEventHandle(), serializedJsonNode);
final long estimatedBytesBeforeAdd = bulkRequest.estimateSizeInBytesWithDocument(bulkOperationWrapper);
if (bulkSize >= 0 && estimatedBytesBeforeAdd >= bulkSize && bulkRequest.getOperationsCount() > 0) {
flushBatch(bulkRequest);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,6 @@ private long estimateBulkOperationSize(BulkOperationWrapper bulkOperation) {
if (anyDocument == null)
return OPERATION_OVERHEAD;

if (anyDocument instanceof JsonNode) {
return OPERATION_OVERHEAD + ((JsonNode)anyDocument).toString().length();
}

if (!(anyDocument instanceof SizedDocument)) {
throw new IllegalArgumentException("Only SizedDocument is permitted for accumulating bulk requests. " + bulkOperation);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.dataprepper.plugins.sink.opensearch.bulk;

import com.fasterxml.jackson.databind.JsonNode;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.Optional;
Expand All @@ -30,5 +31,8 @@ static SerializedJson fromStringAndOptionals(String jsonString, String docId, St
return new SerializedJsonImpl(jsonString.getBytes(StandardCharsets.UTF_8), docId, routingField);
}

/**
 * Creates a {@link SerializedJson} from a JSON tree.
 *
 * <p>Both arguments are handed to the {@code SerializedJsonNode} constructor, which serializes
 * {@code jsonNode} and takes the document id and routing field from {@code document}.
 *
 * @param jsonNode the JSON tree to serialize; must not be null
 * @param document the document supplying the id and routing field; must not be null
 * @return a new {@code SerializedJsonNode}
 * @throws NullPointerException if either argument is null
 */
static SerializedJson fromJsonNode(final JsonNode jsonNode, SerializedJson document) {
    // Fail here with a descriptive message instead of an anonymous NPE inside the constructor.
    Objects.requireNonNull(jsonNode, "jsonNode cannot be null");
    Objects.requireNonNull(document, "document cannot be null");
    return new SerializedJsonNode(jsonNode, document);
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink.opensearch.bulk;

import com.fasterxml.jackson.databind.JsonNode;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Optional;

/**
 * A {@link SerializedJson} whose bytes are produced from a Jackson {@link JsonNode}.
 *
 * <p>The node is serialized once, at construction time, via {@code JsonNode.toString()} and
 * encoded as UTF-8. The document id and routing field are optional and may be absent.
 */
class SerializedJsonNode implements SerializedJson, Serializable {
    private final byte[] document;
    // Kept alongside the serialized bytes; not read anywhere within this class.
    private final JsonNode jsonNode;
    private final String documentId;
    private final String routingField;

    /**
     * Creates an instance that serializes {@code jsonNode} and takes the document id and
     * routing field from {@code doc}.
     *
     * @param jsonNode the JSON tree to serialize
     * @param doc      the document supplying the optional id and routing field
     */
    public SerializedJsonNode(final JsonNode jsonNode, SerializedJson doc) {
        this.jsonNode = jsonNode;
        // Either value may legitimately be absent; an unchecked Optional.get() would throw
        // NoSuchElementException here, so fall back to null (exposed again as Optional.empty()).
        this.documentId = doc.getDocumentId().orElse(null);
        this.routingField = doc.getRoutingField().orElse(null);
        this.document = serialize(jsonNode);
    }

    /**
     * Creates an instance with no document id and no routing field.
     *
     * @param jsonNode the JSON tree to serialize
     */
    public SerializedJsonNode(final JsonNode jsonNode) {
        this.jsonNode = jsonNode;
        this.documentId = null;
        this.routingField = null;
        this.document = serialize(jsonNode);
    }

    /** Encodes the node's JSON text as UTF-8 so the bytes do not depend on the platform charset. */
    private static byte[] serialize(final JsonNode jsonNode) {
        return jsonNode.toString().getBytes(StandardCharsets.UTF_8);
    }

    /** @return the length in bytes of the serialized JSON */
    @Override
    public long getDocumentSize() {
        return document.length;
    }

    /** @return the serialized JSON bytes (the internal array, not a copy) */
    @Override
    public byte[] getSerializedJson() {
        return document;
    }

    /** @return the document id, or empty when none was supplied */
    @Override
    public Optional<String> getDocumentId() {
        return Optional.ofNullable(documentId);
    }

    /** @return the routing field, or empty when none was supplied */
    @Override
    public Optional<String> getRoutingField() {
        return Optional.ofNullable(routingField);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink.opensearch.bulk;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.Random;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.JsonNode;
import org.apache.commons.lang3.RandomStringUtils;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.sameInstance;
import static org.hamcrest.MatcherAssert.assertThat;

/**
 * Unit tests for {@link SerializedJsonNode}: checks the reported size, the serialized bytes,
 * and the document id / routing field copied from the source document.
 */
class SerializedJsonNodeTest {
    private int documentSize;
    private String documentId;
    private String routingField;
    private JsonNode jsonNode;
    private SerializedJson document;
    private String jsonString;

    @BeforeEach
    void setUp() throws Exception {
        jsonString = "{\"key\":\"value\"}";
        // The size under test is a byte count, so derive the expectation from the bytes.
        documentSize = jsonString.getBytes().length;

        // Let a parse failure fail the test directly instead of leaving jsonNode null and
        // surfacing later as an unrelated NullPointerException.
        jsonNode = new ObjectMapper().readTree(jsonString);

        documentId = RandomStringUtils.randomAlphabetic(10);
        routingField = RandomStringUtils.randomAlphabetic(10);
        document = SerializedJson.fromStringAndOptionals(jsonString, documentId, routingField);
    }

    private SerializedJsonNode createObjectUnderTest() {
        return new SerializedJsonNode(jsonNode, document);
    }

    @Test
    void getDocumentSize_returns_size_of_the_document_byte_array() {
        assertThat(createObjectUnderTest().getDocumentSize(), equalTo((long) documentSize));
    }

    @Test
    void getSerializedJson_returns_the_document_byte_array_and_fields() {
        final SerializedJsonNode objectUnderTest = createObjectUnderTest();

        assertThat(objectUnderTest.getSerializedJson(), equalTo(jsonString.getBytes()));
        assertThat(objectUnderTest.getDocumentId().get(), equalTo(documentId));
        assertThat(objectUnderTest.getRoutingField().get(), equalTo(routingField));
    }
}

Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

import org.junit.jupiter.api.Test;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.JsonNode;
import org.apache.commons.lang3.RandomStringUtils;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
Expand All @@ -26,13 +28,32 @@ void fromString_throws_if_the_jsonString_is_null() {

@Test
void fromString_returns_SerializedJsonImpl_with_correctValues() {
String documentId = RandomStringUtils.randomAlphabetic(10);
String routingField = RandomStringUtils.randomAlphabetic(10);
SerializedJson serializedJson = SerializedJson.fromStringAndOptionals("{}", documentId, routingField);
assertThat(serializedJson, instanceOf(SerializedJsonImpl.class));
String documentId = RandomStringUtils.randomAlphabetic(10);
String routingField = RandomStringUtils.randomAlphabetic(10);
SerializedJson serializedJson = SerializedJson.fromStringAndOptionals("{}", documentId, routingField);
assertThat(serializedJson, instanceOf(SerializedJsonImpl.class));
assertThat(serializedJson.getDocumentId().get(), equalTo(documentId));
assertThat(serializedJson.getRoutingField().get(), equalTo(routingField));
assertThat(serializedJson.getSerializedJson(), equalTo("{}".getBytes()));
}

/**
 * Verifies that {@code SerializedJson.fromJsonNode} returns a {@code SerializedJsonNode}
 * carrying the source document's id and routing field and the node's serialized bytes.
 */
@Test
void fromString_returns_SerializedJsonNode_with_correctValues() throws Exception {
    final String documentId = RandomStringUtils.randomAlphabetic(10);
    final String routingField = RandomStringUtils.randomAlphabetic(10);
    final String jsonString = "{\"key\":\"value\"}";

    // Propagate any parse failure so the test reports the real cause rather than
    // continuing with a null node.
    final JsonNode jsonNode = new ObjectMapper().readTree(jsonString);

    final SerializedJson document = SerializedJson.fromStringAndOptionals(jsonString, documentId, routingField);
    final SerializedJson serializedJson = SerializedJson.fromJsonNode(jsonNode, document);

    assertThat(serializedJson, instanceOf(SerializedJsonNode.class));
    assertThat(serializedJson.getDocumentId().get(), equalTo(documentId));
    assertThat(serializedJson.getRoutingField().get(), equalTo(routingField));
    assertThat(serializedJson.getSerializedJson(), equalTo(jsonString.getBytes()));
}
}

0 comments on commit 71fc541

Please sign in to comment.