diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupKeyProcessor.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupKeyProcessor.java index a3494088..20c47a4c 100644 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupKeyProcessor.java +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupKeyProcessor.java @@ -2,13 +2,12 @@ import com.michelin.kstreamplify.error.ProcessingResult; import java.time.Duration; +import java.time.Instant; import org.apache.avro.specific.SpecificRecord; -import org.apache.kafka.streams.processor.PunctuationType; import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.processor.api.Record; -import org.apache.kafka.streams.state.TimestampedKeyValueStore; -import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.apache.kafka.streams.state.WindowStore; /** * Transformer class for the deduplication mechanism on keys of a given topic. @@ -16,7 +15,7 @@ * @param The type of the value */ public class DedupKeyProcessor - implements Processor> { + implements Processor> { /** * Kstream context for this transformer. @@ -26,12 +25,12 @@ public class DedupKeyProcessor /** * Window store containing all the records seen on the given window. */ - private TimestampedKeyValueStore dedupTimestampedStore; + private WindowStore dedupWindowStore; /** * Window store name, initialized @ construction. */ - private final String dedupStoreName; + private final String windowStoreName; /** * Retention window for the statestore. Used for fetching data. @@ -41,52 +40,46 @@ public class DedupKeyProcessor /** * Constructor. * - * @param dedupStoreName The name of the constructor + * @param windowStoreName The name of the constructor * @param retentionWindowDuration The retentionWindow Duration */ - public DedupKeyProcessor(String dedupStoreName, Duration retentionWindowDuration) { - this.dedupStoreName = dedupStoreName; + public DedupKeyProcessor(String windowStoreName, Duration retentionWindowDuration) { + this.windowStoreName = windowStoreName; this.retentionWindowDuration = retentionWindowDuration; } @Override public void init(ProcessorContext> context) { processorContext = context; - - dedupTimestampedStore = this.processorContext.getStateStore(dedupStoreName); - - processorContext.schedule(Duration.ofHours(1), PunctuationType.WALL_CLOCK_TIME, - currentTimestamp -> { - try (var iterator = dedupTimestampedStore.all()) { - while (iterator.hasNext()) { - var currentRecord = iterator.next(); - if (currentRecord.value.timestamp() + retentionWindowDuration.toMillis() - < currentTimestamp) { - dedupTimestampedStore.delete(currentRecord.key); - } - } - } - }); + dedupWindowStore = this.processorContext.getStateStore(windowStoreName); } @Override public void process(Record message) { - String key = message.key(); - try { - // Retrieve the matching key in the statestore and return null if found (signaling a duplicate) - if (dedupTimestampedStore.get(key) == null) { - // First time we see this record, store entry in the window store and forward the record to the output - dedupTimestampedStore.put(key, - ValueAndTimestamp.make(key, processorContext.currentStreamTimeMs())); + try { + // Get the record timestamp + var currentInstant = Instant.ofEpochMilli(message.timestamp()); - processorContext.forward(ProcessingResult.wrapRecordSuccess(message)); + // Retrieve all the matching keys in the stateStore and return null if found it (signaling a duplicate) + try (var resultIterator = dedupWindowStore.backwardFetch(message.key(), + currentInstant.minus(retentionWindowDuration), + currentInstant.plus(retentionWindowDuration))) { + while (resultIterator != null && resultIterator.hasNext()) { + var currentKeyValue = resultIterator.next(); + if (message.key().equals(currentKeyValue.value)) { + return; + } + } } + + // First time we see this record, store entry in the window store and forward the record to the output + dedupWindowStore.put(message.key(), message.key(), message.timestamp()); + processorContext.forward(ProcessingResult.wrapRecordSuccess(message)); } catch (Exception e) { processorContext.forward(ProcessingResult.wrapRecordFailure(e, message, - "Couldn't figure out what to do with the current payload: " - + "An unlikely error occurred during deduplication transform")); + "Couldn't figure out what to do with the current payload: " + + "An unlikely error occurred during deduplication transform")); } } - } diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupWithPredicateProcessor.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupWithPredicateProcessor.java index 3df508f8..ce3e1dee 100644 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupWithPredicateProcessor.java +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupWithPredicateProcessor.java @@ -3,23 +3,23 @@ import com.michelin.kstreamplify.error.ProcessingResult; import java.time.Duration; +import java.time.Instant; import java.util.function.Function; import org.apache.avro.specific.SpecificRecord; -import org.apache.kafka.streams.processor.PunctuationType; import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.processor.api.Record; -import org.apache.kafka.streams.state.TimestampedKeyValueStore; -import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.apache.kafka.streams.state.WindowStore; + /** - * Transformer class for the deduplication mechanism on keys of a given topic. + * Transformer class for the deduplication mechanism on predicate of a given topic. * * @param The type of the key * @param The type of the value */ public class DedupWithPredicateProcessor - implements Processor> { + implements Processor> { /** * Kstream context for this transformer. @@ -29,12 +29,12 @@ public class DedupWithPredicateProcessor /** * Window store containing all the records seen on the given window. */ - private TimestampedKeyValueStore dedupTimestampedStore; + private WindowStore dedupWindowStore; /** * Window store name, initialized @ construction. */ - private final String dedupStoreName; + private final String windowStoreName; /** * Retention window for the statestore. Used for fetching data. @@ -49,13 +49,13 @@ public class DedupWithPredicateProcessor /** * Constructor. * - * @param dedupStoreName Name of the deduplication state store + * @param windowStoreName Name of the deduplication state store * @param retentionWindowDuration Retention window duration * @param deduplicationKeyExtractor Deduplication function */ - public DedupWithPredicateProcessor(String dedupStoreName, Duration retentionWindowDuration, + public DedupWithPredicateProcessor(String windowStoreName, Duration retentionWindowDuration, Function deduplicationKeyExtractor) { - this.dedupStoreName = dedupStoreName; + this.windowStoreName = windowStoreName; this.retentionWindowDuration = retentionWindowDuration; this.deduplicationKeyExtractor = deduplicationKeyExtractor; } @@ -64,38 +64,37 @@ public DedupWithPredicateProcessor(String dedupStoreName, Duration retentionWind public void init(ProcessorContext> context) { this.processorContext = context; - dedupTimestampedStore = this.processorContext.getStateStore(dedupStoreName); - - processorContext.schedule(Duration.ofHours(1), PunctuationType.WALL_CLOCK_TIME, - currentTimestamp -> { - try (var iterator = dedupTimestampedStore.all()) { - while (iterator.hasNext()) { - var currentRecord = iterator.next(); - if (currentRecord.value.timestamp() + retentionWindowDuration.toMillis() - < currentTimestamp) { - dedupTimestampedStore.delete(currentRecord.key); - } - } - } - }); + dedupWindowStore = this.processorContext.getStateStore(windowStoreName); } @Override public void process(Record message) { - try { + try { + // Get the record timestamp + var currentInstant = Instant.ofEpochMilli(message.timestamp()); String identifier = deduplicationKeyExtractor.apply(message.value()); - // Retrieve the matching identifier in the statestore and return null if found it (signaling a duplicate) - if (dedupTimestampedStore.get(identifier) == null) { - // First time we see this record, store entry in the window store and forward the record to the output - dedupTimestampedStore.put(identifier, - ValueAndTimestamp.make(message.value(), message.timestamp())); - processorContext.forward(ProcessingResult.wrapRecordSuccess(message)); + + // Retrieve all the matching keys in the stateStore and return null if found it (signaling a duplicate) + try (var resultIterator = dedupWindowStore.backwardFetch(identifier, + currentInstant.minus(retentionWindowDuration), + currentInstant.plus(retentionWindowDuration))) { + while (resultIterator != null && resultIterator.hasNext()) { + var currentKeyValue = resultIterator.next(); + if (identifier.equals(deduplicationKeyExtractor.apply(currentKeyValue.value))) { + return; + } + } } + + // First time we see this record, store entry in the window store and forward the record to the output + dedupWindowStore.put(identifier, message.value(), message.timestamp()); + processorContext.forward(ProcessingResult.wrapRecordSuccess(message)); + } catch (Exception e) { processorContext.forward(ProcessingResult.wrapRecordFailure(e, message, - "Couldn't figure out what to do with the current payload: " - + "An unlikely error occurred during deduplication transform")); + "Couldn't figure out what to do with the current payload: " + + "An unlikely error occurred during deduplication transform")); } } } diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DeduplicationUtils.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DeduplicationUtils.java index 3924c491..cb6460a2 100644 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DeduplicationUtils.java +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DeduplicationUtils.java @@ -11,7 +11,6 @@ import org.apache.kafka.streams.kstream.Repartitioned; import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.Stores; -import org.apache.kafka.streams.state.TimestampedKeyValueStore; import org.apache.kafka.streams.state.WindowStore; /** @@ -69,18 +68,16 @@ public static KStream> StreamsBuilder streamsBuilder, KStream initialStream, String storeName, String repartitionName, Duration windowDuration) { - StoreBuilder> dedupStore = - Stores.timestampedKeyValueStoreBuilder( - Stores.persistentTimestampedKeyValueStore(storeName), Serdes.String(), - Serdes.String()); - streamsBuilder.addStateStore(dedupStore); - + StoreBuilder> dedupWindowStore = Stores.windowStoreBuilder( + Stores.persistentWindowStore(storeName, windowDuration, windowDuration, false), + Serdes.String(), Serdes.String()); + streamsBuilder.addStateStore(dedupWindowStore); var repartitioned = initialStream.repartition( - Repartitioned.with(Serdes.String(), SerdesUtils.getSerdesForValue()) - .withName(repartitionName)); + Repartitioned.with(Serdes.String(), SerdesUtils.getSerdesForValue()) + .withName(repartitionName)); return repartitioned.process(() -> new DedupKeyProcessor<>(storeName, windowDuration), - storeName); + storeName); } /** @@ -192,11 +189,11 @@ public static KStream> StreamsBuilder streamsBuilder, KStream initialStream, String storeName, String repartitionName, Duration windowDuration, Function deduplicationKeyExtractor) { - StoreBuilder> dedupStore = - Stores.timestampedKeyValueStoreBuilder( - Stores.persistentTimestampedKeyValueStore(storeName), Serdes.String(), - SerdesUtils.getSerdesForValue()); - streamsBuilder.addStateStore(dedupStore); + + StoreBuilder> dedupWindowStore = Stores.windowStoreBuilder( + Stores.persistentWindowStore(storeName, windowDuration, windowDuration, false), + Serdes.String(), SerdesUtils.getSerdesForValue()); + streamsBuilder.addStateStore(dedupWindowStore); var repartitioned = initialStream.repartition( Repartitioned.with(Serdes.String(), SerdesUtils.getSerdesForValue()) diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupKeyProcessorTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupKeyProcessorTest.java index 2dad34bd..aa26546f 100644 --- a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupKeyProcessorTest.java +++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupKeyProcessorTest.java @@ -1,45 +1,111 @@ package com.michelin.kstreamplify.deduplication; -import static org.mockito.Mockito.any; -import static org.mockito.Mockito.eq; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import com.michelin.kstreamplify.avro.KafkaError; import com.michelin.kstreamplify.error.ProcessingResult; import java.time.Duration; +import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.processor.api.Record; -import org.apache.kafka.streams.state.TimestampedKeyValueStore; +import org.apache.kafka.streams.state.WindowStore; +import org.apache.kafka.streams.state.WindowStoreIterator; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; @ExtendWith(MockitoExtension.class) class DedupKeyProcessorTest { + private DedupKeyProcessor processor; + @Mock private ProcessorContext> context; @Mock - private TimestampedKeyValueStore dedupTimestampedStore; + private WindowStore windowStore; + + @Mock + private WindowStoreIterator windowStoreIterator; + + @BeforeEach + void setUp() { + // Create an instance of DedupWithPredicateProcessor for testing + processor = new DedupKeyProcessor<>("testStore", Duration.ofHours(1)); + + // Stub the context.getStateStore method to return the mock store + when(context.getStateStore("testStore")).thenReturn(windowStore); + + processor.init(context); + } + @Test void shouldProcessNewRecord() { - String key = "some-key"; - when(context.getStateStore("dedupStoreName")).thenReturn(dedupTimestampedStore); - when(dedupTimestampedStore.get(key)).thenReturn(null); + // Create a KafkaError + final KafkaError kafkaError = new KafkaError(); + // Create a test record + final Record record = new Record<>("key", kafkaError, 0); - DedupKeyProcessor dedupKeyProcessor = new DedupKeyProcessor<>("dedupStoreName", Duration.ZERO); - dedupKeyProcessor.init(context); + processor.process(record); - KafkaError value = new KafkaError(); - Record record = new Record<>(key, value, 0); - dedupKeyProcessor.process(record); + verify(windowStore).put("key", "key", record.timestamp()); + verify(context).forward(argThat(arg -> arg.value().getValue().equals(record.value()))); - verify(dedupTimestampedStore).put(eq(key), any()); } + + @Test + void shouldProcessDuplicate() { + + // Create a KafkaError + final KafkaError kafkaError = new KafkaError(); + // Create a test record + final Record record = new Record<>("key", kafkaError, 0L); + + // Simulate hasNext() returning true once and then false + when(windowStoreIterator.hasNext()).thenReturn(true); + + // Simulate the condition to trigger the return statement + when(windowStoreIterator.next()).thenReturn(KeyValue.pair(0L, "key")); + + // Simulate the backwardFetch() method returning the mocked ResultIterator + when(windowStore.backwardFetch(any(), any(), any())).thenReturn(windowStoreIterator); + + // Call the process method + processor.process(record); + + verify(windowStore, never()).put(anyString(), any(), anyLong()); + verify(context, never()).forward(any()); + } + + @Test + void shouldThrowException() { + + // Create a KafkaError + final KafkaError kafkaError = new KafkaError(); + // Create a test record + Record record = new Record<>("key", new KafkaError(), 0L); + + when(windowStore.backwardFetch(any(), any(), any())).thenReturn(null) + .thenThrow(new RuntimeException("Exception...")); + doThrow(new RuntimeException("Exception...")).when(windowStore).put(anyString(), any(), anyLong()); + + // Call the process method + processor.process(record); + + verify(context).forward(argThat(arg -> arg.value().getError().getContextMessage() + .equals("Couldn't figure out what to do with the current payload: " + + "An unlikely error occurred during deduplication transform"))); + } + } diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupKeyValueProcessorTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupKeyValueProcessorTest.java index faa070d2..33f79694 100644 --- a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupKeyValueProcessorTest.java +++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupKeyValueProcessorTest.java @@ -1,43 +1,113 @@ package com.michelin.kstreamplify.deduplication; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import com.michelin.kstreamplify.avro.KafkaError; import com.michelin.kstreamplify.error.ProcessingResult; import java.time.Duration; +import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.state.WindowStore; +import org.apache.kafka.streams.state.WindowStoreIterator; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; @ExtendWith(MockitoExtension.class) class DedupKeyValueProcessorTest { + private DedupKeyValueProcessor processor; + @Mock private ProcessorContext> context; @Mock private WindowStore windowStore; + @Mock + private WindowStoreIterator windowStoreIterator; + + @BeforeEach + void setUp() { + // Create an instance of DedupWithPredicateProcessor for testing + processor = new DedupKeyValueProcessor<>("testStore", Duration.ofHours(1)); + + // Stub the context.getStateStore method to return the mock store + when(context.getStateStore("testStore")).thenReturn(windowStore); + + processor.init(context); + } + + @Test void shouldProcessNewRecord() { - String key = "some-key"; - KafkaError value = new KafkaError(); - Record record = new Record<>(key, value, 0); + // Create a KafkaError + final KafkaError kafkaError = new KafkaError(); - when(context.getStateStore("dedupStoreName")).thenReturn(windowStore); + // Create a test record + final Record record = new Record<>("key", kafkaError, 0); - DedupKeyValueProcessor dedupKeyValueProcessor = new DedupKeyValueProcessor<>("dedupStoreName", - Duration.ZERO); - dedupKeyValueProcessor.init(context); - dedupKeyValueProcessor.process(record); + processor.process(record); verify(windowStore).put(record.key(), record.value(), record.timestamp()); + verify(context).forward(argThat(arg -> arg.value().getValue().equals(record.value()))); + + } + + @Test + void shouldProcessDuplicate() { + + // Create a KafkaError + final KafkaError kafkaError = new KafkaError(); + + // Create a test record + final Record record = new Record<>("key", kafkaError, 0L); + + // Simulate hasNext() returning true once and then false + when(windowStoreIterator.hasNext()).thenReturn(true); + + // Simulate the condition to trigger the return statement + when(windowStoreIterator.next()).thenReturn(KeyValue.pair(0L, kafkaError)); + + // Simulate the backwardFetch() method returning the mocked ResultIterator + when(windowStore.backwardFetch(any(), any(), any())).thenReturn(windowStoreIterator); + + // Call the process method + processor.process(record); + + verify(windowStore, never()).put(anyString(), any(), anyLong()); + verify(context, never()).forward(any()); + } + + @Test + void shouldThrowException() { + + // Create a KafkaError + final KafkaError kafkaError = new KafkaError(); + + // Create a test record + final Record record = new Record<>("key", kafkaError, 0L); + + when(windowStore.backwardFetch(any(), any(), any())).thenReturn(null) + .thenThrow(new RuntimeException("Exception...")); + doThrow(new RuntimeException("Exception...")).when(windowStore).put(anyString(), any(), anyLong()); + + // Call the process method + processor.process(record); + + verify(context).forward(argThat(arg -> arg.value().getError().getContextMessage() + .equals("Couldn't figure out what to do with the current payload: " + + "An unlikely error occurred during deduplication transform"))); } } diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupWithPredicateProcessorTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupWithPredicateProcessorTest.java index a80bd38f..4da364e3 100644 --- a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupWithPredicateProcessorTest.java +++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupWithPredicateProcessorTest.java @@ -1,8 +1,9 @@ package com.michelin.kstreamplify.deduplication; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.argThat; -import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; @@ -12,10 +13,11 @@ import com.michelin.kstreamplify.error.ProcessingResult; import java.time.Duration; import org.apache.avro.specific.SpecificRecord; +import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.processor.api.Record; -import org.apache.kafka.streams.state.TimestampedKeyValueStore; -import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.apache.kafka.streams.state.WindowStore; +import org.apache.kafka.streams.state.WindowStoreIterator; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -31,7 +33,10 @@ class DedupWithPredicateProcessorTest { private ProcessorContext> context; @Mock - private TimestampedKeyValueStore store; + private WindowStore windowStore; + + @Mock + private WindowStoreIterator windowStoreIterator; @BeforeEach void setUp() { @@ -39,55 +44,69 @@ void setUp() { processor = new DedupWithPredicateProcessor<>("testStore", Duration.ofHours(1), TestKeyExtractor::extract); // Stub the context.getStateStore method to return the mock store - when(context.getStateStore("testStore")).thenReturn(store); + when(context.getStateStore("testStore")).thenReturn(windowStore); processor.init(context); } @Test - void shouldProcessFirstTime() { - // Create a test record - Record record = new Record<>("key", new KafkaError(), 0L); + void shouldProcessNewRecord() { - // Stub store.get to return null, indicating it's the first time - when(store.get(any())).thenReturn(null); + // Create a KafkaError + final KafkaError kafkaError = new KafkaError(); + + // Create a test record + final Record record = new Record<>("key", kafkaError, 0); - // Call the process method processor.process(record); - verify(store).put(eq(""), argThat(arg -> arg.value().equals(record.value()))); + verify(windowStore).put("", record.value(), record.timestamp()); verify(context).forward(argThat(arg -> arg.value().getValue().equals(record.value()))); + } @Test void shouldProcessDuplicate() { + + // Create a KafkaError + final KafkaError kafkaError = new KafkaError(); // Create a test record - Record record = new Record<>("key", new KafkaError(), 0L); + final Record record = new Record<>("key", kafkaError, 0L); - // Stub store.get to return a value, indicating a duplicate - when(store.get("")).thenReturn(ValueAndTimestamp.make(new KafkaError(), 0L)); + // Simulate hasNext() returning true once and then false + when(windowStoreIterator.hasNext()).thenReturn(true); + + // Simulate the condition to trigger the return statement + when(windowStoreIterator.next()).thenReturn(KeyValue.pair(0L, kafkaError)); + + // Simulate the backwardFetch() method returning the mocked ResultIterator + when(windowStore.backwardFetch(any(), any(), any())).thenReturn(windowStoreIterator); // Call the process method processor.process(record); - verify(store, never()).put(any(), any()); + verify(windowStore, never()).put(anyString(), any(), anyLong()); verify(context, never()).forward(any()); } @Test void shouldThrowException() { + + // Create a KafkaError + final KafkaError kafkaError = new KafkaError(); // Create a test record Record record = new Record<>("key", new KafkaError(), 0L); - when(store.get(any())).thenReturn(null); - doThrow(new RuntimeException("Exception...")).when(store).put(any(), any()); + when(windowStore.backwardFetch(any(), any(), any())).thenReturn(null) + .thenThrow(new RuntimeException("Exception...")); + doThrow(new RuntimeException("Exception...")).when(windowStore).put(anyString(), any(), anyLong()); // Call the process method processor.process(record); verify(context).forward(argThat(arg -> arg.value().getError().getContextMessage() - .equals("Couldn't figure out what to do with the current payload: " - + "An unlikely error occurred during deduplication transform"))); + .equals("Couldn't figure out what to do with the current payload: " + + "An unlikely error occurred during deduplication transform"))); } public static class TestKeyExtractor {