Skip to content

Commit

Permalink
Use window stores in deduplication processors (#134)
Browse files Browse the repository at this point in the history
* Deduplication improvement with WindowStore #69

Add a WindowStore to manage deduplication for DedupKeyProcessor and DedupWithPredicateProcessor

* fix the Predicate search #69

* remove unused imports
  • Loading branch information
sebastienviale authored Dec 21, 2023
1 parent ffc863a commit f631993
Show file tree
Hide file tree
Showing 6 changed files with 270 additions and 126 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,20 @@

import com.michelin.kstreamplify.error.ProcessingResult;
import java.time.Duration;
import java.time.Instant;
import org.apache.avro.specific.SpecificRecord;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.WindowStore;

/**
* Transformer class for the deduplication mechanism on keys of a given topic.
*
* @param <V> The type of the value
*/
public class DedupKeyProcessor<V extends SpecificRecord>
implements Processor<String, V, String, ProcessingResult<V, V>> {
implements Processor<String, V, String, ProcessingResult<V, V>> {

/**
* Kstream context for this transformer.
Expand All @@ -26,12 +25,12 @@ public class DedupKeyProcessor<V extends SpecificRecord>
/**
* Window store containing all the records seen on the given window.
*/
private TimestampedKeyValueStore<String, String> dedupTimestampedStore;
private WindowStore<String, String> dedupWindowStore;

/**
* Window store name, set at construction time.
*/
private final String dedupStoreName;
private final String windowStoreName;

/**
* Retention window for the statestore. Used for fetching data.
Expand All @@ -41,52 +40,46 @@ public class DedupKeyProcessor<V extends SpecificRecord>
/**
* Constructor.
*
* @param dedupStoreName The name of the deduplication state store
* @param windowStoreName The name of the window store used for deduplication
* @param retentionWindowDuration The retention window duration, used as the fetch range
* around each record timestamp
*/
public DedupKeyProcessor(String dedupStoreName, Duration retentionWindowDuration) {
this.dedupStoreName = dedupStoreName;
public DedupKeyProcessor(String windowStoreName, Duration retentionWindowDuration) {
this.windowStoreName = windowStoreName;
this.retentionWindowDuration = retentionWindowDuration;
}

/**
* Stores the processor context and retrieves the deduplication state store by its name.
*
* <p>NOTE(review): this block reads as a rendered diff. The {@code dedupTimestampedStore} lookup
* and the hourly punctuator appear to be the previous implementation, and the
* {@code dedupWindowStore} lookup its replacement - confirm against the committed file.
*
* @param context The processor context
*/
@Override
public void init(ProcessorContext<String, ProcessingResult<V, V>> context) {
processorContext = context;

dedupTimestampedStore = this.processorContext.getStateStore(dedupStoreName);

// Every hour of wall-clock time, scan the whole store and delete the entries whose
// timestamp plus the retention window is older than the punctuation timestamp.
processorContext.schedule(Duration.ofHours(1), PunctuationType.WALL_CLOCK_TIME,
currentTimestamp -> {
try (var iterator = dedupTimestampedStore.all()) {
while (iterator.hasNext()) {
var currentRecord = iterator.next();
if (currentRecord.value.timestamp() + retentionWindowDuration.toMillis()
< currentTimestamp) {
dedupTimestampedStore.delete(currentRecord.key);
}
}
}
});
// Window store variant: only the store lookup is performed, no punctuator is scheduled.
dedupWindowStore = this.processorContext.getStateStore(windowStoreName);
}

/**
* Deduplicates records on their key.
*
* <p>A record whose key has no stored entry is stored and forwarded wrapped as a success.
* A record whose key is already stored is dropped: nothing is forwarded.
* Any exception is forwarded wrapped as a failure together with the record.
*
* @param message The record to deduplicate
*/
@Override
public void process(Record<String, V> message) {
String key = message.key();
try {
// Look up the key in the state store; a stored entry signals a duplicate, so only an
// absent key is stored and forwarded
if (dedupTimestampedStore.get(key) == null) {

// First time we see this record, store entry in the window store and forward the record to the output
dedupTimestampedStore.put(key,
ValueAndTimestamp.make(key, processorContext.currentStreamTimeMs()));
try {
// Get the record timestamp
var currentInstant = Instant.ofEpochMilli(message.timestamp());

processorContext.forward(ProcessingResult.wrapRecordSuccess(message));
// Fetch the entries stored for this key from one retention window before to one retention
// window after the record timestamp; a matching entry signals a duplicate, which is
// dropped without forwarding anything
try (var resultIterator = dedupWindowStore.backwardFetch(message.key(),
currentInstant.minus(retentionWindowDuration),
currentInstant.plus(retentionWindowDuration))) {
while (resultIterator != null && resultIterator.hasNext()) {
var currentKeyValue = resultIterator.next();
if (message.key().equals(currentKeyValue.value)) {
return;
}
}
}

// First time we see this record, store entry in the window store and forward the record to the output
dedupWindowStore.put(message.key(), message.key(), message.timestamp());
processorContext.forward(ProcessingResult.wrapRecordSuccess(message));
} catch (Exception e) {
processorContext.forward(ProcessingResult.wrapRecordFailure(e, message,
"Couldn't figure out what to do with the current payload: "
+ "An unlikely error occurred during deduplication transform"));
"Couldn't figure out what to do with the current payload: "
+ "An unlikely error occurred during deduplication transform"));
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,23 @@

import com.michelin.kstreamplify.error.ProcessingResult;
import java.time.Duration;
import java.time.Instant;
import java.util.function.Function;
import org.apache.avro.specific.SpecificRecord;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.WindowStore;


/**
* Transformer class for the deduplication mechanism on keys of a given topic.
* Transformer class for the deduplication mechanism on predicate of a given topic.
*
* @param <K> The type of the key
* @param <V> The type of the value
*/
public class DedupWithPredicateProcessor<K, V extends SpecificRecord>
implements Processor<K, V, K, ProcessingResult<V, V>> {
implements Processor<K, V, K, ProcessingResult<V, V>> {

/**
* Kstream context for this transformer.
Expand All @@ -29,12 +29,12 @@ public class DedupWithPredicateProcessor<K, V extends SpecificRecord>
/**
* Window store containing all the records seen on the given window.
*/
private TimestampedKeyValueStore<String, V> dedupTimestampedStore;
private WindowStore<String, V> dedupWindowStore;

/**
* Window store name, set at construction time.
*/
private final String dedupStoreName;
private final String windowStoreName;

/**
* Retention window for the statestore. Used for fetching data.
Expand All @@ -49,13 +49,13 @@ public class DedupWithPredicateProcessor<K, V extends SpecificRecord>
/**
* Constructor.
*
* @param dedupStoreName Name of the deduplication state store
* @param windowStoreName Name of the deduplication window store
* @param retentionWindowDuration Retention window duration, used as the fetch range
* around each record timestamp
* @param deduplicationKeyExtractor Function deriving the deduplication identifier
* from a record value
*/
public DedupWithPredicateProcessor(String dedupStoreName, Duration retentionWindowDuration,
public DedupWithPredicateProcessor(String windowStoreName, Duration retentionWindowDuration,
Function<V, String> deduplicationKeyExtractor) {
this.dedupStoreName = dedupStoreName;
this.windowStoreName = windowStoreName;
this.retentionWindowDuration = retentionWindowDuration;
this.deduplicationKeyExtractor = deduplicationKeyExtractor;
}
Expand All @@ -64,38 +64,37 @@ public DedupWithPredicateProcessor(String dedupStoreName, Duration retentionWind
// Stores the processor context and retrieves the deduplication state store by its name.
// NOTE(review): this block reads as a rendered diff. The dedupTimestampedStore lookup and the
// hourly punctuator appear to be the previous implementation, and the dedupWindowStore lookup
// its replacement - confirm against the committed file.
public void init(ProcessorContext<K, ProcessingResult<V, V>> context) {
this.processorContext = context;

dedupTimestampedStore = this.processorContext.getStateStore(dedupStoreName);

// Every hour of wall-clock time, scan the whole store and delete the entries whose
// timestamp plus the retention window is older than the punctuation timestamp.
processorContext.schedule(Duration.ofHours(1), PunctuationType.WALL_CLOCK_TIME,
currentTimestamp -> {
try (var iterator = dedupTimestampedStore.all()) {
while (iterator.hasNext()) {
var currentRecord = iterator.next();
if (currentRecord.value.timestamp() + retentionWindowDuration.toMillis()
< currentTimestamp) {
dedupTimestampedStore.delete(currentRecord.key);
}
}
}
});
// Window store variant: only the store lookup is performed, no punctuator is scheduled.
dedupWindowStore = this.processorContext.getStateStore(windowStoreName);
}

/**
* Deduplicates records on the identifier computed by the deduplication key extractor.
*
* <p>A record whose identifier has no stored entry is stored and forwarded wrapped as a success.
* A record whose identifier is already stored is dropped: nothing is forwarded.
* Any exception is forwarded wrapped as a failure together with the record.
*
* @param message The record to deduplicate
*/
@Override
public void process(Record<K, V> message) {
try {

try {
// Get the record timestamp
var currentInstant = Instant.ofEpochMilli(message.timestamp());
String identifier = deduplicationKeyExtractor.apply(message.value());
// Look up the identifier in the state store; a stored entry signals a duplicate, so only
// an absent identifier is stored and forwarded
if (dedupTimestampedStore.get(identifier) == null) {
// First time we see this record, store entry in the window store and forward the record to the output
dedupTimestampedStore.put(identifier,
ValueAndTimestamp.make(message.value(), message.timestamp()));
processorContext.forward(ProcessingResult.wrapRecordSuccess(message));

// Fetch the entries stored for this identifier from one retention window before to one
// retention window after the record timestamp; the identifier is recomputed from each
// stored value, and a match signals a duplicate, which is dropped without forwarding
try (var resultIterator = dedupWindowStore.backwardFetch(identifier,
currentInstant.minus(retentionWindowDuration),
currentInstant.plus(retentionWindowDuration))) {
while (resultIterator != null && resultIterator.hasNext()) {
var currentKeyValue = resultIterator.next();
if (identifier.equals(deduplicationKeyExtractor.apply(currentKeyValue.value))) {
return;
}
}
}

// First time we see this record, store entry in the window store and forward the record to the output
dedupWindowStore.put(identifier, message.value(), message.timestamp());
processorContext.forward(ProcessingResult.wrapRecordSuccess(message));

} catch (Exception e) {
processorContext.forward(ProcessingResult.wrapRecordFailure(e, message,
"Couldn't figure out what to do with the current payload: "
+ "An unlikely error occurred during deduplication transform"));
"Couldn't figure out what to do with the current payload: "
+ "An unlikely error occurred during deduplication transform"));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import org.apache.kafka.streams.kstream.Repartitioned;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.WindowStore;

/**
Expand Down Expand Up @@ -69,18 +68,16 @@ public static <V extends SpecificRecord> KStream<String, ProcessingResult<V, V>>
StreamsBuilder streamsBuilder, KStream<String, V> initialStream, String storeName,
String repartitionName, Duration windowDuration) {

StoreBuilder<TimestampedKeyValueStore<String, String>> dedupStore =
Stores.timestampedKeyValueStoreBuilder(
Stores.persistentTimestampedKeyValueStore(storeName), Serdes.String(),
Serdes.String());
streamsBuilder.addStateStore(dedupStore);

StoreBuilder<WindowStore<String, String>> dedupWindowStore = Stores.windowStoreBuilder(
Stores.persistentWindowStore(storeName, windowDuration, windowDuration, false),
Serdes.String(), Serdes.String());
streamsBuilder.addStateStore(dedupWindowStore);

var repartitioned = initialStream.repartition(
Repartitioned.with(Serdes.String(), SerdesUtils.<V>getSerdesForValue())
.withName(repartitionName));
Repartitioned.with(Serdes.String(), SerdesUtils.<V>getSerdesForValue())
.withName(repartitionName));
return repartitioned.process(() -> new DedupKeyProcessor<>(storeName, windowDuration),
storeName);
storeName);
}

/**
Expand Down Expand Up @@ -192,11 +189,11 @@ public static <V extends SpecificRecord> KStream<String, ProcessingResult<V, V>>
StreamsBuilder streamsBuilder, KStream<String, V> initialStream, String storeName,
String repartitionName, Duration windowDuration,
Function<V, String> deduplicationKeyExtractor) {
StoreBuilder<TimestampedKeyValueStore<String, V>> dedupStore =
Stores.timestampedKeyValueStoreBuilder(
Stores.persistentTimestampedKeyValueStore(storeName), Serdes.String(),
SerdesUtils.getSerdesForValue());
streamsBuilder.addStateStore(dedupStore);

StoreBuilder<WindowStore<String, V>> dedupWindowStore = Stores.windowStoreBuilder(
Stores.persistentWindowStore(storeName, windowDuration, windowDuration, false),
Serdes.String(), SerdesUtils.getSerdesForValue());
streamsBuilder.addStateStore(dedupWindowStore);

var repartitioned = initialStream.repartition(
Repartitioned.with(Serdes.String(), SerdesUtils.<V>getSerdesForValue())
Expand Down
Loading

0 comments on commit f631993

Please sign in to comment.