From b0a393228656d34046935b6b46de6527f04bc641 Mon Sep 17 00:00:00 2001
From: Almog Gavra
Date: Thu, 21 Nov 2024 14:36:54 -0800
Subject: [PATCH] migrate KTableSourceNode to use ProcessorSupplier#stores

---
 .../apache/kafka/streams/StreamsBuilder.java | 7 +-
 .../org/apache/kafka/streams/Topology.java | 8 +-
 .../internals/InternalStreamsBuilder.java | 9 +-
 .../kstream/internals/KStreamImpl.java | 7 +-
 .../kstream/internals/KStreamImplJoin.java | 2 +-
 .../streams/kstream/internals/KTableImpl.java | 7 +-
 .../kstream/internals/KTableSource.java | 27 +++--
 .../internals/MaterializedInternal.java | 11 +-
 .../internals/graph/GlobalStoreNode.java | 11 +-
 .../internals/graph/TableSourceNode.java | 34 ++----
 .../internals/InternalTopologyBuilder.java | 41 ++++---
 .../internals/StoreBuilderWrapper.java | 10 +-
 .../StoreDelegatingProcessorSupplier.java | 46 ++++++++
 .../processor/internals/StoreFactory.java | 75 +++++++++++++
 .../internals/graph/TableSourceNodeTest.java | 16 +--
 .../internals/GlobalStreamThreadTest.java | 7 +-
 .../InternalTopologyBuilderTest.java | 101 +++++++++---------
 .../kafka/test/MockKeyValueStoreBuilder.java | 2 +-
 18 files changed, 278 insertions(+), 143 deletions(-)
 create mode 100644 streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreDelegatingProcessorSupplier.java

diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java index 2879436e5000..78c700c5e6b5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java @@ -384,7 +384,8 @@ public synchronized GlobalKTable globalTable(final String topic, final MaterializedInternal> materializedInternal = new MaterializedInternal<>( Materialized.with(consumedInternal.keySerde(), consumedInternal.valueSerde()), - internalStreamsBuilder, topic + "-"); + internalStreamsBuilder, topic + "-", + true /* force materializing global tables */); return internalStreamsBuilder.globalTable(topic, consumedInternal, materializedInternal); } @@ -517,7 +518,7 @@ public synchronized GlobalKTable globalTable(final String topic, */ public synchronized StreamsBuilder addStateStore(final StoreBuilder builder) { Objects.requireNonNull(builder, "builder can't be null"); - internalStreamsBuilder.addStateStore(new StoreBuilderWrapper(builder)); + internalStreamsBuilder.addStateStore(StoreBuilderWrapper.wrapStoreBuilder(builder)); return this; } @@ -556,7 +557,7 @@ public synchronized StreamsBuilder addGlobalStore(final StoreBuilder< Objects.requireNonNull(storeBuilder, "storeBuilder can't be null"); Objects.requireNonNull(consumed, "consumed can't be null"); internalStreamsBuilder.addGlobalStore( - new StoreBuilderWrapper(storeBuilder), + StoreBuilderWrapper.wrapStoreBuilder(storeBuilder), topic, new ConsumedInternal<>(consumed), stateUpdateSupplier, diff --git a/streams/src/main/java/org/apache/kafka/streams/Topology.java b/streams/src/main/java/org/apache/kafka/streams/Topology.java index 6b45d70a35cc..c8865005810c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/Topology.java +++ b/streams/src/main/java/org/apache/kafka/streams/Topology.java @@ -34,7 +34,7 @@ import org.apache.kafka.streams.processor.internals.ProcessorTopology; import org.apache.kafka.streams.processor.internals.SinkNode; import org.apache.kafka.streams.processor.internals.SourceNode; -import org.apache.kafka.streams.processor.internals.StoreBuilderWrapper; +import 
org.apache.kafka.streams.processor.internals.StoreDelegatingProcessorSupplier; import org.apache.kafka.streams.state.StoreBuilder; import java.util.Set; @@ -851,14 +851,13 @@ public synchronized Topology addGlobalStore(final StoreBuilder sto final String processorName, final ProcessorSupplier stateUpdateSupplier) { internalTopologyBuilder.addGlobalStore( - new StoreBuilderWrapper(storeBuilder), sourceName, null, keyDeserializer, valueDeserializer, topic, processorName, - stateUpdateSupplier, + new StoreDelegatingProcessorSupplier<>(stateUpdateSupplier, Set.of(storeBuilder)), true ); return this; @@ -897,14 +896,13 @@ public synchronized Topology addGlobalStore(final StoreBuilder sto final String processorName, final ProcessorSupplier stateUpdateSupplier) { internalTopologyBuilder.addGlobalStore( - new StoreBuilderWrapper(storeBuilder), sourceName, timestampExtractor, keyDeserializer, valueDeserializer, topic, processorName, - stateUpdateSupplier, + new StoreDelegatingProcessorSupplier<>(stateUpdateSupplier, Set.of(storeBuilder)), true ); return this; diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java index 92dde06e9c05..1e148ac047c9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java @@ -140,7 +140,7 @@ public KTable table(final String topic, final String tableSourceName = named .orElseGenerateWithPrefix(this, KTableImpl.SOURCE_NAME); - final KTableSource tableSource = new KTableSource<>(materialized.storeName(), materialized.queryableStoreName()); + final KTableSource tableSource = new KTableSource<>(materialized); final ProcessorParameters processorParameters = new ProcessorParameters<>(tableSource, tableSourceName); final TableSourceNode tableSourceNode = TableSourceNode.tableSourceNodeBuilder() @@ -148,7 +148,6 @@ public KTable table(final String topic, .withSourceName(sourceName) .withNodeName(tableSourceName) .withConsumedInternal(consumed) - .withMaterializedInternal(materialized) .withProcessorParameters(processorParameters) .build(); tableSourceNode.setOutputVersioned(materialized.storeSupplier() instanceof VersionedBytesStoreSupplier); @@ -186,9 +185,7 @@ public GlobalKTable globalTable(final String topic, final String processorName = named .orElseGenerateWithPrefix(this, KTableImpl.SOURCE_NAME); - // enforce store name as queryable name to always materialize global table stores - final String storeName = materialized.storeName(); - final KTableSource tableSource = new KTableSource<>(storeName, storeName); + final KTableSource tableSource = new KTableSource<>(materialized); final ProcessorParameters processorParameters = new ProcessorParameters<>(tableSource, processorName); @@ -197,12 +194,12 @@ public GlobalKTable globalTable(final String topic, .isGlobalKTable(true) .withSourceName(sourceName) .withConsumedInternal(consumed) - .withMaterializedInternal(materialized) .withProcessorParameters(processorParameters) .build(); addGraphNode(root, tableSourceNode); + final String storeName = materialized.storeName(); return new GlobalKTableImpl<>(new KTableSourceValueGetterSupplier<>(storeName), materialized.queryableStoreName()); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index a23c5ad4b0be..dbe780d3df50 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -660,10 +660,7 @@ public KTable toTable(final Named named, subTopologySourceNodes = this.subTopologySourceNodes; } - final KTableSource tableSource = new KTableSource<>( - materializedInternal.storeName(), - materializedInternal.queryableStoreName() - ); + final KTableSource tableSource = new KTableSource<>(materializedInternal); final ProcessorParameters processorParameters = new ProcessorParameters<>(tableSource, name); final GraphNode tableNode = new StreamToTableNode<>( name, @@ -1173,7 +1170,7 @@ private KStream doStreamTableJoin(final KTable table, bufferStoreName = Optional.of(name + "-Buffer"); final RocksDBTimeOrderedKeyValueBuffer.Builder storeBuilder = new RocksDBTimeOrderedKeyValueBuffer.Builder<>(bufferStoreName.get(), joinedInternal.gracePeriod(), name); - builder.addStateStore(new StoreBuilderWrapper(storeBuilder)); + builder.addStateStore(StoreBuilderWrapper.wrapStoreBuilder(storeBuilder)); } final ProcessorSupplier processorSupplier = new KStreamKTableJoin<>( diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java index 394c13005880..12bb6c19db8b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java @@ -262,7 +262,7 @@ private void assertUniqueStoreNames(final WindowBytesStoreSupplier supplier, private static StoreFactory joinWindowStoreBuilderFromSupplier(final WindowBytesStoreSupplier storeSupplier, final Serde keySerde, final Serde valueSerde) { - return new StoreBuilderWrapper(Stores.windowStoreBuilder( + return StoreBuilderWrapper.wrapStoreBuilder(Stores.windowStoreBuilder( storeSupplier, keySerde, valueSerde diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java index 09efdb780069..2c75167f019e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java @@ -590,7 +590,7 @@ public KTable suppress(final Suppressed suppressed) { final ProcessorGraphNode> node = new TableSuppressNode<>( name, new ProcessorParameters<>(suppressionSupplier, name), - new StoreBuilderWrapper(storeBuilder) + StoreBuilderWrapper.wrapStoreBuilder(storeBuilder) ); node.setOutputVersioned(false); @@ -1227,10 +1227,7 @@ private KTable doJoinOnForeignKey(final KTable forei materializedInternal.withKeySerde(keySerde); } - final KTableSource resultProcessorSupplier = new KTableSource<>( - materializedInternal.storeName(), - materializedInternal.queryableStoreName() - ); + final KTableSource resultProcessorSupplier = new KTableSource<>(materializedInternal); final StoreFactory resultStore = new KeyValueStoreMaterializer<>(materializedInternal); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java index b29a4fa51f13..fa17f8e50bb5 100644 --- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java @@ -16,21 +16,24 @@ */ package org.apache.kafka.streams.kstream.internals; +import java.util.Set; import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.processor.api.ProcessorSupplier; import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.api.RecordMetadata; +import org.apache.kafka.streams.processor.internals.StoreFactory; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.ValueAndTimestamp; import org.apache.kafka.streams.state.internals.KeyValueStoreWrapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Objects; - import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor; import static org.apache.kafka.streams.state.VersionedKeyValueStore.PUT_RETURN_CODE_NOT_PUT; import static org.apache.kafka.streams.state.internals.KeyValueStoreWrapper.PUT_RETURN_CODE_IS_LATEST; @@ -40,15 +43,16 @@ public class KTableSource implements ProcessorSupplier> materialized) { + this.storeName = materialized.storeName(); + this.queryableName = materialized.queryableStoreName(); this.sendOldValues = false; + this.storeFactory = new KeyValueStoreMaterializer<>(materialized); } public String queryableName() { @@ -60,6 +64,15 @@ public Processor> get() { return new KTableSourceProcessor(); } + @Override + public Set> stores() { + if (materialized()) { + return Set.of(new StoreFactory.FactoryWrappingStoreBuilder<>(storeFactory)); + } else { + return null; + } + } + // when source ktable requires sending old values, we just // need to set the queryable name as the store name to enforce materialization public void enableSendingOldValues() { diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java index cf6ce76f8d56..d6cd130ba6db 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java @@ -39,12 +39,19 @@ public MaterializedInternal(final Materialized materialized) { public MaterializedInternal(final Materialized materialized, final InternalNameProvider nameProvider, final String generatedStorePrefix) { + this(materialized, nameProvider, generatedStorePrefix, false); + } + + public MaterializedInternal(final Materialized materialized, + final InternalNameProvider nameProvider, + final String generatedStorePrefix, + final boolean forceQueryable) { super(materialized); // if storeName is not provided, the corresponding KTable would never be queryable; // but we still need to provide an internal name for it in case we materialize. 
- queryable = storeName() != null; - if (!queryable && nameProvider != null) { + queryable = forceQueryable || storeName() != null; + if (storeName() == null && nameProvider != null) { storeName = nameProvider.newStoreName(generatedStorePrefix); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GlobalStoreNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GlobalStoreNode.java index df6e7c263e6f..7145001deb47 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GlobalStoreNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GlobalStoreNode.java @@ -16,10 +16,12 @@ */ package org.apache.kafka.streams.kstream.internals.graph; +import java.util.Set; import org.apache.kafka.streams.kstream.internals.ConsumedInternal; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.api.ProcessorSupplier; import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder; +import org.apache.kafka.streams.processor.internals.StoreDelegatingProcessorSupplier; import org.apache.kafka.streams.processor.internals.StoreFactory; public class GlobalStoreNode extends StateStoreNode { @@ -52,15 +54,16 @@ public GlobalStoreNode(final StoreFactory storeBuilder, @Override public void writeToTopology(final InternalTopologyBuilder topologyBuilder) { storeBuilder.withLoggingDisabled(); - topologyBuilder.addGlobalStore(storeBuilder, - sourceName, + topologyBuilder.addGlobalStore(sourceName, consumed.timestampExtractor(), consumed.keyDeserializer(), consumed.valueDeserializer(), topic, processorName, - stateUpdateSupplier, - reprocessOnRestore); + new StoreDelegatingProcessorSupplier<>( + stateUpdateSupplier, + Set.of(new StoreFactory.FactoryWrappingStoreBuilder<>(storeBuilder)) + ), reprocessOnRestore); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java index f0f8e0dcb4a9..5e776a5c733d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java @@ -17,15 +17,10 @@ package org.apache.kafka.streams.kstream.internals.graph; -import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.kstream.internals.ConsumedInternal; import org.apache.kafka.streams.kstream.internals.KTableSource; -import org.apache.kafka.streams.kstream.internals.KeyValueStoreMaterializer; -import org.apache.kafka.streams.kstream.internals.MaterializedInternal; import org.apache.kafka.streams.processor.api.ProcessorSupplier; import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder; -import org.apache.kafka.streams.processor.internals.StoreFactory; -import org.apache.kafka.streams.state.KeyValueStore; import java.util.Collections; import java.util.Iterator; @@ -36,7 +31,6 @@ */ public class TableSourceNode extends SourceGraphNode { - private final MaterializedInternal materializedInternal; private final ProcessorParameters processorParameters; private final String sourceName; private final boolean isGlobalKTable; @@ -46,7 +40,6 @@ private TableSourceNode(final String nodeName, final String sourceName, final String topic, final ConsumedInternal consumedInternal, - final MaterializedInternal materializedInternal, final ProcessorParameters processorParameters, 
final boolean isGlobalKTable) { @@ -57,7 +50,6 @@ private TableSourceNode(final String nodeName, this.sourceName = sourceName; this.isGlobalKTable = isGlobalKTable; this.processorParameters = processorParameters; - this.materializedInternal = materializedInternal; } @@ -68,7 +60,6 @@ public void reuseSourceTopicForChangeLog(final boolean shouldReuseSourceTopicFor @Override public String toString() { return "TableSourceNode{" + - "materializedInternal=" + materializedInternal + ", processorParameters=" + processorParameters + ", sourceName='" + sourceName + '\'' + ", isGlobalKTable=" + isGlobalKTable + @@ -93,12 +84,8 @@ public void writeToTopology(final InternalTopologyBuilder topologyBuilder) { throw new IllegalStateException("A table source node must have a single topic as input"); } - final StoreFactory storeFactory = - new KeyValueStoreMaterializer<>((MaterializedInternal>) materializedInternal); - if (isGlobalKTable) { topologyBuilder.addGlobalStore( - storeFactory, sourceName, consumedInternal().timestampExtractor(), consumedInternal().keyDeserializer(), @@ -116,16 +103,16 @@ public void writeToTopology(final InternalTopologyBuilder topologyBuilder) { consumedInternal().valueDeserializer(), topicName); - topologyBuilder.addProcessor(processorParameters.processorName(), processorParameters.processorSupplier(), sourceName); + processorParameters.addProcessorTo(topologyBuilder, new String[] {sourceName}); - // only add state store if the source KTable should be materialized + // if the KTableSource should not be materialized, stores will be null or empty final KTableSource tableSource = (KTableSource) processorParameters.processorSupplier(); - if (tableSource.materialized()) { - topologyBuilder.addStateStore(storeFactory, nodeName()); - + if (tableSource.stores() != null) { if (shouldReuseSourceTopicForChangelog) { - storeFactory.withLoggingDisabled(); - topologyBuilder.connectSourceStoreAndTopic(storeFactory.name(), topicName); + tableSource.stores().forEach(store -> { + store.withLoggingDisabled(); + topologyBuilder.connectSourceStoreAndTopic(store.name(), topicName); + }); } } } @@ -138,7 +125,6 @@ public static final class TableSourceNodeBuilder { private String sourceName; private String topic; private ConsumedInternal consumedInternal; - private MaterializedInternal materializedInternal; private ProcessorParameters processorParameters; private boolean isGlobalKTable = false; @@ -155,11 +141,6 @@ public TableSourceNodeBuilder withTopic(final String topic) { return this; } - public TableSourceNodeBuilder withMaterializedInternal(final MaterializedInternal materializedInternal) { - this.materializedInternal = materializedInternal; - return this; - } - public TableSourceNodeBuilder withConsumedInternal(final ConsumedInternal consumedInternal) { this.consumedInternal = consumedInternal; return this; @@ -185,7 +166,6 @@ public TableSourceNode build() { sourceName, topic, consumedInternal, - materializedInternal, processorParameters, isGlobalKTable); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java index 1d9e595c47e1..eb8d808a230a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java @@ -23,6 +23,7 @@ import org.apache.kafka.streams.StreamsConfig; import 
org.apache.kafka.streams.Topology; import org.apache.kafka.streams.TopologyConfig; +import org.apache.kafka.streams.TopologyDescription; import org.apache.kafka.streams.errors.TopologyException; import org.apache.kafka.streams.internals.ApiUtils; import org.apache.kafka.streams.processor.StateStore; @@ -583,7 +584,7 @@ public final void addProcessor(final String name, public final void addStateStore(final StoreBuilder storeBuilder, final String... processorNames) { - addStateStore(new StoreBuilderWrapper(storeBuilder), false, processorNames); + addStateStore(StoreBuilderWrapper.wrapStoreBuilder(storeBuilder), false, processorNames); } public final void addStateStore(final StoreFactory storeFactory, @@ -614,8 +615,7 @@ public final void addStateStore(final StoreFactory storeFactory, nodeGroups = null; } - public final void addGlobalStore(final StoreFactory storeFactory, - final String sourceName, + public final void addGlobalStore(final String sourceName, final TimestampExtractor timestampExtractor, final Deserializer keyDeserializer, final Deserializer valueDeserializer, @@ -623,8 +623,15 @@ public final void addGlobalStore(final StoreFactory storeFactory, final String processorName, final ProcessorSupplier stateUpdateSupplier, final boolean reprocessOnRestore) { - Objects.requireNonNull(storeFactory, "store builder must not be null"); ApiUtils.checkSupplier(stateUpdateSupplier); + final Set> stores = stateUpdateSupplier.stores(); + if (stores == null || stores.size() != 1) { + throw new IllegalArgumentException( + "Global stores must pass in suppliers with exactly one store but got " + + (stores != null ? stores.size() : 0)); + } + final StoreFactory storeFactory = + StoreBuilderWrapper.wrapStoreBuilder(stores.iterator().next()); validateGlobalStoreArguments(sourceName, topic, processorName, @@ -2081,8 +2088,8 @@ public int compare(final TopologyDescription.Subtopology subtopology1, private static final SubtopologyComparator SUBTOPOLOGY_COMPARATOR = new SubtopologyComparator(); public static final class TopologyDescription implements org.apache.kafka.streams.TopologyDescription { - private final TreeSet subtopologies = new TreeSet<>(SUBTOPOLOGY_COMPARATOR); - private final TreeSet globalStores = new TreeSet<>(GLOBALSTORE_COMPARATOR); + private final TreeSet subtopologies = new TreeSet<>(SUBTOPOLOGY_COMPARATOR); + private final TreeSet globalStores = new TreeSet<>(GLOBALSTORE_COMPARATOR); private final String namedTopology; public TopologyDescription() { @@ -2093,21 +2100,21 @@ public TopologyDescription(final String namedTopology) { this.namedTopology = namedTopology; } - public void addSubtopology(final TopologyDescription.Subtopology subtopology) { + public void addSubtopology(final Subtopology subtopology) { subtopologies.add(subtopology); } - public void addGlobalStore(final TopologyDescription.GlobalStore globalStore) { + public void addGlobalStore(final GlobalStore globalStore) { globalStores.add(globalStore); } @Override - public Set subtopologies() { + public Set subtopologies() { return Collections.unmodifiableSet(subtopologies); } @Override - public Set globalStores() { + public Set globalStores() { return Collections.unmodifiableSet(globalStores); } @@ -2120,17 +2127,17 @@ public String toString() { } else { sb.append("Topology: ").append(namedTopology).append(":\n "); } - final TopologyDescription.Subtopology[] sortedSubtopologies = - subtopologies.descendingSet().toArray(new TopologyDescription.Subtopology[0]); - final TopologyDescription.GlobalStore[] sortedGlobalStores = + 
final Subtopology[] sortedSubtopologies = + subtopologies.descendingSet().toArray(new Subtopology[0]); + final GlobalStore[] sortedGlobalStores = globalStores.descendingSet().toArray(new GlobalStore[0]); int expectedId = 0; int subtopologiesIndex = sortedSubtopologies.length - 1; int globalStoresIndex = sortedGlobalStores.length - 1; while (subtopologiesIndex != -1 && globalStoresIndex != -1) { sb.append(" "); - final TopologyDescription.Subtopology subtopology = sortedSubtopologies[subtopologiesIndex]; - final TopologyDescription.GlobalStore globalStore = sortedGlobalStores[globalStoresIndex]; + final Subtopology subtopology = sortedSubtopologies[subtopologiesIndex]; + final GlobalStore globalStore = sortedGlobalStores[globalStoresIndex]; if (subtopology.id() == expectedId) { sb.append(subtopology); subtopologiesIndex--; @@ -2141,13 +2148,13 @@ public String toString() { expectedId++; } while (subtopologiesIndex != -1) { - final TopologyDescription.Subtopology subtopology = sortedSubtopologies[subtopologiesIndex]; + final Subtopology subtopology = sortedSubtopologies[subtopologiesIndex]; sb.append(" "); sb.append(subtopology); subtopologiesIndex--; } while (globalStoresIndex != -1) { - final TopologyDescription.GlobalStore globalStore = sortedGlobalStores[globalStoresIndex]; + final GlobalStore globalStore = sortedGlobalStores[globalStoresIndex]; sb.append(" "); sb.append(globalStore); globalStoresIndex--; diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreBuilderWrapper.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreBuilderWrapper.java index b8522b8e2cd9..4648533af1df 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreBuilderWrapper.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreBuilderWrapper.java @@ -38,7 +38,15 @@ public class StoreBuilderWrapper implements StoreFactory { private final StoreBuilder builder; private final Set connectedProcessorNames = new HashSet<>(); - public StoreBuilderWrapper(final StoreBuilder builder) { + public static StoreFactory wrapStoreBuilder(final StoreBuilder builder) { + if (builder instanceof FactoryWrappingStoreBuilder) { + return ((FactoryWrappingStoreBuilder) builder).storeFactory(); + } else { + return new StoreBuilderWrapper(builder); + } + } + + private StoreBuilderWrapper(final StoreBuilder builder) { this.builder = builder; } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreDelegatingProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreDelegatingProcessorSupplier.java new file mode 100644 index 000000000000..80e9cd0b5d95 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreDelegatingProcessorSupplier.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals; + +import java.util.Set; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorSupplier; +import org.apache.kafka.streams.state.StoreBuilder; + +public class StoreDelegatingProcessorSupplier implements ProcessorSupplier { + + private final ProcessorSupplier delegate; + private final Set> stores; + + public StoreDelegatingProcessorSupplier( + final ProcessorSupplier delegate, + final Set> stores + ) { + this.delegate = delegate; + this.stores = stores; + } + + @Override + public Set> stores() { + return stores; + } + + @Override + public Processor get() { + return delegate.get(); + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreFactory.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreFactory.java index b05c334c27f5..145b7c56521c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreFactory.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreFactory.java @@ -22,6 +22,7 @@ import java.util.Map; import java.util.Set; +import org.apache.kafka.streams.state.StoreBuilder; /** * What! Another mechanism for obtaining a {@link StateStore}? This isn't just @@ -75,4 +76,78 @@ default void configure(final StreamsConfig config) { boolean isCompatibleWith(StoreFactory storeFactory); + class FactoryWrappingStoreBuilder implements StoreBuilder { + + private final StoreFactory storeFactory; + + public FactoryWrappingStoreBuilder(final StoreFactory storeFactory) { + this.storeFactory = storeFactory; + } + + public StoreFactory storeFactory() { + return storeFactory; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + final FactoryWrappingStoreBuilder that = (FactoryWrappingStoreBuilder) o; + + return storeFactory.isCompatibleWith(that.storeFactory); + } + + @Override + public int hashCode() { + return storeFactory.hashCode(); + } + + @Override + public StoreBuilder withCachingEnabled() { + throw new IllegalStateException("Should not try to modify StoreBuilder wrapper"); + } + + @Override + public StoreBuilder withCachingDisabled() { + throw new IllegalStateException("Should not try to modify StoreBuilder wrapper"); + } + + @Override + public StoreBuilder withLoggingEnabled(final Map config) { + throw new IllegalStateException("Should not try to modify StoreBuilder wrapper"); + } + + @Override + public StoreBuilder withLoggingDisabled() { + storeFactory.withLoggingDisabled(); + return this; + } + + @SuppressWarnings("unchecked") + @Override + public T build() { + return (T) storeFactory.build(); + } + + @Override + public Map logConfig() { + return storeFactory.logConfig(); + } + + @Override + public boolean loggingEnabled() { + return storeFactory.loggingEnabled(); + } + + @Override + public String name() { + return storeFactory.name(); + } + } + } diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNodeTest.java index 2988e14e720e..0d5c5138d68b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNodeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNodeTest.java @@ -16,6 +16,11 @@ */ package org.apache.kafka.streams.kstream.internals.graph; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; + +import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.internals.ConsumedInternal; @@ -23,17 +28,13 @@ import org.apache.kafka.streams.kstream.internals.MaterializedInternal; import org.apache.kafka.streams.kstream.internals.graph.TableSourceNode.TableSourceNodeBuilder; import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder; - +import org.apache.kafka.streams.state.KeyValueStore; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoSettings; import org.mockito.quality.Strictness; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.verify; - @ExtendWith(MockitoExtension.class) @MockitoSettings(strictness = Strictness.STRICT_STUBS) public class TableSourceNodeTest { @@ -59,12 +60,13 @@ public void shouldConnectStateStoreToChangelogTopic() { private void buildTableSourceNode(final boolean shouldReuseSourceTopicForChangelog) { final TableSourceNodeBuilder tableSourceNodeBuilder = TableSourceNode.tableSourceNodeBuilder(); + final MaterializedInternal> + materializedInternal = new MaterializedInternal<>(Materialized.as(STORE_NAME)); final TableSourceNode tableSourceNode = tableSourceNodeBuilder .withTopic(TOPIC) - .withMaterializedInternal(new MaterializedInternal<>(Materialized.as(STORE_NAME))) .withConsumedInternal(new ConsumedInternal<>(Consumed.as("node-name"))) .withProcessorParameters( - new ProcessorParameters<>(new KTableSource<>(STORE_NAME, STORE_NAME), null)) + new ProcessorParameters<>(new KTableSource<>(materializedInternal), null)) .build(); tableSourceNode.reuseSourceTopicForChangeLog(shouldReuseSourceTopicForChangelog); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java index e6b409fed4c9..e2864fa444bd 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java @@ -41,6 +41,7 @@ import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.test.MockStateRestoreListener; import org.apache.kafka.test.TestUtils; @@ -108,15 +109,17 @@ public void process(final Record record) { } }; + final StoreFactory storeFactory = + new KeyValueStoreMaterializer<>(materialized).withLoggingDisabled(); + final StoreBuilder 
storeBuilder = new StoreFactory.FactoryWrappingStoreBuilder<>(storeFactory); builder.addGlobalStore( - new KeyValueStoreMaterializer<>(materialized).withLoggingDisabled(), "sourceName", null, null, null, GLOBAL_STORE_TOPIC_NAME, "processorName", - processorSupplier, + new StoreDelegatingProcessorSupplier<>(processorSupplier, Set.of(storeBuilder)), false ); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java index 057c2aa1e303..ee091cac157c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java @@ -85,7 +85,8 @@ public class InternalTopologyBuilderTest { private final Serde stringSerde = Serdes.String(); private final InternalTopologyBuilder builder = new InternalTopologyBuilder(); - private final StoreFactory storeBuilder = new MockKeyValueStoreBuilder("testStore", false).asFactory(); + private final StoreBuilder storeBuilder = new MockKeyValueStoreBuilder("testStore", false); + private final StoreFactory storeFactory = new MockKeyValueStoreBuilder("testStore", false).asFactory(); @Test public void shouldAddSourceWithOffsetReset() { @@ -215,7 +216,6 @@ public void testAddGlobalStoreWithBadSupplier() { final IllegalArgumentException exception = assertThrows( IllegalArgumentException.class, () -> builder.addGlobalStore( - new MockKeyValueStoreBuilder("global-store", false).asFactory().withLoggingDisabled(), "globalSource", null, null, @@ -321,18 +321,20 @@ public void testPatternAndNameSourceTopics() { @Test public void testPatternSourceTopicsWithGlobalTopics() { + final StoreBuilder storeBuilder = + new MockKeyValueStoreBuilder("global-store", false) + .withLoggingDisabled(); builder.setApplicationId("X"); builder.addSource(null, "source-1", null, null, null, Pattern.compile("topic-1")); builder.addSource(null, "source-2", null, null, null, Pattern.compile("topic-2")); builder.addGlobalStore( - new MockKeyValueStoreBuilder("global-store", false).asFactory().withLoggingDisabled(), "globalSource", null, null, null, "globalTopic", "global-processor", - new MockApiProcessorSupplier<>(), + new StoreDelegatingProcessorSupplier<>(new MockApiProcessorSupplier<>(), Set.of(storeBuilder)), false ); builder.initializeSubscription(); @@ -346,18 +348,20 @@ public void testPatternSourceTopicsWithGlobalTopics() { @Test public void testNameSourceTopicsWithGlobalTopics() { + final StoreBuilder storeBuilder = + new MockKeyValueStoreBuilder("global-store", false) + .withLoggingDisabled(); builder.setApplicationId("X"); builder.addSource(null, "source-1", null, null, null, "topic-1"); builder.addSource(null, "source-2", null, null, null, "topic-2"); builder.addGlobalStore( - new MockKeyValueStoreBuilder("global-store", false).asFactory().withLoggingDisabled(), "globalSource", null, null, null, "globalTopic", "global-processor", - new MockApiProcessorSupplier<>(), + new StoreDelegatingProcessorSupplier<>(new MockApiProcessorSupplier<>(), Set.of(storeBuilder)), false ); builder.initializeSubscription(); @@ -417,14 +421,14 @@ public void testNamedTopicMatchesAlreadyProvidedPattern() { @Test public void testAddStateStoreWithNonExistingProcessor() { - assertThrows(TopologyException.class, () -> builder.addStateStore(storeBuilder, "no-such-processor")); + assertThrows(TopologyException.class, () 
-> builder.addStateStore(storeFactory, "no-such-processor")); } @Test public void testAddStateStoreWithSource() { builder.addSource(null, "source-1", null, null, null, "topic-1"); try { - builder.addStateStore(storeBuilder, "source-1"); + builder.addStateStore(storeFactory, "source-1"); fail("Should throw TopologyException with store cannot be added to source"); } catch (final TopologyException expected) { /* ok */ } } @@ -434,7 +438,7 @@ public void testAddStateStoreWithSink() { builder.addSource(null, "source-1", null, null, null, "topic-1"); builder.addSink("sink-1", "topic-1", null, null, null, "source-1"); try { - builder.addStateStore(storeBuilder, "sink-1"); + builder.addStateStore(storeFactory, "sink-1"); fail("Should throw TopologyException with store cannot be added to sink"); } catch (final TopologyException expected) { /* ok */ } } @@ -444,7 +448,7 @@ public void shouldNotAllowToAddStoresWithSameName() { final StoreBuilder> otherBuilder = new MockKeyValueStoreBuilder("testStore", false); - builder.addStateStore(storeBuilder); + builder.addStateStore(storeFactory); final TopologyException exception = assertThrows( TopologyException.class, @@ -459,24 +463,23 @@ public void shouldNotAllowToAddStoresWithSameName() { @Test public void shouldNotAllowToAddStoresWithSameNameWhenFirstStoreIsGlobal() { - final StoreFactory globalBuilder = - new MockKeyValueStoreBuilder("testStore", false).asFactory().withLoggingDisabled(); + final StoreBuilder globalBuilder = + new MockKeyValueStoreBuilder("testStore", false).withLoggingDisabled(); builder.addGlobalStore( - globalBuilder, "global-store", null, null, null, "global-topic", "global-processor", - new MockApiProcessorSupplier<>(), + new StoreDelegatingProcessorSupplier<>(new MockApiProcessorSupplier<>(), Set.of(globalBuilder)), false ); final TopologyException exception = assertThrows( TopologyException.class, - () -> builder.addStateStore(storeBuilder) + () -> builder.addStateStore(storeFactory) ); assertThat( @@ -487,22 +490,21 @@ public void shouldNotAllowToAddStoresWithSameNameWhenFirstStoreIsGlobal() { @Test public void shouldNotAllowToAddStoresWithSameNameWhenSecondStoreIsGlobal() { - final StoreFactory globalBuilder = - new MockKeyValueStoreBuilder("testStore", false).asFactory().withLoggingDisabled(); + final StoreBuilder globalBuilder = + new MockKeyValueStoreBuilder("testStore", false).withLoggingDisabled(); - builder.addStateStore(storeBuilder); + builder.addStateStore(storeFactory); final TopologyException exception = assertThrows( TopologyException.class, () -> builder.addGlobalStore( - globalBuilder, "global-store", null, null, null, "global-topic", "global-processor", - new MockApiProcessorSupplier<>(), + new StoreDelegatingProcessorSupplier<>(new MockApiProcessorSupplier<>(), Set.of(globalBuilder)), false ) ); @@ -515,34 +517,32 @@ public void shouldNotAllowToAddStoresWithSameNameWhenSecondStoreIsGlobal() { @Test public void shouldNotAllowToAddGlobalStoresWithSameName() { - final StoreFactory firstGlobalBuilder = - new MockKeyValueStoreBuilder("testStore", false).asFactory().withLoggingDisabled(); - final StoreFactory secondGlobalBuilder = - new MockKeyValueStoreBuilder("testStore", false).asFactory().withLoggingDisabled(); + final StoreBuilder> firstGlobalBuilder = + new MockKeyValueStoreBuilder("testStore", false).withLoggingDisabled(); + final StoreBuilder> secondGlobalBuilder = + new MockKeyValueStoreBuilder("testStore", false).withLoggingDisabled(); builder.addGlobalStore( - firstGlobalBuilder, "global-store", null, null, 
null, "global-topic", "global-processor", - new MockApiProcessorSupplier<>(), + new StoreDelegatingProcessorSupplier<>(new MockApiProcessorSupplier<>(), Set.of(firstGlobalBuilder)), false ); final TopologyException exception = assertThrows( TopologyException.class, () -> builder.addGlobalStore( - secondGlobalBuilder, "global-store-2", null, null, null, "global-topic", "global-processor-2", - new MockApiProcessorSupplier<>(), + new StoreDelegatingProcessorSupplier<>(new MockApiProcessorSupplier<>(), Set.of(secondGlobalBuilder)), false ) ); @@ -555,35 +555,35 @@ public void shouldNotAllowToAddGlobalStoresWithSameName() { @Test public void testAddStateStore() { - builder.addStateStore(storeBuilder); + builder.addStateStore(storeFactory); builder.setApplicationId("X"); builder.addSource(null, "source-1", null, null, null, "topic-1"); builder.addProcessor("processor-1", new MockApiProcessorSupplier<>(), "source-1"); assertEquals(0, builder.buildTopology().stateStores().size()); - builder.connectProcessorAndStateStores("processor-1", storeBuilder.name()); + builder.connectProcessorAndStateStores("processor-1", storeFactory.name()); final List suppliers = builder.buildTopology().stateStores(); assertEquals(1, suppliers.size()); - assertEquals(storeBuilder.name(), suppliers.get(0).name()); + assertEquals(storeFactory.name(), suppliers.get(0).name()); } @Test public void testStateStoreNamesForSubtopology() { - builder.addStateStore(storeBuilder); + builder.addStateStore(storeFactory); builder.setApplicationId("X"); builder.addSource(null, "source-1", null, null, null, "topic-1"); builder.addProcessor("processor-1", new MockApiProcessorSupplier<>(), "source-1"); - builder.connectProcessorAndStateStores("processor-1", storeBuilder.name()); + builder.connectProcessorAndStateStores("processor-1", storeFactory.name()); builder.addSource(null, "source-2", null, null, null, "topic-2"); builder.addProcessor("processor-2", new MockApiProcessorSupplier<>(), "source-2"); builder.buildTopology(); final Set stateStoreNames = builder.stateStoreNamesForSubtopology(0); - assertThat(stateStoreNames, equalTo(Set.of(storeBuilder.name()))); + assertThat(stateStoreNames, equalTo(Set.of(storeFactory.name()))); final Set emptyStoreNames = builder.stateStoreNamesForSubtopology(1); assertThat(emptyStoreNames, equalTo(Set.of())); @@ -597,13 +597,13 @@ public void shouldAllowAddingSameStoreBuilderMultipleTimes() { builder.setApplicationId("X"); builder.addSource(null, "source-1", null, null, null, "topic-1"); - builder.addStateStore(storeBuilder); + builder.addStateStore(storeFactory); builder.addProcessor("processor-1", new MockApiProcessorSupplier<>(), "source-1"); - builder.connectProcessorAndStateStores("processor-1", storeBuilder.name()); + builder.connectProcessorAndStateStores("processor-1", storeFactory.name()); - builder.addStateStore(storeBuilder); + builder.addStateStore(storeFactory); builder.addProcessor("processor-2", new MockApiProcessorSupplier<>(), "source-1"); - builder.connectProcessorAndStateStores("processor-2", storeBuilder.name()); + builder.connectProcessorAndStateStores("processor-2", storeFactory.name()); assertEquals(1, builder.buildTopology().stateStores().size()); } @@ -753,15 +753,16 @@ public void shouldAllowIncrementalBuilds() { assertNotEquals(oldNodeGroups, newNodeGroups); oldNodeGroups = newNodeGroups; + + final StoreBuilder globalBuilder = new MockKeyValueStoreBuilder("global-store", false).withLoggingDisabled(); builder.addGlobalStore( - new MockKeyValueStoreBuilder("global-store", 
false).asFactory().withLoggingDisabled(), "globalSource", null, null, null, "globalTopic", "global-processor", - new MockApiProcessorSupplier<>(), + new StoreDelegatingProcessorSupplier<>(new MockApiProcessorSupplier<>(), Set.of(globalBuilder)), false ); newNodeGroups = builder.nodeGroups(); @@ -869,7 +870,7 @@ private Set nodeNames(final Collection> nodes) public void shouldAssociateStateStoreNameWhenStateStoreSupplierIsInternal() { builder.addSource(null, "source", null, null, null, "topic"); builder.addProcessor("processor", new MockApiProcessorSupplier<>(), "source"); - builder.addStateStore(storeBuilder, "processor"); + builder.addStateStore(storeFactory, "processor"); final Map> stateStoreNameToSourceTopic = builder.stateStoreNameToFullSourceTopicNames(); assertEquals(1, stateStoreNameToSourceTopic.size()); assertEquals(Collections.singletonList("topic"), stateStoreNameToSourceTopic.get("testStore")); @@ -879,7 +880,7 @@ public void shouldAssociateStateStoreNameWhenStateStoreSupplierIsInternal() { public void shouldAssociateStateStoreNameWhenStateStoreSupplierIsExternal() { builder.addSource(null, "source", null, null, null, "topic"); builder.addProcessor("processor", new MockApiProcessorSupplier<>(), "source"); - builder.addStateStore(storeBuilder, "processor"); + builder.addStateStore(storeFactory, "processor"); final Map> stateStoreNameToSourceTopic = builder.stateStoreNameToFullSourceTopicNames(); assertEquals(1, stateStoreNameToSourceTopic.size()); assertEquals(Collections.singletonList("topic"), stateStoreNameToSourceTopic.get("testStore")); @@ -891,7 +892,7 @@ public void shouldCorrectlyMapStateStoreToInternalTopics() { builder.addInternalTopic("internal-topic", InternalTopicProperties.empty()); builder.addSource(null, "source", null, null, null, "internal-topic"); builder.addProcessor("processor", new MockApiProcessorSupplier<>(), "source"); - builder.addStateStore(storeBuilder, "processor"); + builder.addStateStore(storeFactory, "processor"); final Map> stateStoreNameToSourceTopic = builder.stateStoreNameToFullSourceTopicNames(); assertEquals(1, stateStoreNameToSourceTopic.size()); assertEquals(Collections.singletonList("appId-internal-topic"), stateStoreNameToSourceTopic.get("testStore")); @@ -965,7 +966,7 @@ public void shouldAddInternalTopicConfigForNonWindowNonVersionedStores() { builder.setApplicationId("appId"); builder.addSource(null, "source", null, null, null, "topic"); builder.addProcessor("processor", new MockApiProcessorSupplier<>(), "source"); - builder.addStateStore(storeBuilder, "processor"); + builder.addStateStore(storeFactory, "processor"); builder.buildTopology(); final Map topicGroups = builder.subtopologyToTopicsInfo(); final InternalTopologyBuilder.TopicsInfo topicsInfo = topicGroups.values().iterator().next(); @@ -1173,7 +1174,7 @@ public void shouldSortProcessorNodesCorrectly() { public void shouldConnectRegexMatchedTopicsToStateStore() { builder.addSource(null, "ingest", null, null, null, Pattern.compile("topic-\\d+")); builder.addProcessor("my-processor", new MockApiProcessorSupplier<>(), "ingest"); - builder.addStateStore(storeBuilder, "my-processor"); + builder.addStateStore(storeFactory, "my-processor"); final Set updatedTopics = new HashSet<>(); @@ -1185,7 +1186,7 @@ public void shouldConnectRegexMatchedTopicsToStateStore() { builder.setApplicationId("test-app"); final Map> stateStoreAndTopics = builder.stateStoreNameToFullSourceTopicNames(); - final List topics = stateStoreAndTopics.get(storeBuilder.name()); + final List topics = 
stateStoreAndTopics.get(storeFactory.name()); assertEquals(2, topics.size(), "Expected to contain two topics"); @@ -1198,14 +1199,13 @@ public void shouldConnectRegexMatchedTopicsToStateStore() { public void shouldNotAllowToAddGlobalStoreWithSourceNameEqualsProcessorName() { final String sameNameForSourceAndProcessor = "sameName"; assertThrows(TopologyException.class, () -> builder.addGlobalStore( - storeBuilder, sameNameForSourceAndProcessor, null, null, null, "anyTopicName", sameNameForSourceAndProcessor, - new MockApiProcessorSupplier<>(), + new StoreDelegatingProcessorSupplier<>(new MockApiProcessorSupplier<>(), Set.of(storeBuilder)), false )); } @@ -1341,16 +1341,17 @@ public void shouldHaveCorrectInternalTopicConfigWhenInternalTopicPropertiesAreNo public void shouldConnectGlobalStateStoreToInputTopic() { final String globalStoreName = "global-store"; final String globalTopic = "global-topic"; + final StoreBuilder storeBuilder = + new MockKeyValueStoreBuilder(globalStoreName, false).withLoggingDisabled(); builder.setApplicationId("X"); builder.addGlobalStore( - new MockKeyValueStoreBuilder(globalStoreName, false).asFactory().withLoggingDisabled(), "globalSource", null, null, null, globalTopic, "global-processor", - new MockApiProcessorSupplier<>(), + new StoreDelegatingProcessorSupplier<>(new MockApiProcessorSupplier<>(), Set.of(storeBuilder)), false ); builder.initializeSubscription(); diff --git a/streams/src/test/java/org/apache/kafka/test/MockKeyValueStoreBuilder.java b/streams/src/test/java/org/apache/kafka/test/MockKeyValueStoreBuilder.java index 2faf89b16223..15c896ad0762 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockKeyValueStoreBuilder.java +++ b/streams/src/test/java/org/apache/kafka/test/MockKeyValueStoreBuilder.java @@ -39,6 +39,6 @@ public MockKeyValueStore build() { } public StoreFactory asFactory() { - return new StoreBuilderWrapper(this); + return StoreBuilderWrapper.wrapStoreBuilder(this); } }
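
Note on the mechanism this patch migrates to (not part of the diff): with ProcessorSupplier#stores(), a supplier declares the StoreBuilders it needs, and the topology builder adds and connects those stores when the processor is added, which is what KTableSource#stores() and the new StoreDelegatingProcessorSupplier rely on above. The sketch below is a minimal, hypothetical example of that pattern; the class name CountingSupplier and the store name "counts-store" are illustrative and do not appear in the patch.

// Illustrative sketch only: a ProcessorSupplier that owns its state store via stores().
import java.util.Set;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

public class CountingSupplier implements ProcessorSupplier<String, String, String, Long> {

    static final String STORE_NAME = "counts-store"; // hypothetical store name

    @Override
    public Set<StoreBuilder<?>> stores() {
        // Because the store is declared here, adding the processor to the topology
        // also adds and connects this store; no separate addStateStore() call is needed.
        return Set.of(Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore(STORE_NAME), Serdes.String(), Serdes.Long()));
    }

    @Override
    public Processor<String, String, String, Long> get() {
        return new Processor<>() {
            private ProcessorContext<String, Long> context;
            private KeyValueStore<String, Long> store;

            @Override
            public void init(final ProcessorContext<String, Long> context) {
                this.context = context;
                this.store = context.getStateStore(STORE_NAME);
            }

            @Override
            public void process(final Record<String, String> record) {
                // count records per key and forward the running total downstream
                final Long current = store.get(record.key());
                final long updated = current == null ? 1L : current + 1L;
                store.put(record.key(), updated);
                context.forward(record.withValue(updated));
            }
        };
    }
}

StoreDelegatingProcessorSupplier plays the same role for callers that still pass a StoreBuilder separately (Topology#addGlobalStore, GlobalStoreNode): it wraps the user's supplier so that stores() reports the given builders, letting InternalTopologyBuilder#addGlobalStore derive the StoreFactory from the supplier instead of taking it as an extra argument.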