From 36a12b3aec34a4ceddd11a8ece6adb0df883c964 Mon Sep 17 00:00:00 2001 From: Taylor Gray Date: Tue, 26 Sep 2023 15:36:17 -0500 Subject: [PATCH] Improve logging for failed documents in the OpenSearch sink Signed-off-by: Taylor Gray --- .../plugins/sink/opensearch/OpenSearchSink.java | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java index 04147619b7..d732033a4b 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java @@ -27,7 +27,6 @@ import org.opensearch.dataprepper.model.configuration.PluginModel; import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.event.Event; -import org.opensearch.dataprepper.model.sink.SinkContext; import org.opensearch.dataprepper.model.event.exceptions.EventKeyNotFoundException; import org.opensearch.dataprepper.model.failures.DlqObject; import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; @@ -35,12 +34,13 @@ import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.sink.AbstractSink; import org.opensearch.dataprepper.model.sink.Sink; +import org.opensearch.dataprepper.model.sink.SinkContext; import org.opensearch.dataprepper.plugins.dlq.DlqProvider; import org.opensearch.dataprepper.plugins.dlq.DlqWriter; import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.AccumulatingBulkRequest; +import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.BulkAction; import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.BulkApiWrapper; import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.BulkApiWrapperFactory; -import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.BulkAction; import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.BulkOperationWriter; import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.JavaClientAccumulatingCompressedBulkRequest; import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.JavaClientAccumulatingUncompressedBulkRequest; @@ -75,8 +75,6 @@ import java.util.function.Supplier; import java.util.stream.Collectors; -import static org.opensearch.dataprepper.logging.DataPrepperMarkers.SENSITIVE; - @DataPrepperPlugin(name = "opensearch", pluginType = Sink.class) public class OpenSearchSink extends AbstractSink> { public static final String BULKREQUEST_LATENCY = "bulkRequestLatency"; @@ -387,7 +385,7 @@ private void logFailureForDlqObjects(final List dlqObjects, final Thr BulkOperationWriter.dlqObjectToString(dlqObject), message)); dlqObject.releaseEventHandle(true); } catch (final IOException e) { - LOG.error(SENSITIVE, "DLQ failure for Document[{}]", dlqObject.getFailedData(), e); + LOG.error("Failed to write a document to the DLQ", e); dlqObject.releaseEventHandle(false); } }); @@ -399,13 +397,17 @@ private void logFailureForDlqObjects(final List dlqObjects, final Thr }); } catch (final IOException e) { dlqObjects.forEach(dlqObject -> { - LOG.error(SENSITIVE, "DLQ failure for Document[{}]", dlqObject.getFailedData(), e); + LOG.error("Failed to write a document to the DLQ", e); dlqObject.releaseEventHandle(false); }); } } else { dlqObjects.forEach(dlqObject -> { - LOG.warn(SENSITIVE, "Document [{}] has failure. DLQ not configured", dlqObject.getFailedData(), failure); + + final FailedDlqData failedDlqData = (FailedDlqData) dlqObject.getFailedData(); + + final String message = failure == null ? failedDlqData.getMessage() : failure.getMessage(); + LOG.warn("Document failed to write to OpenSearch with error code {}. Configure a DLQ to write failed documents to: {}", failedDlqData.getStatus(), message); dlqObject.releaseEventHandle(false); }); }