diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java index e661d78fefcd5..aa07714167738 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java @@ -1247,8 +1247,8 @@ private KTable doJoinOnForeignKey(final KTable forei final String subscriptionStoreName = renamed .suffixWithOrElseGet("-subscription-store", builder, FK_JOIN_STATE_STORE_NAME); - builder.addStateStore( - new SubscriptionStoreFactory<>(subscriptionStoreName, subscriptionWrapperSerde)); + final StoreFactory subscriptionStoreFactory = + new SubscriptionStoreFactory<>(subscriptionStoreName, subscriptionWrapperSerde); final String subscriptionReceiveName = renamed.suffixWithOrElseGet( "-subscription-receive", builder, SUBSCRIPTION_PROCESSOR); @@ -1256,7 +1256,7 @@ private KTable doJoinOnForeignKey(final KTable forei new StatefulProcessorNode<>( subscriptionReceiveName, new ProcessorParameters<>( - new SubscriptionReceiveProcessorSupplier<>(subscriptionStoreName, combinedKeySchema), + new SubscriptionReceiveProcessorSupplier<>(subscriptionStoreFactory, combinedKeySchema), subscriptionReceiveName), new String[]{subscriptionStoreName} ); @@ -1278,12 +1278,11 @@ private KTable doJoinOnForeignKey(final KTable forei final String foreignTableJoinName = renamed .suffixWithOrElseGet("-foreign-join-subscription", builder, SUBSCRIPTION_PROCESSOR); - final StatefulProcessorNode> foreignTableJoinNode = new ForeignTableJoinNode<>( + final ProcessorGraphNode> foreignTableJoinNode = new ForeignTableJoinNode<>( new ProcessorParameters<>( - new ForeignTableJoinProcessorSupplier<>(subscriptionStoreName, combinedKeySchema), + new ForeignTableJoinProcessorSupplier<>(subscriptionStoreFactory, combinedKeySchema), foreignTableJoinName - ), - new String[]{subscriptionStoreName} + ) ); builder.addGraphNode(((KTableImpl) foreignKeyTable).graphNode, foreignTableJoinNode); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplier.java index 7c3e982a8ede3..a8b8228ed552f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplier.java @@ -28,8 +28,11 @@ import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.api.RecordMetadata; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.apache.kafka.streams.processor.internals.StoreFactory; +import org.apache.kafka.streams.processor.internals.StoreFactory.FactoryWrappingStoreBuilder; import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics; import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.TimestampedKeyValueStore; import org.apache.kafka.streams.state.ValueAndTimestamp; @@ -37,22 +40,27 @@ import org.slf4j.LoggerFactory; import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.Set; public class ForeignTableJoinProcessorSupplier implements ProcessorSupplier, K, SubscriptionResponseWrapper> { private static final Logger LOG = LoggerFactory.getLogger(ForeignTableJoinProcessorSupplier.class); - private final String storeName; + private final StoreFactory subscriptionStoreFactory; private final CombinedKeySchema keySchema; private boolean useVersionedSemantics = false; - public ForeignTableJoinProcessorSupplier( - final String storeName, - final CombinedKeySchema keySchema) { - - this.storeName = storeName; + public ForeignTableJoinProcessorSupplier(final StoreFactory subscriptionStoreFactory, + final CombinedKeySchema keySchema) { + this.subscriptionStoreFactory = subscriptionStoreFactory; this.keySchema = keySchema; } + @Override + public Set> stores() { + return Collections.singleton(new FactoryWrappingStoreBuilder<>(subscriptionStoreFactory)); + } + @Override public Processor, K, SubscriptionResponseWrapper> get() { return new KTableKTableJoinProcessor(); @@ -80,7 +88,7 @@ public void init(final ProcessorContext> cont internalProcessorContext.taskId().toString(), internalProcessorContext.metrics() ); - subscriptionStore = internalProcessorContext.getStateStore(storeName); + subscriptionStore = internalProcessorContext.getStateStore(subscriptionStoreFactory.storeName()); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplier.java index a935797ad180e..825283e98f581 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplier.java @@ -28,32 +28,42 @@ import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.api.RecordMetadata; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.apache.kafka.streams.processor.internals.StoreFactory; +import org.apache.kafka.streams.processor.internals.StoreFactory.FactoryWrappingStoreBuilder; import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics; +import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.TimestampedKeyValueStore; import org.apache.kafka.streams.state.ValueAndTimestamp; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Collections; +import java.util.Set; + public class SubscriptionReceiveProcessorSupplier implements ProcessorSupplier, CombinedKey, Change>>> { private static final Logger LOG = LoggerFactory.getLogger(SubscriptionReceiveProcessorSupplier.class); - private final String storeName; + private final StoreFactory subscriptionStoreFactory; private final CombinedKeySchema keySchema; - public SubscriptionReceiveProcessorSupplier( - final String storeName, - final CombinedKeySchema keySchema) { + public SubscriptionReceiveProcessorSupplier(final StoreFactory subscriptionStoreFactory, + final CombinedKeySchema keySchema) { - this.storeName = storeName; + this.subscriptionStoreFactory = subscriptionStoreFactory; this.keySchema = keySchema; } + @Override + public Set> stores() { + return Collections.singleton(new FactoryWrappingStoreBuilder<>(subscriptionStoreFactory)); + } + @Override public Processor, CombinedKey, Change>>> get() { - return new ContextualProcessor, CombinedKey, Change>>>() { + return new ContextualProcessor<>() { private TimestampedKeyValueStore> store; private Sensor droppedRecordsSensor; @@ -68,7 +78,7 @@ public void init(final ProcessorContext, Change extends StatefulProcessorNode implements VersionedSemanticsGraphNode { +public class ForeignTableJoinNode extends ProcessorGraphNode implements VersionedSemanticsGraphNode { - public ForeignTableJoinNode(final ProcessorParameters processorParameters, - final String[] storeNames) { - super(processorParameters.processorName(), processorParameters, storeNames); + public ForeignTableJoinNode(final ProcessorParameters processorParameters) { + super(processorParameters.processorName(), processorParameters); } - @SuppressWarnings("unchecked") @Override public void enableVersionedSemantics(final boolean useVersionedSemantics, final String parentNodeName) { final ProcessorSupplier processorSupplier = processorParameters().processorSupplier(); diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java index be38f0514929c..b20bec101bca5 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java @@ -38,6 +38,7 @@ import org.apache.kafka.streams.kstream.SessionWindows; import org.apache.kafka.streams.kstream.SlidingWindows; import org.apache.kafka.streams.kstream.StreamJoined; +import org.apache.kafka.streams.kstream.TableJoined; import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.api.Processor; @@ -2153,6 +2154,84 @@ public void shouldWrapProcessorsForTableTableOuterJoin() { assertThat(counter.numConnectedStateStores(), CoreMatchers.is(3)); } + @Test + public void shouldWrapProcessorsForForeignKeyInnerJoin() { + final Map props = dummyStreamsConfigMap(); + props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, RecordingProcessorWrapper.class); + + final WrapperRecorder counter = new WrapperRecorder(); + props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter); + + final StreamsBuilder builder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(props))); + + final KTable left = builder.table("input1", Consumed.as("input1")); + final KTable right = builder.table("input2", Consumed.as("input2")); + + left.join(right, + value -> value, + (v1, v2) -> v1 + v2, + TableJoined.as("join"), + Materialized.>as("materialized-store").withValueSerde(Serdes.String())) + .toStream(Named.as("toStream")) + .to("output", Produced.as("sink")); + + builder.build(); + assertThat(counter.numWrappedProcessors(), CoreMatchers.is(9)); + assertThat(counter.wrappedProcessorNames().toString(), counter.wrappedProcessorNames(), Matchers.containsInAnyOrder( + "input1", + "input2", + "join-foreign-join-subscription", + "join-subscription-join-foreign", + "join-subscription-registration-processor", + "join-subscription-receive", + "join-result", + "join-subscription-response-resolver", + "toStream" + )); + + assertThat(counter.numUniqueStateStores(), CoreMatchers.is(4)); // table1, table2, subscription store, and join materialized + assertThat(counter.numConnectedStateStores(), CoreMatchers.is(5)); + } + + @Test + public void shouldWrapProcessorsForForeignKeyLeftJoin() { + final Map props = dummyStreamsConfigMap(); + props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, RecordingProcessorWrapper.class); + + final WrapperRecorder counter = new WrapperRecorder(); + props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter); + + final StreamsBuilder builder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(props))); + + final KTable left = builder.table("input1", Consumed.as("input1")); + final KTable right = builder.table("input2", Consumed.as("input2")); + + left.leftJoin(right, + value -> value, + (v1, v2) -> v1 + v2, + TableJoined.as("l-join"), + Materialized.>as("materialized-store").withValueSerde(Serdes.String())) + .toStream(Named.as("toStream")) // 6 + .to("output", Produced.as("sink")); + + builder.build(); + assertThat(counter.numWrappedProcessors(), CoreMatchers.is(9)); + assertThat(counter.wrappedProcessorNames().toString(), counter.wrappedProcessorNames(), Matchers.containsInAnyOrder( + "input1", + "input2", + "l-join-foreign-join-subscription", + "l-join-subscription-join-foreign", + "l-join-subscription-registration-processor", + "l-join-subscription-receive", + "l-join-result", + "l-join-subscription-response-resolver", + "toStream" + )); + + assertThat(counter.numUniqueStateStores(), CoreMatchers.is(4)); // table1, table2, subscription store, and join materialized + assertThat(counter.numConnectedStateStores(), CoreMatchers.is(5)); + } + @Test public void shouldAllowStreamsFromSameTopic() { builder.stream("topic"); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplierTests.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplierTests.java index 4ce76eee0cfdc..766cb520a0c53 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplierTests.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplierTests.java @@ -21,10 +21,10 @@ import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.kstream.internals.Change; -import org.apache.kafka.streams.processor.StateStoreContext; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.internals.StoreBuilderWrapper; import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.Stores; import org.apache.kafka.streams.state.TimestampedKeyValueStore; @@ -71,10 +71,13 @@ public void setUp() { context = new MockInternalNewProcessorContext<>(props, new TaskId(0, 0), stateDir); final StoreBuilder>> storeBuilder = storeBuilder(); - processor = new ForeignTableJoinProcessorSupplier(storeBuilder().name(), COMBINED_KEY_SCHEMA).get(); + processor = new ForeignTableJoinProcessorSupplier( + StoreBuilderWrapper.wrapStoreBuilder(storeBuilder()), + COMBINED_KEY_SCHEMA + ).get(); stateStore = storeBuilder.build(); context.addStateStore(stateStore); - stateStore.init((StateStoreContext) context, stateStore); + stateStore.init(context, stateStore); processor.init(context); } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplierTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplierTest.java index 72e4eb23e4594..aabcddb10f8aa 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplierTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplierTest.java @@ -28,6 +28,7 @@ import org.apache.kafka.streams.processor.api.MockProcessorContext.CapturedForward; import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.internals.StoreBuilderWrapper; import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.Stores; import org.apache.kafka.streams.state.TimestampedKeyValueStore; @@ -507,7 +508,10 @@ public void shouldPropagateNullIfNoFKValAvailableV1() { private SubscriptionReceiveProcessorSupplier supplier( final StoreBuilder>> storeBuilder) { - return new SubscriptionReceiveProcessorSupplier<>(storeBuilder.name(), COMBINED_KEY_SCHEMA); + return new SubscriptionReceiveProcessorSupplier<>( + StoreBuilderWrapper.wrapStoreBuilder(storeBuilder), + COMBINED_KEY_SCHEMA + ); } private StoreBuilder>> storeBuilder() {