From 1f155482150bf92b4a4178e92afdd177611f128e Mon Sep 17 00:00:00 2001 From: a1exsh Date: Thu, 19 Oct 2023 13:53:33 +0200 Subject: [PATCH] Add unprocessable event policy on consumption (#1561) This allows the user to choose between the current implementation of skipping the unprocessable events, or sending those events to the dead letter queue. --- .../nakadi/webservice/BinaryEndToEndAT.java | 6 +- .../webservice/BinaryEventPublisherAT.java | 16 +- .../nakadi/webservice/UserJourneyAT.java | 13 +- .../nakadi/webservice/hila/HilaAT.java | 95 ++++++++---- .../nakadi/webservice/hila/StreamBatch.java | 129 +++++++++------- .../subscription/StreamingContext.java | 52 ++++++- .../SubscriptionStreamerFactory.java | 19 ++- .../subscription/state/StreamingState.java | 60 +++++++- .../NakadiDeadLetterQueueInitialization.java | 74 ++++++++++ app/src/main/resources/application.yml | 5 + .../dead_letter_queue_event_types.json | 49 ++++++ .../DeadLetterAnnotationValidator.java | 65 +++++++- .../domain/UnprocessableEventPolicy.java | 6 + .../DeadLetterQueueStoreException.java | 12 ++ .../DeadLetterAnnotationValidatorTest.java | 139 +++++++++++++++++- .../service/publishing/EventPublisher.java | 15 ++ 16 files changed, 637 insertions(+), 118 deletions(-) create mode 100644 app/src/main/java/org/zalando/nakadi/service/NakadiDeadLetterQueueInitialization.java create mode 100644 app/src/main/resources/dead_letter_queue_event_types.json create mode 100644 core-common/src/main/java/org/zalando/nakadi/domain/UnprocessableEventPolicy.java create mode 100644 core-common/src/main/java/org/zalando/nakadi/exceptions/runtime/DeadLetterQueueStoreException.java diff --git a/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/webservice/BinaryEndToEndAT.java b/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/webservice/BinaryEndToEndAT.java index b8a4c8391a..47a1b906f1 100644 --- a/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/webservice/BinaryEndToEndAT.java +++ b/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/webservice/BinaryEndToEndAT.java @@ -8,6 +8,7 @@ import org.apache.avro.io.DecoderFactory; import org.apache.avro.io.EncoderFactory; import org.apache.http.HttpStatus; +import org.json.JSONObject; import org.junit.Assert; import org.junit.Test; import org.springframework.core.io.DefaultResourceLoader; @@ -33,7 +34,6 @@ import java.nio.ByteBuffer; import java.time.Instant; import java.util.List; -import java.util.Map; import static com.jayway.restassured.RestAssured.given; import static org.zalando.nakadi.domain.SubscriptionBase.InitialPosition.BEGIN; @@ -119,10 +119,10 @@ public void shouldPublishAvroAndConsumeJsonAndAvro() throws IOException { final TestStreamingClient client1 = TestStreamingClient.create(subscription1.getId()).start(); TestUtils.waitFor(() -> Assert.assertEquals(1, client1.getJsonBatches().size())); - final Map jsonEvent = client1.getJsonBatches().get(0).getEvents().get(0); + final JSONObject jsonEvent = client1.getJsonBatches().get(0).getEvents().get(0); Assert.assertEquals("bar", jsonEvent.get("foo")); - final Map metadata = (Map) jsonEvent.get("metadata"); + final JSONObject metadata = jsonEvent.getJSONObject("metadata"); Assert.assertEquals("CE8C9EBC-3F19-4B9D-A453-08AD2EDA6028", metadata.get("eid")); Assert.assertEquals("2.0.0", metadata.get("version")); Assert.assertEquals(testETName, metadata.get("event_type")); diff --git a/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/webservice/BinaryEventPublisherAT.java b/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/webservice/BinaryEventPublisherAT.java index db7eb7e82e..8cfcf951ed 100644 --- a/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/webservice/BinaryEventPublisherAT.java +++ b/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/webservice/BinaryEventPublisherAT.java @@ -3,6 +3,7 @@ import org.apache.http.HttpStatus; import org.hamcrest.MatcherAssert; import org.hamcrest.Matchers; +import org.json.JSONObject; import org.junit.Assert; import org.junit.Test; import org.zalando.nakadi.domain.Subscription; @@ -12,7 +13,6 @@ import java.io.IOException; import java.util.List; -import java.util.Map; import static com.jayway.restassured.RestAssured.given; @@ -43,7 +43,7 @@ public void testNakadiAccessLogInAvro() throws Exception { // So the test is only looking for a valid event. Assert.assertEquals( NAKADI_ACCESS_LOG, - ((Map) event.get("metadata")).get("event_type")); + event.getJSONObject("metadata").get("event_type")); Assert.assertNotNull(event.get("method")); Assert.assertNotNull(event.get("path")); Assert.assertNotNull(event.get("query")); @@ -56,7 +56,7 @@ public void testNakadiAccessLogInAvro() throws Exception { Assert.assertNotNull(event.get("content_encoding")); Assert.assertNotNull(event.get("request_length")); Assert.assertNotNull(event.get("response_length")); - Assert.assertNull(event.get("random_key")); + Assert.assertFalse(event.has("random_key")); } @Test @@ -75,7 +75,7 @@ public void testNakadiSubscriptionLogInAvro() throws Exception { // So the test is only looking for a valid event. Assert.assertEquals( NAKADI_SUBSCRIPTION_LOG, - ((Map) event.get("metadata")).get("event_type")); + event.getJSONObject("metadata").get("event_type")); Assert.assertEquals("created", event.get("status")); Assert.assertNotNull(event.get("subscription_id")); } @@ -96,7 +96,7 @@ public void testNakadiEventTypeLogInAvro() throws Exception { // So the test is only looking for a valid event. Assert.assertEquals( NAKADI_EVENT_TYPE_LOG, - ((Map) event.get("metadata")).get("event_type")); + event.getJSONObject("metadata").get("event_type")); Assert.assertNotNull(event.get("event_type")); Assert.assertNotNull(event.get("status")); Assert.assertNotNull(event.get("category")); @@ -120,7 +120,7 @@ public void testNakadiBatchPublishedInAvro() throws Exception { // So the test is only looking for a valid event. Assert.assertEquals( NAKADI_BATCH_PUBLISHED, - ((Map) event.get("metadata")).get("event_type")); + event.getJSONObject("metadata").get("event_type")); Assert.assertNotNull(event.get("event_type")); Assert.assertNotNull(event.get("app")); Assert.assertNotNull(event.get("app_hashed")); @@ -145,7 +145,7 @@ public void testNakadiDataStreamedInAvro() throws Exception { final var event = events.get(0); // All acceptance tests are run against same instance, so the exact event that is consumed is unpredictable. // So the test is only looking for a valid event. - final var metadata = (Map) event.get("metadata"); + final var metadata = event.getJSONObject("metadata"); Assert.assertEquals(NAKADI_DATA_STREAMED, metadata.get("event_type")); Assert.assertNotNull(metadata.get("occurred_at")); Assert.assertNotNull(metadata.get("received_at")); @@ -162,7 +162,7 @@ public void testNakadiDataStreamedInAvro() throws Exception { Assert.assertNotNull(event.get("batches_streamed")); } - private List consumeEvent(final TestStreamingClient client) { + private List consumeEvent(final TestStreamingClient client) { TestUtils.waitFor(() -> MatcherAssert.assertThat( client.getJsonBatches().size(), Matchers.greaterThanOrEqualTo(1)), 10000); return client.getJsonBatches().get(0).getEvents(); diff --git a/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/webservice/UserJourneyAT.java b/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/webservice/UserJourneyAT.java index af9f4a1082..5b46afe7d2 100644 --- a/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/webservice/UserJourneyAT.java +++ b/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/webservice/UserJourneyAT.java @@ -4,7 +4,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Charsets; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.io.Resources; import com.jayway.restassured.RestAssured; @@ -12,6 +11,7 @@ import com.jayway.restassured.response.Header; import com.jayway.restassured.specification.RequestSpecification; import org.echocat.jomon.runtime.concurrent.RetryForSpecifiedTimeStrategy; +import org.json.JSONArray; import org.json.JSONObject; import org.junit.Before; import org.junit.Test; @@ -29,7 +29,6 @@ import java.io.IOException; import java.util.List; -import java.util.Map; import java.util.stream.Collectors; import static java.util.stream.LongStream.rangeClosed; @@ -51,7 +50,6 @@ import static org.zalando.nakadi.utils.TestUtils.randomUUID; import static org.zalando.nakadi.utils.TestUtils.randomValidEventTypeName; import static org.zalando.nakadi.utils.TestUtils.waitFor; -import static org.zalando.nakadi.webservice.hila.StreamBatch.MatcherIgnoringToken.equalToBatchIgnoringToken; import static org.zalando.nakadi.webservice.utils.NakadiTestUtils.commitCursors; import static org.zalando.nakadi.webservice.utils.NakadiTestUtils.createSubscription; @@ -351,11 +349,11 @@ public void userJourneyHila() throws InterruptedException, IOException { final SubscriptionCursor cursor = new SubscriptionCursor("0", TestUtils.toTimelineOffset(i), eventTypeName, ""); final StreamBatch expectedBatch = new StreamBatch(cursor, - ImmutableList.of(ImmutableMap.of("foo", "bar" + i)), + new JSONArray().put(new JSONObject().put("foo", "bar" + i)), i == 0 ? new StreamMetadata("Stream started") : null); final StreamBatch batch = batches.get(i); - assertThat(batch, equalToBatchIgnoringToken(expectedBatch)); + assertThat(batch, StreamBatch.equalToBatchIgnoringToken(expectedBatch)); } // as we didn't commit, there should be still 4 unconsumed events @@ -450,8 +448,9 @@ public void userJourneyAvroTransition() throws InterruptedException, IOException // validate the events metadata for (final StreamBatch batch : batches) { - final Map metadata = (Map) batch.getEvents().get(0).get("metadata"); - assertThat(metadata.get("version"), equalTo(validatedWithJsonSchemaVersion)); + assertThat( + batch.getEvents().get(0).getJSONObject("metadata").getString("version"), + equalTo(validatedWithJsonSchemaVersion)); } // delete subscription diff --git a/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/HilaAT.java b/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/HilaAT.java index 4603552897..86384c0f23 100644 --- a/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/HilaAT.java +++ b/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/HilaAT.java @@ -4,19 +4,19 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import org.apache.http.HttpStatus; +import org.json.JSONObject; import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import org.zalando.nakadi.annotations.validation.DeadLetterAnnotationValidator; import org.zalando.nakadi.config.JsonConfig; import org.zalando.nakadi.domain.EventType; import org.zalando.nakadi.domain.ItemsWrapper; import org.zalando.nakadi.domain.Subscription; import org.zalando.nakadi.domain.SubscriptionBase; import org.zalando.nakadi.domain.SubscriptionEventTypeStats; +import org.zalando.nakadi.domain.UnprocessableEventPolicy; import org.zalando.nakadi.service.BlacklistService; import org.zalando.nakadi.util.ThreadUtils; import org.zalando.nakadi.utils.JsonTestHelper; @@ -49,16 +49,18 @@ import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; +import static org.zalando.nakadi.annotations.validation.DeadLetterAnnotationValidator.SUBSCRIPTION_MAX_EVENT_SEND_COUNT; +import static org.zalando.nakadi.annotations.validation.DeadLetterAnnotationValidator. + SUBSCRIPTION_UNPROCESSABLE_EVENT_POLICY; import static org.zalando.nakadi.domain.SubscriptionBase.InitialPosition.BEGIN; import static org.zalando.nakadi.domain.SubscriptionBase.InitialPosition.END; import static org.zalando.nakadi.domain.SubscriptionEventTypeStats.Partition.AssignmentType.AUTO; import static org.zalando.nakadi.domain.SubscriptionEventTypeStats.Partition.AssignmentType.DIRECT; import static org.zalando.nakadi.utils.TestUtils.waitFor; -import static org.zalando.nakadi.webservice.hila.StreamBatch.MatcherIgnoringToken.equalToBatchIgnoringToken; -import static org.zalando.nakadi.webservice.hila.StreamBatch.singleEventBatch; import static org.zalando.nakadi.webservice.utils.NakadiTestUtils.commitCursors; import static org.zalando.nakadi.webservice.utils.NakadiTestUtils.createEventType; import static org.zalando.nakadi.webservice.utils.NakadiTestUtils.createSubscription; +import static org.zalando.nakadi.webservice.utils.NakadiTestUtils.createSubscriptionForEventType; import static org.zalando.nakadi.webservice.utils.NakadiTestUtils.getNumberOfAssignedStreams; import static org.zalando.nakadi.webservice.utils.NakadiTestUtils.publishBusinessEventWithUserDefinedPartition; import static org.zalando.nakadi.webservice.utils.NakadiTestUtils.publishEvent; @@ -151,11 +153,16 @@ public void whenOffsetIsCommittedNextSessionStartsFromNextEventAfterCommitted() .create(URL, subscription.getId(), "stream_limit=2") .start(); waitFor(() -> assertThat(client.getJsonBatches(), hasSize(2))); - assertThat(client.getJsonBatches().get(0), equalToBatchIgnoringToken(singleEventBatch("0", - "001-0001-000000000000000000", eventType.getName(), ImmutableMap.of("foo", "bar0"), - "Stream started"))); - assertThat(client.getJsonBatches().get(1), equalToBatchIgnoringToken(singleEventBatch("0", - "001-0001-000000000000000001", eventType.getName(), ImmutableMap.of("foo", "bar1")))); + assertThat( + client.getJsonBatches().get(0), + StreamBatch.equalToBatchIgnoringToken( + StreamBatch.singleEventBatch("0", "001-0001-000000000000000000", eventType.getName(), + new JSONObject().put("foo", "bar0"), "Stream started"))); + assertThat( + client.getJsonBatches().get(1), + StreamBatch.equalToBatchIgnoringToken( + StreamBatch.singleEventBatch("0", "001-0001-000000000000000001", eventType.getName(), + new JSONObject().put("foo", "bar1")))); // commit offset that will also trigger session closing as we reached stream_limit and committed commitCursors(subscription.getId(), ImmutableList.of(client.getJsonBatches().get(1).getCursor()), @@ -167,14 +174,18 @@ public void whenOffsetIsCommittedNextSessionStartsFromNextEventAfterCommitted() waitFor(() -> assertThat(client.getJsonBatches(), hasSize(2))); // check that we have read the next two events with correct offsets - assertThat(client.getJsonBatches().get(0), equalToBatchIgnoringToken(singleEventBatch("0", - "001-0001-000000000000000002", eventType.getName(), - ImmutableMap.of("foo", "bar2"), "Stream started"))); - assertThat(client.getJsonBatches().get(1), equalToBatchIgnoringToken(singleEventBatch("0", - "001-0001-000000000000000003", eventType.getName(), ImmutableMap.of("foo", "bar3")))); + assertThat( + client.getJsonBatches().get(0), + StreamBatch.equalToBatchIgnoringToken( + StreamBatch.singleEventBatch("0", "001-0001-000000000000000002", eventType.getName(), + new JSONObject().put("foo", "bar2"), "Stream started"))); + assertThat( + client.getJsonBatches().get(1), + StreamBatch.equalToBatchIgnoringToken( + StreamBatch.singleEventBatch("0", "001-0001-000000000000000003", eventType.getName(), + new JSONObject().put("foo", "bar3")))); } - @Test(timeout = 5000) public void whenNoEventsThenFirstOffsetIsBEGIN() { final TestStreamingClient client = TestStreamingClient @@ -251,8 +262,11 @@ public void whenCommitTimeoutReachedSessionIsClosed() { waitFor(() -> assertThat(client.getJsonBatches(), hasSize(2)), 10000); waitFor(() -> assertThat(client.isRunning(), is(false)), 10000); - assertThat(client.getJsonBatches().get(1), equalToBatchIgnoringToken(singleEventBatch("0", - "001-0001-000000000000000000", eventType.getName(), ImmutableMap.of(), "Commit timeout reached"))); + assertThat( + client.getJsonBatches().get(1), + StreamBatch.equalToBatchIgnoringToken( + StreamBatch.emptyBatch("0", "001-0001-000000000000000000", eventType.getName(), + "Commit timeout reached"))); } @Test(timeout = 15000) @@ -605,7 +619,7 @@ public void whenPatchThenCursorsAreInitializedAndPatched() throws Exception { @Test(timeout = 15000) public void whenCommitFailsThreeTimesAndSingleBatchEventFailsThreeTimesThenEventSkipped() throws IOException { - final Subscription subscription = createAutoDLQSubscription(eventType); + final Subscription subscription = createAutoDLQSubscription(eventType, UnprocessableEventPolicy.SKIP_EVENT); final TestStreamingClient client = TestStreamingClient .create(URL, subscription.getId(), "batch_limit=3&commit_timeout=1") .start(); @@ -646,7 +660,7 @@ public void whenCommitFailsThreeTimesAndSingleBatchEventFailsThreeTimesThenEvent @Test(timeout = 15000) public void whenIsLookingForDeadLetterAndCommitComesThenContinueLooking() throws IOException { - final Subscription subscription = createAutoDLQSubscription(eventType); + final Subscription subscription = createAutoDLQSubscription(eventType, UnprocessableEventPolicy.SKIP_EVENT); final TestStreamingClient client = TestStreamingClient .create(URL, subscription.getId(), "batch_limit=10&commit_timeout=1") .start(); @@ -712,7 +726,7 @@ public void whenIsLookingForDeadLetterAndCommitComesThenContinueLooking() throws @Test(timeout = 20_000) public void whenIsLookingForDeadLetterAndSendAllEventsOneByOneThenBackToNormalBatchSize() throws InterruptedException, IOException { - final Subscription subscription = createAutoDLQSubscription(eventType); + final Subscription subscription = createAutoDLQSubscription(eventType, UnprocessableEventPolicy.SKIP_EVENT); final TestStreamingClient client = TestStreamingClient .create(URL, subscription.getId(), "batch_limit=10&commit_timeout=1&stream_limit=20") .start(); @@ -753,13 +767,29 @@ public void whenIsLookingForDeadLetterAndSendAllEventsOneByOneThenBackToNormalBa } @Test(timeout = 20_000) - public void shouldSkipDeadLetterdAndConsumptionToBeContinued() throws IOException { + public void shouldSkipPoisonPillAndDeadLetterFoundInTheQueueLater() throws IOException, InterruptedException { + final EventType eventType = NakadiTestUtils.createBusinessEventTypeWithPartitions(4); publishBusinessEventWithUserDefinedPartition(eventType.getName(), - 50, i -> String.format("{\"foo\":\"bar%d\"}", i), i -> String.valueOf(i % 4)); + 50, i -> String.format("bar%d", i), i -> String.valueOf(i % 4)); + + final String poisonPillValue = "bar10"; - final Subscription subscription = createAutoDLQSubscription(eventType); + final Subscription subscription = + createAutoDLQSubscription(eventType, UnprocessableEventPolicy.DEAD_LETTER_QUEUE); + + // start looking for events in the DLQ store event type already (reading from END) + final Subscription dlqStoreEventTypeSub = createSubscriptionForEventType("nakadi.dead.letter.queue"); + final TestStreamingClient dlqStoreClient = TestStreamingClient.create(URL, + dlqStoreEventTypeSub.getId(), "batch_limit=1&stream_timeout=15"); + dlqStoreClient.startWithAutocommit(batches -> + Assert.assertTrue("failed event should be found in the dead letter queue", + batches.stream() + .flatMap(b -> b.getEvents().stream()) + .anyMatch(e -> + subscription.getId().equals(e.get("subscription_id")) && + poisonPillValue.equals(e.getJSONObject("event").getString("foo"))))); final AtomicReference cursorWithPoisonPill = new AtomicReference<>(); while (true) { @@ -767,8 +797,7 @@ public void shouldSkipDeadLetterdAndConsumptionToBeContinued() throws IOExceptio URL, subscription.getId(), "batch_limit=3&commit_timeout=1&stream_timeout=2"); client.start(streamBatch -> { if (streamBatch.getEvents().stream() - .anyMatch(event -> event.get("foo").equals("{\"foo\":\"bar10\"}"))) { - // skipp commit to introduce poison pill + .anyMatch(event -> poisonPillValue.equals(event.getString("foo")))) { cursorWithPoisonPill.set(streamBatch.getCursor()); throw new RuntimeException("poison pill found"); } else { @@ -784,13 +813,15 @@ public void shouldSkipDeadLetterdAndConsumptionToBeContinued() throws IOExceptio waitFor(() -> Assert.assertFalse(client.isRunning())); if (client.getJsonBatches().stream() - .filter(streamBatch -> streamBatch.getCursor().getPartition() - .equals(cursorWithPoisonPill.get().getPartition())) + .filter(streamBatch -> cursorWithPoisonPill.get().getPartition() + .equals(streamBatch.getCursor().getPartition())) .anyMatch(streamBatch -> streamBatch.getCursor().getOffset() .compareTo(cursorWithPoisonPill.get().getOffset()) > 0)) { - return; + break; } } + + waitFor(() -> Assert.assertFalse(dlqStoreClient.isRunning())); } private static boolean isCommitTimeoutReached(final TestStreamingClient client) { @@ -799,12 +830,16 @@ private static boolean isCommitTimeoutReached(final TestStreamingClient client) .anyMatch(batch -> batch.getMetadata().getDebug().equals("Commit timeout reached")); } - private Subscription createAutoDLQSubscription(final EventType eventType) throws IOException { + private Subscription createAutoDLQSubscription(final EventType eventType, + final UnprocessableEventPolicy unprocessableEventPolicy) throws IOException { + final SubscriptionBase subscription = RandomSubscriptionBuilder.builder() .withEventType(eventType.getName()) .withStartFrom(BEGIN) .buildSubscriptionBase(); - subscription.setAnnotations(Map.of(DeadLetterAnnotationValidator.SUBSCRIPTION_MAX_EVENT_SEND_COUNT, "3")); + subscription.setAnnotations(Map.of( + SUBSCRIPTION_MAX_EVENT_SEND_COUNT, "3", + SUBSCRIPTION_UNPROCESSABLE_EVENT_POLICY, unprocessableEventPolicy.toString())); return createSubscription(subscription); } } diff --git a/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/StreamBatch.java b/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/StreamBatch.java index 3a9ff59081..874b6ac686 100644 --- a/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/StreamBatch.java +++ b/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/StreamBatch.java @@ -1,21 +1,30 @@ package org.zalando.nakadi.webservice.hila; import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.collect.ImmutableList; import org.apache.commons.lang3.StringUtils; -import org.hamcrest.BaseMatcher; -import org.hamcrest.Description; +import org.hamcrest.Matcher; +import org.json.JSONArray; +import org.json.JSONObject; import org.zalando.nakadi.domain.StreamMetadata; import org.zalando.nakadi.repository.kafka.KafkaTestHelper; import org.zalando.nakadi.view.SubscriptionCursor; import javax.annotation.Nullable; import javax.annotation.concurrent.Immutable; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.stream.Collectors; +import java.util.stream.IntStream; -import static java.util.Collections.unmodifiableList; +import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasProperty; +import static org.hamcrest.Matchers.isA; +import static uk.co.datumedge.hamcrest.json.SameJSONAs.sameJSONObjectAs; @Immutable public class StreamBatch { @@ -23,14 +32,25 @@ public class StreamBatch { private static final String DUMMY_TOKEN = "dummy-token"; private final SubscriptionCursor cursor; - private final List events; + private final List events; private final StreamMetadata metadata; + // used for reading from string with object mapper public StreamBatch(@JsonProperty("cursor") final SubscriptionCursor cursor, - @Nullable @JsonProperty("events") final List events, + @Nullable @JsonProperty("events") final List> events, @Nullable @JsonProperty("info") final StreamMetadata metadata) { this.cursor = cursor; - this.events = Optional.ofNullable(events).orElse(ImmutableList.of()); + this.events = Optional.ofNullable(events) + .map(evs -> evs.stream().map(JSONObject::new).collect(Collectors.toUnmodifiableList())) + .orElse(Collections.emptyList()); + this.metadata = metadata; + } + + public StreamBatch(final SubscriptionCursor cursor, final JSONArray eventsArray, final StreamMetadata metadata) { + this.cursor = cursor; + this.events = IntStream.range(0, eventsArray.length()) + .mapToObj(i -> eventsArray.getJSONObject(i)) + .collect(Collectors.toUnmodifiableList()); this.metadata = metadata; } @@ -38,8 +58,8 @@ public SubscriptionCursor getCursor() { return cursor; } - public List getEvents() { - return unmodifiableList(events); + public List getEvents() { + return events; } @Override @@ -69,23 +89,44 @@ public StreamMetadata getMetadata() { return metadata; } + public static StreamBatch emptyBatch(final String partition, final String offset, final String eventType, + final String debugInfo) { + + final String paddedOffset = StringUtils.leftPad(offset, KafkaTestHelper.CURSOR_OFFSET_LENGTH, '0'); + return new StreamBatch( + new SubscriptionCursor(partition, paddedOffset, eventType, DUMMY_TOKEN), + new JSONArray(), + new StreamMetadata(debugInfo)); + } + public static StreamBatch singleEventBatch(final String partition, final String offset, final String eventType, - final Map event, final String metadata) { + final JSONObject event) { + final String paddedOffset = StringUtils.leftPad(offset, KafkaTestHelper.CURSOR_OFFSET_LENGTH, '0'); - if (event.isEmpty()) { - return new StreamBatch(new SubscriptionCursor(partition, paddedOffset, eventType, DUMMY_TOKEN), - ImmutableList.of(), new StreamMetadata(metadata)); - } else { - return new StreamBatch(new SubscriptionCursor(partition, paddedOffset, eventType, DUMMY_TOKEN), - ImmutableList.of(event), new StreamMetadata(metadata)); - } + return new StreamBatch( + new SubscriptionCursor(partition, paddedOffset, eventType, DUMMY_TOKEN), + new JSONArray().put(event), + null); } public static StreamBatch singleEventBatch(final String partition, final String offset, final String eventType, - final Map event) { + final JSONObject event, final String debugInfo) { + + final String paddedOffset = StringUtils.leftPad(offset, KafkaTestHelper.CURSOR_OFFSET_LENGTH, '0'); + return new StreamBatch( + new SubscriptionCursor(partition, paddedOffset, eventType, DUMMY_TOKEN), + new JSONArray().put(event), + new StreamMetadata(debugInfo)); + } + + private static StreamBatch singleEventBatch(final String partition, final String offset, final String eventType, + final JSONObject event, final StreamMetadata metadata) { + final String paddedOffset = StringUtils.leftPad(offset, KafkaTestHelper.CURSOR_OFFSET_LENGTH, '0'); - return new StreamBatch(new SubscriptionCursor(partition, paddedOffset, eventType, DUMMY_TOKEN), - ImmutableList.of(event), null); + return new StreamBatch( + new SubscriptionCursor(partition, paddedOffset, eventType, DUMMY_TOKEN), + new JSONArray().put(event), + metadata); } @Override @@ -97,38 +138,20 @@ public String toString() { '}'; } - public static class MatcherIgnoringToken extends BaseMatcher { - - private final StreamBatch batch; - - public MatcherIgnoringToken(final StreamBatch batch) { - this.batch = batch; - } - - @Override - public boolean matches(final Object item) { - if (!(item instanceof StreamBatch)) { - return false; - } - final StreamBatch batchTocheck = (StreamBatch) item; - final SubscriptionCursor cursor = batch.getCursor(); - final SubscriptionCursor cursorToCheck = batchTocheck.getCursor(); - - return batch.getEvents().equals(batchTocheck.getEvents()) && - cursor.getPartition().equals(cursorToCheck.getPartition()) && - cursor.getOffset().equals(cursorToCheck.getOffset()) && - cursor.getEventType().equals(cursorToCheck.getEventType()) && - Optional.ofNullable(batch.getMetadata()) - .map(b -> b.equals(batchTocheck.getMetadata())) - .orElse(batchTocheck.getMetadata() == null); - } - - @Override - public void describeTo(final Description description) { - } - - public static MatcherIgnoringToken equalToBatchIgnoringToken(final StreamBatch batch) { - return new MatcherIgnoringToken(batch); - } + public static Matcher equalToBatchIgnoringToken(final StreamBatch batch) { + final SubscriptionCursor cursor = batch.getCursor(); + final List events = batch.getEvents(); + return allOf( + isA(StreamBatch.class), + hasProperty("cursor", allOf( + hasProperty("partition", equalTo(cursor.getPartition())), + hasProperty("offset", equalTo(cursor.getOffset())), + hasProperty("eventType", equalTo(cursor.getEventType())))), + hasProperty("events", events.isEmpty() + ? empty() + : contains(events.stream() + .map(e -> sameJSONObjectAs(e)) + .collect(Collectors.toList()))), + hasProperty("metadata", equalTo(batch.getMetadata()))); } } diff --git a/api-consumption/src/main/java/org/zalando/nakadi/service/subscription/StreamingContext.java b/api-consumption/src/main/java/org/zalando/nakadi/service/subscription/StreamingContext.java index 8d1f6f762c..42bc82b653 100644 --- a/api-consumption/src/main/java/org/zalando/nakadi/service/subscription/StreamingContext.java +++ b/api-consumption/src/main/java/org/zalando/nakadi/service/subscription/StreamingContext.java @@ -13,6 +13,7 @@ import org.zalando.nakadi.domain.HeaderTag; import org.zalando.nakadi.domain.NakadiCursor; import org.zalando.nakadi.domain.Subscription; +import org.zalando.nakadi.domain.UnprocessableEventPolicy; import org.zalando.nakadi.exceptions.runtime.AccessDeniedException; import org.zalando.nakadi.exceptions.runtime.NakadiRuntimeException; import org.zalando.nakadi.exceptions.runtime.RebalanceConflictException; @@ -26,6 +27,7 @@ import org.zalando.nakadi.service.EventStreamWriter; import org.zalando.nakadi.service.EventTypeChangeListener; import org.zalando.nakadi.service.FeatureToggleService; +import org.zalando.nakadi.service.publishing.EventPublisher; import org.zalando.nakadi.service.subscription.autocommit.AutocommitSupport; import org.zalando.nakadi.service.subscription.model.Partition; import org.zalando.nakadi.service.subscription.model.Session; @@ -36,6 +38,7 @@ import org.zalando.nakadi.service.subscription.zk.ZkSubscription; import org.zalando.nakadi.service.subscription.zk.ZkSubscriptionClient; import org.zalando.nakadi.service.timeline.TimelineService; +import org.zalando.nakadi.util.UUIDGenerator; import java.io.Closeable; import java.io.IOException; @@ -90,6 +93,10 @@ public class StreamingContext implements SubscriptionStreamer { private final EventTypeCache eventTypeCache; private static final Logger LOG = LoggerFactory.getLogger(StreamingContext.class); private final Integer maxEventSendCount; + private final UnprocessableEventPolicy unprocessableEventPolicy; + private final String deadLetterQueueEventTypeName; + private final EventPublisher eventPublisher; + private final UUIDGenerator uuidGenerator; private StreamingContext(final Builder builder) { this.out = builder.out; @@ -117,11 +124,19 @@ private StreamingContext(final Builder builder) { this.kafkaRecordDeserializer = builder.kafkaRecordDeserializer; this.eventTypeCache = builder.eventTypeCache; this.featureToggleService = builder.featureToggleService; + this.deadLetterQueueEventTypeName = builder.deadLetterQueueEventTypeName; + this.eventPublisher = builder.eventPublisher; + this.uuidGenerator = builder.uuidGenerator; this.maxEventSendCount = Optional.ofNullable(getSubscription().getAnnotations()) .map(ans -> ans.get(DeadLetterAnnotationValidator.SUBSCRIPTION_MAX_EVENT_SEND_COUNT)) .map(Integer::valueOf) .orElse(null); + + this.unprocessableEventPolicy = Optional.ofNullable(getSubscription().getAnnotations()) + .map(ans -> ans.get(DeadLetterAnnotationValidator.SUBSCRIPTION_UNPROCESSABLE_EVENT_POLICY)) + .map(UnprocessableEventPolicy::valueOf) + .orElse(null); } public ConsumptionKpiCollector getKpiCollector() { @@ -400,6 +415,22 @@ public Integer getMaxEventSendCount() { return this.maxEventSendCount; } + public UnprocessableEventPolicy getUnprocessableEventPolicy() { + return this.unprocessableEventPolicy; + } + + public String getDeadLetterQueueEventTypeName() { + return this.deadLetterQueueEventTypeName; + } + + public EventPublisher getEventPublisher() { + return this.eventPublisher; + } + + public UUIDGenerator getUuidGenerator() { + return this.uuidGenerator; + } + public static final class Builder { private SubscriptionOutput out; private StreamParameters parameters; @@ -425,6 +456,9 @@ public static final class Builder { private KafkaRecordDeserializer kafkaRecordDeserializer; private EventTypeCache eventTypeCache; private FeatureToggleService featureToggleService; + private String deadLetterQueueEventTypeName; + private EventPublisher eventPublisher; + private UUIDGenerator uuidGenerator; public Builder setEventTypeCache(final EventTypeCache eventTypeCache) { this.eventTypeCache = eventTypeCache; @@ -546,11 +580,23 @@ public Builder setFeatureToggleService(final FeatureToggleService featureToggleS return this; } - public StreamingContext build() { - return new StreamingContext(this); + public Builder setDeadLetterQueueEventTypeName(final String deadLetterQueueEventTypeName) { + this.deadLetterQueueEventTypeName = deadLetterQueueEventTypeName; + return this; } + public Builder setEventPublisher(final EventPublisher eventPublisher) { + this.eventPublisher = eventPublisher; + return this; + } - } + public Builder setUuidGenerator(final UUIDGenerator uuidGenerator) { + this.uuidGenerator = uuidGenerator; + return this; + } + public StreamingContext build() { + return new StreamingContext(this); + } + } } diff --git a/api-consumption/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionStreamerFactory.java b/api-consumption/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionStreamerFactory.java index e39efdce5b..e6b5eea192 100644 --- a/api-consumption/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionStreamerFactory.java +++ b/api-consumption/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionStreamerFactory.java @@ -21,10 +21,12 @@ import org.zalando.nakadi.service.EventTypeChangeListener; import org.zalando.nakadi.service.FeatureToggleService; import org.zalando.nakadi.service.NakadiCursorComparator; +import org.zalando.nakadi.service.publishing.EventPublisher; import org.zalando.nakadi.service.subscription.model.Session; import org.zalando.nakadi.service.subscription.zk.SubscriptionClientFactory; import org.zalando.nakadi.service.subscription.zk.ZkSubscriptionClient; import org.zalando.nakadi.service.timeline.TimelineService; +import org.zalando.nakadi.util.UUIDGenerator; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -50,6 +52,9 @@ public class SubscriptionStreamerFactory { private final ConsumptionKpiCollectorFactory consumptionKpiCollectorFactory; private final KafkaRecordDeserializer kafkaRecordDeserializer; private final FeatureToggleService featureToggleService; + private final EventPublisher eventPublisher; + private final UUIDGenerator uuidGenerator ; + private final String deadLetterQueueEventTypeName; @Autowired public SubscriptionStreamerFactory( @@ -68,8 +73,10 @@ public SubscriptionStreamerFactory( @Value("${nakadi.subscription.maxStreamMemoryBytes}") final long streamMemoryLimitBytes, final ConsumptionKpiCollectorFactory consumptionKpiCollectorFactory, final KafkaRecordDeserializer kafkaRecordDeserializer, - final FeatureToggleService featureToggleService - ) { + final FeatureToggleService featureToggleService, + final EventPublisher eventPublisher, + @Value("${nakadi.dlq.storeEventTypeName}") final String deadLetterQueueEventTypeName, + final UUIDGenerator uuidGenerator) { this.timelineService = timelineService; this.cursorTokenService = cursorTokenService; this.objectMapper = objectMapper; @@ -86,6 +93,9 @@ public SubscriptionStreamerFactory( this.consumptionKpiCollectorFactory = consumptionKpiCollectorFactory; this.kafkaRecordDeserializer = kafkaRecordDeserializer; this.featureToggleService = featureToggleService; + this.eventPublisher = eventPublisher; + this.deadLetterQueueEventTypeName = deadLetterQueueEventTypeName; + this.uuidGenerator = uuidGenerator; } public SubscriptionStreamer build( @@ -98,7 +108,6 @@ public SubscriptionStreamer build( final ZkSubscriptionClient zkClient = zkClientFactory.createClient( subscription, streamParameters.commitTimeoutMillis); - // Create streaming context return new StreamingContext.Builder() .setOut(output) .setStreamMemoryLimitBytes(streamMemoryLimitBytes) @@ -125,7 +134,9 @@ public SubscriptionStreamer build( .setKafkaRecordDeserializer(kafkaRecordDeserializer) .setEventTypeCache(eventTypeCache) .setFeatureToggleService(featureToggleService) + .setEventPublisher(eventPublisher) + .setDeadLetterQueueEventTypeName(deadLetterQueueEventTypeName) + .setUuidGenerator(uuidGenerator) .build(); } - } diff --git a/api-consumption/src/main/java/org/zalando/nakadi/service/subscription/state/StreamingState.java b/api-consumption/src/main/java/org/zalando/nakadi/service/subscription/state/StreamingState.java index fd998e03ca..27dae8f4b8 100644 --- a/api-consumption/src/main/java/org/zalando/nakadi/service/subscription/state/StreamingState.java +++ b/api-consumption/src/main/java/org/zalando/nakadi/service/subscription/state/StreamingState.java @@ -3,13 +3,19 @@ import com.codahale.metrics.Meter; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import org.json.JSONArray; +import org.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.zalando.nakadi.domain.ConsumedEvent; +import org.zalando.nakadi.domain.EventPublishingStatus; +import org.zalando.nakadi.domain.EventPublishResult; import org.zalando.nakadi.domain.EventTypePartition; import org.zalando.nakadi.domain.NakadiCursor; import org.zalando.nakadi.domain.PartitionStatistics; import org.zalando.nakadi.domain.Timeline; +import org.zalando.nakadi.domain.UnprocessableEventPolicy; +import org.zalando.nakadi.exceptions.runtime.DeadLetterQueueStoreException; import org.zalando.nakadi.exceptions.runtime.InternalNakadiException; import org.zalando.nakadi.exceptions.runtime.InvalidCursorException; import org.zalando.nakadi.exceptions.runtime.NakadiRuntimeException; @@ -26,6 +32,8 @@ import java.io.Closeable; import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.time.Instant; import java.util.Arrays; import java.util.Collections; import java.util.Comparator; @@ -42,7 +50,6 @@ import static java.util.stream.Collectors.groupingBy; - class StreamingState extends State { private static final Logger LOG = LoggerFactory.getLogger(StreamingState.class); @@ -300,11 +307,16 @@ private void streamToOutput(final boolean streamTimeoutReached) { partition != null && partition.isLookingForDeadLetter() && partition.getFailedCommitsCount() >= getContext().getMaxEventSendCount()) { + final ConsumedEvent failedEvent = toSend.remove(0); - // todo: skip or to dlq + LOG.warn("Skipping event {} from partition {} due to failed commits count {}", failedEvent.getPosition(), etp, partition.getFailedCommitsCount()); + if (getContext().getUnprocessableEventPolicy() == UnprocessableEventPolicy.DEAD_LETTER_QUEUE) { + sendToDeadLetterQueue(failedEvent, partition.getFailedCommitsCount()); + } + getAutocommit().addSkippedEvent(failedEvent.getPosition()); this.addTask(() -> getAutocommit().autocommit()); @@ -370,6 +382,49 @@ private void streamToOutput(final boolean streamTimeoutReached) { } } + private void sendToDeadLetterQueue(final ConsumedEvent event, final int failedCommitsCount) { + // TODO: how do we handle AVRO here? + + final String failedEventString = new String(event.getEvent(), StandardCharsets.UTF_8); + final JSONObject failedEvent = new JSONObject(failedEventString); + + final JSONObject errorMessage = + new JSONObject() + .put("message", "skipped due to failed commits count: " + failedCommitsCount); + + final SubscriptionCursorWithoutToken cursor = + getContext().getCursorConverter().convertToNoToken(event.getPosition()); + + final JSONObject deadLetter = + new JSONObject() + .put("subscription_id", getContext().getSubscription().getId()) + .put("event_type", cursor.getEventType()) + .put("partition", cursor.getPartition()) + .put("offset", cursor.getOffset()) + .put("error", errorMessage) + .put("event", failedEvent) + .put("metadata", + new JSONObject() + .put("eid", getContext().getUuidGenerator().randomUUID().toString()) + .put("occurred_at", Instant.now().toString())); + + final JSONArray deadLetterBatch = + new JSONArray().put(deadLetter); + + final EventPublishResult result; + try { + result = getContext().getEventPublisher().publishInternal( + deadLetterBatch.toString(), getContext().getDeadLetterQueueEventTypeName(), null); + } catch (final Exception e) { + throw new DeadLetterQueueStoreException("Failed to store an event to the dead letter queue", e); + } + + if (result.getStatus() != EventPublishingStatus.SUBMITTED) { + throw new DeadLetterQueueStoreException("Failed to store an event to the dead letter queue: " + + result.getResponses().get(0).getDetail()); + } + } + private int getBatchLimitEvents() { return getParameters().batchLimitEvents; } @@ -838,5 +893,4 @@ private void trackIdleness(final ZkSubscriptionClient.Topology topology) { } } } - } diff --git a/app/src/main/java/org/zalando/nakadi/service/NakadiDeadLetterQueueInitialization.java b/app/src/main/java/org/zalando/nakadi/service/NakadiDeadLetterQueueInitialization.java new file mode 100644 index 0000000000..f0a0b67dad --- /dev/null +++ b/app/src/main/java/org/zalando/nakadi/service/NakadiDeadLetterQueueInitialization.java @@ -0,0 +1,74 @@ +package org.zalando.nakadi.service; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.event.ContextRefreshedEvent; +import org.springframework.context.event.EventListener; +import org.springframework.stereotype.Component; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +@Component +@ConfigurationProperties("nakadi.dlq") +public class NakadiDeadLetterQueueInitialization { + private static final Logger LOG = LoggerFactory.getLogger(NakadiDeadLetterQueueInitialization.class); + + private final SystemEventTypeInitializer systemEventTypeInitializer; + + private String storeEventTypeName; + private String owningApplication; + private String authDataType; + private String authValue; + + @Autowired + public NakadiDeadLetterQueueInitialization(final SystemEventTypeInitializer systemEventTypeInitializer) { + this.systemEventTypeInitializer = systemEventTypeInitializer; + } + + @EventListener + public void onApplicationEvent(final ContextRefreshedEvent event) throws IOException { + final Map replacements = new HashMap<>(); + replacements.put("store_event_type_name_placeholder", storeEventTypeName); + replacements.put("owning_application_placeholder", owningApplication); + replacements.put("auth_data_type_placeholder", authDataType); + replacements.put("auth_value_placeholder", authValue); + + systemEventTypeInitializer.createEventTypesFromResource("dead_letter_queue_event_types.json", replacements); + } + + public String getStoreEventTypeName() { + return storeEventTypeName; + } + + public void setStoreEventTypeName(final String storeEventTypeName) { + this.storeEventTypeName = storeEventTypeName; + } + + public String getOwningApplication() { + return owningApplication; + } + + public void setOwningApplication(final String owningApplication) { + this.owningApplication = owningApplication; + } + + public String getAuthDataType() { + return authDataType; + } + + public void setAuthDataType(final String authDataType) { + this.authDataType = authDataType; + } + + public String getAuthValue() { + return authValue; + } + + public void setAuthValue(final String authValue) { + this.authValue = authValue; + } +} diff --git a/app/src/main/resources/application.yml b/app/src/main/resources/application.yml index 292913dbb2..82da1d2afe 100644 --- a/app/src/main/resources/application.yml +++ b/app/src/main/resources/application.yml @@ -139,6 +139,11 @@ nakadi: nakadiDataStreamed: "nakadi.data.streamed" owning_application: "stups_nakadi" hasher.salt: "salt" + dlq: + storeEventTypeName: "nakadi.dead.letter.queue" + owningApplication: "stups_nakadi" + authDataType: "*" + authValue: "*" twintip: mapping: /api diff --git a/app/src/main/resources/dead_letter_queue_event_types.json b/app/src/main/resources/dead_letter_queue_event_types.json new file mode 100644 index 0000000000..b5aa93c54a --- /dev/null +++ b/app/src/main/resources/dead_letter_queue_event_types.json @@ -0,0 +1,49 @@ +[ + { + "name": "store_event_type_name_placeholder", + "owning_application": "owning_application_placeholder", + "category": "business", + "enrichment_strategies": [ + "metadata_enrichment" + ], + "partition_strategy": "random", + "cleanup_policy": "delete", + "ordering_key_fields": [], + "ordering_instance_ids": [], + "schema": { + "type": "json_schema", + "schema": "{\"type\": \"object\", \"additionalProperties\": false, \"properties\": {\"subscription_id\": {\"type\": \"string\"}, \"event_type\": {\"type\": \"string\"}, \"partition\": {\"type\": \"string\"}, \"offset\": {\"type\": \"string\"}, \"error\": {\"type\": \"object\", \"properties\": {\"message\": {\"type\": \"string\"}}, \"required\": [\"message\"]}, \"event\": {\"type\": \"object\", \"additionalProperties\": true}}, \"required\": [\"subscription_id\", \"event_type\", \"partition\", \"offset\", \"error\", \"event\"]}" + }, + "default_statistic": { + "messages_per_minute": 1, + "message_size": 1000, + "read_parallelism": 1, + "write_parallelism": 1 + }, + "options": { + "retention_time": 86400000 + }, + "compatibility_mode": "forward", + "audience": "component-internal", + "authorization": { + "admins": [ + { + "data_type": "auth_data_type_placeholder", + "value": "auth_value_placeholder" + } + ], + "readers": [ + { + "data_type": "auth_data_type_placeholder", + "value": "auth_value_placeholder" + } + ], + "writers": [ + { + "data_type": "auth_data_type_placeholder", + "value": "auth_value_placeholder" + } + ] + } + } +] diff --git a/core-common/src/main/java/org/zalando/nakadi/annotations/validation/DeadLetterAnnotationValidator.java b/core-common/src/main/java/org/zalando/nakadi/annotations/validation/DeadLetterAnnotationValidator.java index 9d151feb6a..52e754116b 100644 --- a/core-common/src/main/java/org/zalando/nakadi/annotations/validation/DeadLetterAnnotationValidator.java +++ b/core-common/src/main/java/org/zalando/nakadi/annotations/validation/DeadLetterAnnotationValidator.java @@ -1,5 +1,7 @@ package org.zalando.nakadi.annotations.validation; +import org.zalando.nakadi.domain.UnprocessableEventPolicy; + import javax.validation.ConstraintValidator; import javax.validation.ConstraintValidatorContext; import java.util.Map; @@ -10,20 +12,34 @@ public class DeadLetterAnnotationValidator implements public static final String SUBSCRIPTION_MAX_EVENT_SEND_COUNT = "nakadi.zalando.org/subscription-max-event-send-count"; + public static final String SUBSCRIPTION_UNPROCESSABLE_EVENT_POLICY = + "nakadi.zalando.org/subscription-unprocessable-event-policy"; + @Override public boolean isValid(final Map annotations, final ConstraintValidatorContext context) { if (annotations == null) { return true; } + if (!isValidMaxEventSendCount(annotations, context)) { + return false; + } + if (!isValidUnprocessableEventPolicy(annotations, context)) { + return false; + } + return true; + } - final String failedCommitCount = annotations.get(SUBSCRIPTION_MAX_EVENT_SEND_COUNT); - if (failedCommitCount == null) { + private boolean isValidMaxEventSendCount( + final Map annotations, final ConstraintValidatorContext context) { + + final String maxEventSendCount = annotations.get(SUBSCRIPTION_MAX_EVENT_SEND_COUNT); + if (maxEventSendCount == null) { return true; } final Integer commits; try { - commits = Integer.valueOf(failedCommitCount); + commits = Integer.valueOf(maxEventSendCount); } catch (final NumberFormatException e) { context.disableDefaultConstraintViolation(); context.buildConstraintViolationWithTemplate(SUBSCRIPTION_MAX_EVENT_SEND_COUNT + " must be an integer") @@ -39,7 +55,50 @@ public boolean isValid(final Map annotations, final ConstraintVa return false; } + final String unprocessableEventPolicy = annotations.get(SUBSCRIPTION_UNPROCESSABLE_EVENT_POLICY); + if (unprocessableEventPolicy == null) { + context.disableDefaultConstraintViolation(); + context + .buildConstraintViolationWithTemplate( + SUBSCRIPTION_UNPROCESSABLE_EVENT_POLICY + " must be also set when setting " + + SUBSCRIPTION_MAX_EVENT_SEND_COUNT) + .addConstraintViolation(); + return false; + } + return true; } + private boolean isValidUnprocessableEventPolicy( + final Map annotations, final ConstraintValidatorContext context) { + + final String unprocessableEventPolicy = annotations.get(SUBSCRIPTION_UNPROCESSABLE_EVENT_POLICY); + if (unprocessableEventPolicy == null) { + return true; + } + + try { + UnprocessableEventPolicy.valueOf(unprocessableEventPolicy); + } catch (final IllegalArgumentException e) { + context.disableDefaultConstraintViolation(); + context + .buildConstraintViolationWithTemplate( + SUBSCRIPTION_UNPROCESSABLE_EVENT_POLICY + " must be one of: SKIP_EVENT, DEAD_LETTER_QUEUE") + .addConstraintViolation(); + return false; + } + + final String maxEventSendCount = annotations.get(SUBSCRIPTION_MAX_EVENT_SEND_COUNT); + if (maxEventSendCount == null) { + context.disableDefaultConstraintViolation(); + context + .buildConstraintViolationWithTemplate( + SUBSCRIPTION_MAX_EVENT_SEND_COUNT + " must be also set when setting " + + SUBSCRIPTION_UNPROCESSABLE_EVENT_POLICY) + .addConstraintViolation(); + return false; + } + + return true; + } } diff --git a/core-common/src/main/java/org/zalando/nakadi/domain/UnprocessableEventPolicy.java b/core-common/src/main/java/org/zalando/nakadi/domain/UnprocessableEventPolicy.java new file mode 100644 index 0000000000..8f07b5d68c --- /dev/null +++ b/core-common/src/main/java/org/zalando/nakadi/domain/UnprocessableEventPolicy.java @@ -0,0 +1,6 @@ +package org.zalando.nakadi.domain; + +public enum UnprocessableEventPolicy { + SKIP_EVENT, + DEAD_LETTER_QUEUE +} diff --git a/core-common/src/main/java/org/zalando/nakadi/exceptions/runtime/DeadLetterQueueStoreException.java b/core-common/src/main/java/org/zalando/nakadi/exceptions/runtime/DeadLetterQueueStoreException.java new file mode 100644 index 0000000000..905dee2e6c --- /dev/null +++ b/core-common/src/main/java/org/zalando/nakadi/exceptions/runtime/DeadLetterQueueStoreException.java @@ -0,0 +1,12 @@ +package org.zalando.nakadi.exceptions.runtime; + +public class DeadLetterQueueStoreException extends RuntimeException { + + public DeadLetterQueueStoreException(final String msg) { + super(msg); + } + + public DeadLetterQueueStoreException(final String msg, final Throwable cause) { + super(msg, cause); + } +} diff --git a/core-common/src/test/java/org/zalando/nakadi/annotations/validation/DeadLetterAnnotationValidatorTest.java b/core-common/src/test/java/org/zalando/nakadi/annotations/validation/DeadLetterAnnotationValidatorTest.java index 6e7fd9896a..62b6555ce3 100644 --- a/core-common/src/test/java/org/zalando/nakadi/annotations/validation/DeadLetterAnnotationValidatorTest.java +++ b/core-common/src/test/java/org/zalando/nakadi/annotations/validation/DeadLetterAnnotationValidatorTest.java @@ -1,14 +1,145 @@ package org.zalando.nakadi.annotations.validation; +import org.junit.Before; import org.junit.Assert; import org.junit.Test; +import javax.validation.ConstraintViolation; +import javax.validation.Valid; +import javax.validation.Validation; +import javax.validation.Validator; +import java.util.Collections; +import java.util.Map; + public class DeadLetterAnnotationValidatorTest { + public static class TestClass { + @Valid + @DeadLetterValidAnnotations + private final Map< + @Valid @AnnotationKey String, + @Valid @AnnotationValue String> annotations; + + public TestClass(final Map annotations) { + this.annotations = annotations; + } + } + + private Validator validator; + + @Before + public void prepareValidator() { + validator = Validation.buildDefaultValidatorFactory().getValidator(); + } + + @Test + public void whenEmptyAnnotationsThenOK() { + Assert.assertTrue("No annotations is OK", + validator.validate(new TestClass(null)).isEmpty()); + + Assert.assertTrue("Empty annotations is OK", + validator.validate(new TestClass(Collections.emptyMap())).isEmpty()); + } + + @Test + public void whenValidMaxEventSendCountThenOK() { + final Map annotations = Map.of( + DeadLetterAnnotationValidator.SUBSCRIPTION_MAX_EVENT_SEND_COUNT, "3", + DeadLetterAnnotationValidator.SUBSCRIPTION_UNPROCESSABLE_EVENT_POLICY, "SKIP_EVENT"); + + Assert.assertTrue("Valid max event send count is OK", + validator.validate(new TestClass(annotations)).isEmpty()); + } + + @Test + public void whenInvalidMaxEventSendCountThenFail() { + final Map annotations = Map.of( + DeadLetterAnnotationValidator.SUBSCRIPTION_MAX_EVENT_SEND_COUNT, "not-a-number", + DeadLetterAnnotationValidator.SUBSCRIPTION_UNPROCESSABLE_EVENT_POLICY, "SKIP_EVENT"); + + Assert.assertTrue("Invalid max event send count is rejected", + validator.validate(new TestClass(annotations)).stream().anyMatch( + r -> r.getMessage().contains( + DeadLetterAnnotationValidator.SUBSCRIPTION_MAX_EVENT_SEND_COUNT))); + } + + @Test + public void whenTooLowMaxEventSendCountThenFail() { + final Map annotations = Map.of( + DeadLetterAnnotationValidator.SUBSCRIPTION_MAX_EVENT_SEND_COUNT, "1", + DeadLetterAnnotationValidator.SUBSCRIPTION_UNPROCESSABLE_EVENT_POLICY, "SKIP_EVENT"); + + Assert.assertTrue("Too low max event send count is rejected", + validator.validate(new TestClass(annotations)).stream().anyMatch( + r -> r.getMessage().contains( + DeadLetterAnnotationValidator.SUBSCRIPTION_MAX_EVENT_SEND_COUNT))); + } + + @Test + public void whenTooHighMaxEventSendCountThenFail() { + final Map annotations = Map.of( + DeadLetterAnnotationValidator.SUBSCRIPTION_MAX_EVENT_SEND_COUNT, "11", + DeadLetterAnnotationValidator.SUBSCRIPTION_UNPROCESSABLE_EVENT_POLICY, "SKIP_EVENT"); + + Assert.assertTrue("Too high max event send count is rejected", + validator.validate(new TestClass(annotations)).stream().anyMatch( + r -> r.getMessage().contains( + DeadLetterAnnotationValidator.SUBSCRIPTION_MAX_EVENT_SEND_COUNT))); + } @Test - public void testIsValid() { - final DeadLetterAnnotationValidator deadLetterAnnotationValidator = new DeadLetterAnnotationValidator(); - Assert.assertTrue(deadLetterAnnotationValidator.isValid(null, null)); + public void whenExplicitlySetMaxEventSendCountSetAndNoPolicyThenFail() { + final Map annotations = Map.of( + DeadLetterAnnotationValidator.SUBSCRIPTION_MAX_EVENT_SEND_COUNT, "3"); + + Assert.assertTrue("Unprocessable event policy must be set when setting max event send count", + validator.validate(new TestClass(annotations)).stream() + .map(ConstraintViolation::getMessage) + .anyMatch(m -> + m.contains(DeadLetterAnnotationValidator.SUBSCRIPTION_MAX_EVENT_SEND_COUNT) && + m.contains(DeadLetterAnnotationValidator.SUBSCRIPTION_UNPROCESSABLE_EVENT_POLICY))); } -} \ No newline at end of file + @Test + public void whenSkipEventPolicyThenOK() { + final Map annotations = Map.of( + DeadLetterAnnotationValidator.SUBSCRIPTION_UNPROCESSABLE_EVENT_POLICY, "SKIP_EVENT", + DeadLetterAnnotationValidator.SUBSCRIPTION_MAX_EVENT_SEND_COUNT, "3"); + + Assert.assertTrue("Skip event policy is OK", + validator.validate(new TestClass(annotations)).isEmpty()); + } + + @Test + public void whenDeadLetterQueuePolicyThenOK() { + final Map annotations = Map.of( + DeadLetterAnnotationValidator.SUBSCRIPTION_UNPROCESSABLE_EVENT_POLICY, "DEAD_LETTER_QUEUE", + DeadLetterAnnotationValidator.SUBSCRIPTION_MAX_EVENT_SEND_COUNT, "3"); + + Assert.assertTrue("Dead letter queue policy is OK", + validator.validate(new TestClass(annotations)).isEmpty()); + } + + @Test + public void whenExplicitlySetPolicyAndNoMaxEventSendCountSetThenFail() { + final Map annotations = Map.of( + DeadLetterAnnotationValidator.SUBSCRIPTION_UNPROCESSABLE_EVENT_POLICY, "DEAD_LETTER_QUEUE"); + + Assert.assertTrue("Max event send count must be set when setting unprocessable event policy", + validator.validate(new TestClass(annotations)).stream() + .map(ConstraintViolation::getMessage) + .anyMatch(m -> + m.contains(DeadLetterAnnotationValidator.SUBSCRIPTION_UNPROCESSABLE_EVENT_POLICY) && + m.contains(DeadLetterAnnotationValidator.SUBSCRIPTION_MAX_EVENT_SEND_COUNT))); + } + + @Test + public void whenUnknownPolicyThenFail() { + final Map annotations = + Map.of(DeadLetterAnnotationValidator.SUBSCRIPTION_UNPROCESSABLE_EVENT_POLICY, "unknown"); + + Assert.assertTrue("Unknown unprocessable event policy is rejected", + validator.validate(new TestClass(annotations)).stream().anyMatch( + r -> r.getMessage().contains( + DeadLetterAnnotationValidator.SUBSCRIPTION_UNPROCESSABLE_EVENT_POLICY))); + } +} diff --git a/core-services/src/main/java/org/zalando/nakadi/service/publishing/EventPublisher.java b/core-services/src/main/java/org/zalando/nakadi/service/publishing/EventPublisher.java index 5be8b4464b..a6ee6a7fd9 100644 --- a/core-services/src/main/java/org/zalando/nakadi/service/publishing/EventPublisher.java +++ b/core-services/src/main/java/org/zalando/nakadi/service/publishing/EventPublisher.java @@ -114,6 +114,21 @@ public EventPublishResult publish(final String events, return processInternal(events, eventTypeName, consumerTags, true, false); } + // no auth checks + public EventPublishResult publishInternal(final String events, + final String eventTypeName, + final Map consumerTags) + throws NoSuchEventTypeException, + InternalNakadiException, + EnrichmentException, + EventTypeTimeoutException, + AccessDeniedException, + PublishEventOwnershipException, + ServiceTemporarilyUnavailableException, + PartitioningException { + return processInternal(events, eventTypeName, consumerTags, false, false); + } + public EventPublishResult delete(final String events, final String eventTypeName) throws NoSuchEventTypeException, InternalNakadiException,