Skip to content

Commit

Permalink
KAFKA-18026: KIP-1112, migrate foreign-key joins to use ProcessorSupp…
Browse files Browse the repository at this point in the history
…lier#stores (apache#18194)

Convert FKJ processors to implement the #stores method

Reviewers: Guozhang Wang <[email protected]>
  • Loading branch information
ableegoldman authored Dec 16, 2024
1 parent 408ce19 commit bac8928
Show file tree
Hide file tree
Showing 7 changed files with 131 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1247,16 +1247,16 @@ private <VR, KO, VO> KTable<K, VR> doJoinOnForeignKey(final KTable<KO, VO> forei

final String subscriptionStoreName = renamed
.suffixWithOrElseGet("-subscription-store", builder, FK_JOIN_STATE_STORE_NAME);
builder.addStateStore(
new SubscriptionStoreFactory<>(subscriptionStoreName, subscriptionWrapperSerde));
final StoreFactory subscriptionStoreFactory =
new SubscriptionStoreFactory<>(subscriptionStoreName, subscriptionWrapperSerde);

final String subscriptionReceiveName = renamed.suffixWithOrElseGet(
"-subscription-receive", builder, SUBSCRIPTION_PROCESSOR);
final StatefulProcessorNode<KO, SubscriptionWrapper<K>> subscriptionReceiveNode =
new StatefulProcessorNode<>(
subscriptionReceiveName,
new ProcessorParameters<>(
new SubscriptionReceiveProcessorSupplier<>(subscriptionStoreName, combinedKeySchema),
new SubscriptionReceiveProcessorSupplier<>(subscriptionStoreFactory, combinedKeySchema),
subscriptionReceiveName),
new String[]{subscriptionStoreName}
);
Expand All @@ -1278,12 +1278,11 @@ private <VR, KO, VO> KTable<K, VR> doJoinOnForeignKey(final KTable<KO, VO> forei

final String foreignTableJoinName = renamed
.suffixWithOrElseGet("-foreign-join-subscription", builder, SUBSCRIPTION_PROCESSOR);
final StatefulProcessorNode<KO, Change<VO>> foreignTableJoinNode = new ForeignTableJoinNode<>(
final ProcessorGraphNode<KO, Change<VO>> foreignTableJoinNode = new ForeignTableJoinNode<>(
new ProcessorParameters<>(
new ForeignTableJoinProcessorSupplier<>(subscriptionStoreName, combinedKeySchema),
new ForeignTableJoinProcessorSupplier<>(subscriptionStoreFactory, combinedKeySchema),
foreignTableJoinName
),
new String[]{subscriptionStoreName}
)
);
builder.addGraphNode(((KTableImpl<KO, VO, ?>) foreignKeyTable).graphNode, foreignTableJoinNode);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,31 +28,39 @@
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.StoreFactory;
import org.apache.kafka.streams.processor.internals.StoreFactory.FactoryWrappingStoreBuilder;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Set;

public class ForeignTableJoinProcessorSupplier<K, KO, VO> implements
ProcessorSupplier<KO, Change<VO>, K, SubscriptionResponseWrapper<VO>> {
private static final Logger LOG = LoggerFactory.getLogger(ForeignTableJoinProcessorSupplier.class);
private final String storeName;
private final StoreFactory subscriptionStoreFactory;
private final CombinedKeySchema<KO, K> keySchema;
private boolean useVersionedSemantics = false;

/**
 * Creates the supplier for the foreign-table side of the FK join.
 * <p>
 * Note: the scrape interleaved the removed (String-based) constructor with the
 * current one, leaving two conflicting definitions; only the current,
 * StoreFactory-based constructor is kept here.
 *
 * @param subscriptionStoreFactory factory for the subscription state store this
 *                                 processor reads; also exposed via {@code stores()}
 *                                 so the topology connects it automatically
 * @param keySchema                schema for the combined (foreign key, primary key) keys
 */
public ForeignTableJoinProcessorSupplier(final StoreFactory subscriptionStoreFactory,
                                         final CombinedKeySchema<KO, K> keySchema) {
    this.subscriptionStoreFactory = subscriptionStoreFactory;
    this.keySchema = keySchema;
}

@Override
public Set<StoreBuilder<?>> stores() {
    // Wrap the factory so the topology builder materializes and connects the
    // subscription store to this processor without an explicit addStateStore call.
    final StoreBuilder<?> wrappedBuilder = new FactoryWrappingStoreBuilder<>(subscriptionStoreFactory);
    return Collections.singleton(wrappedBuilder);
}

@Override
public Processor<KO, Change<VO>, K, SubscriptionResponseWrapper<VO>> get() {
return new KTableKTableJoinProcessor();
Expand Down Expand Up @@ -80,7 +88,7 @@ public void init(final ProcessorContext<K, SubscriptionResponseWrapper<VO>> cont
internalProcessorContext.taskId().toString(),
internalProcessorContext.metrics()
);
subscriptionStore = internalProcessorContext.getStateStore(storeName);
subscriptionStore = internalProcessorContext.getStateStore(subscriptionStoreFactory.storeName());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,32 +28,42 @@
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.StoreFactory;
import org.apache.kafka.streams.processor.internals.StoreFactory.FactoryWrappingStoreBuilder;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.Set;

public class SubscriptionReceiveProcessorSupplier<K, KO>
implements ProcessorSupplier<KO, SubscriptionWrapper<K>, CombinedKey<KO, K>, Change<ValueAndTimestamp<SubscriptionWrapper<K>>>> {
private static final Logger LOG = LoggerFactory.getLogger(SubscriptionReceiveProcessorSupplier.class);

private final String storeName;
private final StoreFactory subscriptionStoreFactory;
private final CombinedKeySchema<KO, K> keySchema;

/**
 * Creates the supplier for the subscription-receive processor of the FK join.
 * <p>
 * Note: the scrape interleaved the removed (String-based) constructor with the
 * current one, leaving two conflicting definitions; only the current,
 * StoreFactory-based constructor is kept here.
 *
 * @param subscriptionStoreFactory factory for the subscription state store this
 *                                 processor writes; also exposed via {@code stores()}
 *                                 so the topology connects it automatically
 * @param keySchema                schema for the combined (foreign key, primary key) keys
 */
public SubscriptionReceiveProcessorSupplier(final StoreFactory subscriptionStoreFactory,
                                            final CombinedKeySchema<KO, K> keySchema) {
    this.subscriptionStoreFactory = subscriptionStoreFactory;
    this.keySchema = keySchema;
}

@Override
public Set<StoreBuilder<?>> stores() {
    // Wrap the factory so the topology builder materializes and connects the
    // subscription store to this processor without an explicit addStateStore call.
    final StoreBuilder<?> wrappedBuilder = new FactoryWrappingStoreBuilder<>(subscriptionStoreFactory);
    return Collections.singleton(wrappedBuilder);
}

@Override
public Processor<KO, SubscriptionWrapper<K>, CombinedKey<KO, K>, Change<ValueAndTimestamp<SubscriptionWrapper<K>>>> get() {

return new ContextualProcessor<KO, SubscriptionWrapper<K>, CombinedKey<KO, K>, Change<ValueAndTimestamp<SubscriptionWrapper<K>>>>() {
return new ContextualProcessor<>() {

private TimestampedKeyValueStore<Bytes, SubscriptionWrapper<K>> store;
private Sensor droppedRecordsSensor;
Expand All @@ -68,7 +78,7 @@ public void init(final ProcessorContext<CombinedKey<KO, K>, Change<ValueAndTimes
internalProcessorContext.taskId().toString(),
internalProcessorContext.metrics()
);
store = internalProcessorContext.getStateStore(storeName);
store = internalProcessorContext.getStateStore(subscriptionStoreFactory.storeName());

keySchema.init(context);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,12 @@
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignTableJoinProcessorSupplier;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;

public class ForeignTableJoinNode<K, V> extends StatefulProcessorNode<K, V> implements VersionedSemanticsGraphNode {
public class ForeignTableJoinNode<K, V> extends ProcessorGraphNode<K, V> implements VersionedSemanticsGraphNode {

public ForeignTableJoinNode(final ProcessorParameters<K, V, ?, ?> processorParameters,
final String[] storeNames) {
super(processorParameters.processorName(), processorParameters, storeNames);
public ForeignTableJoinNode(final ProcessorParameters<K, V, ?, ?> processorParameters) {
super(processorParameters.processorName(), processorParameters);
}

@SuppressWarnings("unchecked")
@Override
public void enableVersionedSemantics(final boolean useVersionedSemantics, final String parentNodeName) {
final ProcessorSupplier<?, ?, ?, ?> processorSupplier = processorParameters().processorSupplier();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.SlidingWindows;
import org.apache.kafka.streams.kstream.StreamJoined;
import org.apache.kafka.streams.kstream.TableJoined;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.api.Processor;
Expand Down Expand Up @@ -2153,6 +2154,84 @@ public void shouldWrapProcessorsForTableTableOuterJoin() {
assertThat(counter.numConnectedStateStores(), CoreMatchers.is(3));
}

@Test
public void shouldWrapProcessorsForForeignKeyInnerJoin() {
    // Configure the topology so every processor is wrapped by the recording wrapper.
    final Map<Object, Object> configs = dummyStreamsConfigMap();
    final WrapperRecorder recorder = new WrapperRecorder();
    configs.put(PROCESSOR_WRAPPER_CLASS_CONFIG, RecordingProcessorWrapper.class);
    configs.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, recorder);

    final StreamsBuilder fkJoinBuilder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(configs)));

    final KTable<String, String> leftTable = fkJoinBuilder.table("input1", Consumed.as("input1"));
    final KTable<String, String> rightTable = fkJoinBuilder.table("input2", Consumed.as("input2"));

    // Foreign-key inner join, materialized, then streamed to an output topic.
    leftTable
        .join(
            rightTable,
            v -> v,
            (leftValue, rightValue) -> leftValue + rightValue,
            TableJoined.as("join"),
            Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("materialized-store").withValueSerde(Serdes.String()))
        .toStream(Named.as("toStream"))
        .to("output", Produced.as("sink"));

    fkJoinBuilder.build();

    // All nine processors of the FK-join topology must have been wrapped.
    assertThat(recorder.numWrappedProcessors(), CoreMatchers.is(9));
    assertThat(recorder.wrappedProcessorNames().toString(), recorder.wrappedProcessorNames(), Matchers.containsInAnyOrder(
        "input1",
        "input2",
        "join-foreign-join-subscription",
        "join-subscription-join-foreign",
        "join-subscription-registration-processor",
        "join-subscription-receive",
        "join-result",
        "join-subscription-response-resolver",
        "toStream"
    ));

    // Unique stores: both source tables, the subscription store, and the join materialization.
    assertThat(recorder.numUniqueStateStores(), CoreMatchers.is(4));
    assertThat(recorder.numConnectedStateStores(), CoreMatchers.is(5));
}

@Test
public void shouldWrapProcessorsForForeignKeyLeftJoin() {
    // Configure the topology so every processor is wrapped by the recording wrapper.
    final Map<Object, Object> configs = dummyStreamsConfigMap();
    final WrapperRecorder recorder = new WrapperRecorder();
    configs.put(PROCESSOR_WRAPPER_CLASS_CONFIG, RecordingProcessorWrapper.class);
    configs.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, recorder);

    final StreamsBuilder fkJoinBuilder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(configs)));

    final KTable<String, String> leftTable = fkJoinBuilder.table("input1", Consumed.as("input1"));
    final KTable<String, String> rightTable = fkJoinBuilder.table("input2", Consumed.as("input2"));

    // Foreign-key left join, materialized, then streamed to an output topic.
    leftTable
        .leftJoin(
            rightTable,
            v -> v,
            (leftValue, rightValue) -> leftValue + rightValue,
            TableJoined.as("l-join"),
            Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("materialized-store").withValueSerde(Serdes.String()))
        .toStream(Named.as("toStream"))
        .to("output", Produced.as("sink"));

    fkJoinBuilder.build();

    // All nine processors of the FK-join topology must have been wrapped.
    assertThat(recorder.numWrappedProcessors(), CoreMatchers.is(9));
    assertThat(recorder.wrappedProcessorNames().toString(), recorder.wrappedProcessorNames(), Matchers.containsInAnyOrder(
        "input1",
        "input2",
        "l-join-foreign-join-subscription",
        "l-join-subscription-join-foreign",
        "l-join-subscription-registration-processor",
        "l-join-subscription-receive",
        "l-join-result",
        "l-join-subscription-response-resolver",
        "toStream"
    ));

    // Unique stores: both source tables, the subscription store, and the join materialization.
    assertThat(recorder.numUniqueStateStores(), CoreMatchers.is(4));
    assertThat(recorder.numConnectedStateStores(), CoreMatchers.is(5));
}

@Test
public void shouldAllowStreamsFromSameTopic() {
builder.stream("topic");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.StoreBuilderWrapper;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
Expand Down Expand Up @@ -71,10 +71,13 @@ public void setUp() {
context = new MockInternalNewProcessorContext<>(props, new TaskId(0, 0), stateDir);

final StoreBuilder<TimestampedKeyValueStore<Bytes, SubscriptionWrapper<String>>> storeBuilder = storeBuilder();
processor = new ForeignTableJoinProcessorSupplier<String, String, String>(storeBuilder().name(), COMBINED_KEY_SCHEMA).get();
processor = new ForeignTableJoinProcessorSupplier<String, String, String>(
StoreBuilderWrapper.wrapStoreBuilder(storeBuilder()),
COMBINED_KEY_SCHEMA
).get();
stateStore = storeBuilder.build();
context.addStateStore(stateStore);
stateStore.init((StateStoreContext) context, stateStore);
stateStore.init(context, stateStore);
processor.init(context);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.kafka.streams.processor.api.MockProcessorContext.CapturedForward;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.StoreBuilderWrapper;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
Expand Down Expand Up @@ -507,7 +508,10 @@ public void shouldPropagateNullIfNoFKValAvailableV1() {
/**
 * Builds the processor supplier under test from the given subscription store builder.
 * <p>
 * Note: the scrape left the removed pre-change {@code return} line alongside the
 * current one, giving the method two return statements; only the current,
 * StoreFactory-based construction is kept here.
 *
 * @param storeBuilder builder for the timestamped subscription store, wrapped into
 *                     a StoreFactory for the supplier
 * @return a supplier wired to the wrapped store and the shared combined-key schema
 */
private SubscriptionReceiveProcessorSupplier<String, String> supplier(
    final StoreBuilder<TimestampedKeyValueStore<Bytes, SubscriptionWrapper<String>>> storeBuilder) {

    return new SubscriptionReceiveProcessorSupplier<>(
        StoreBuilderWrapper.wrapStoreBuilder(storeBuilder),
        COMBINED_KEY_SCHEMA
    );
}

private StoreBuilder<TimestampedKeyValueStore<Bytes, SubscriptionWrapper<String>>> storeBuilder() {
Expand Down

0 comments on commit bac8928

Please sign in to comment.