Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-18026: migrate KTableSource to use ProcesserSupplier#stores #17903

Open
wants to merge 1 commit into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,8 @@ public synchronized <K, V> GlobalKTable<K, V> globalTable(final String topic,
final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal =
new MaterializedInternal<>(
Materialized.with(consumedInternal.keySerde(), consumedInternal.valueSerde()),
internalStreamsBuilder, topic + "-");
internalStreamsBuilder, topic + "-",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can you fix this line to put params on new lines?

true /* force materializing global tables */);

return internalStreamsBuilder.globalTable(topic, consumedInternal, materializedInternal);
}
Expand Down Expand Up @@ -517,7 +518,7 @@ public synchronized <K, V> GlobalKTable<K, V> globalTable(final String topic,
*/
public synchronized StreamsBuilder addStateStore(final StoreBuilder<?> builder) {
Objects.requireNonNull(builder, "builder can't be null");
internalStreamsBuilder.addStateStore(new StoreBuilderWrapper(builder));
internalStreamsBuilder.addStateStore(StoreBuilderWrapper.wrapStoreBuilder(builder));
return this;
}

Expand Down Expand Up @@ -556,7 +557,7 @@ public synchronized <KIn, VIn> StreamsBuilder addGlobalStore(final StoreBuilder<
Objects.requireNonNull(storeBuilder, "storeBuilder can't be null");
Objects.requireNonNull(consumed, "consumed can't be null");
internalStreamsBuilder.addGlobalStore(
new StoreBuilderWrapper(storeBuilder),
StoreBuilderWrapper.wrapStoreBuilder(storeBuilder),
topic,
new ConsumedInternal<>(consumed),
stateUpdateSupplier,
Expand Down
8 changes: 3 additions & 5 deletions streams/src/main/java/org/apache/kafka/streams/Topology.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.SinkNode;
import org.apache.kafka.streams.processor.internals.SourceNode;
import org.apache.kafka.streams.processor.internals.StoreBuilderWrapper;
import org.apache.kafka.streams.processor.internals.StoreDelegatingProcessorSupplier;
import org.apache.kafka.streams.state.StoreBuilder;

import java.util.Set;
Expand Down Expand Up @@ -851,14 +851,13 @@ public synchronized <KIn, VIn> Topology addGlobalStore(final StoreBuilder<?> sto
final String processorName,
final ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier) {
internalTopologyBuilder.addGlobalStore(
new StoreBuilderWrapper(storeBuilder),
sourceName,
null,
keyDeserializer,
valueDeserializer,
topic,
processorName,
stateUpdateSupplier,
new StoreDelegatingProcessorSupplier<>(stateUpdateSupplier, Set.of(storeBuilder)),
true
);
return this;
Expand Down Expand Up @@ -897,14 +896,13 @@ public synchronized <KIn, VIn> Topology addGlobalStore(final StoreBuilder<?> sto
final String processorName,
final ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier) {
internalTopologyBuilder.addGlobalStore(
new StoreBuilderWrapper(storeBuilder),
sourceName,
timestampExtractor,
keyDeserializer,
valueDeserializer,
topic,
processorName,
stateUpdateSupplier,
new StoreDelegatingProcessorSupplier<>(stateUpdateSupplier, Set.of(storeBuilder)),
true
);
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,15 +140,14 @@ public <K, V> KTable<K, V> table(final String topic,
final String tableSourceName = named
.orElseGenerateWithPrefix(this, KTableImpl.SOURCE_NAME);

final KTableSource<K, V> tableSource = new KTableSource<>(materialized.storeName(), materialized.queryableStoreName());
final KTableSource<K, V> tableSource = new KTableSource<>(materialized);
final ProcessorParameters<K, V, ?, ?> processorParameters = new ProcessorParameters<>(tableSource, tableSourceName);

final TableSourceNode<K, V> tableSourceNode = TableSourceNode.<K, V>tableSourceNodeBuilder()
.withTopic(topic)
.withSourceName(sourceName)
.withNodeName(tableSourceName)
.withConsumedInternal(consumed)
.withMaterializedInternal(materialized)
.withProcessorParameters(processorParameters)
.build();
tableSourceNode.setOutputVersioned(materialized.storeSupplier() instanceof VersionedBytesStoreSupplier);
Expand Down Expand Up @@ -186,9 +185,7 @@ public <K, V> GlobalKTable<K, V> globalTable(final String topic,
final String processorName = named
.orElseGenerateWithPrefix(this, KTableImpl.SOURCE_NAME);

// enforce store name as queryable name to always materialize global table stores
final String storeName = materialized.storeName();
final KTableSource<K, V> tableSource = new KTableSource<>(storeName, storeName);
final KTableSource<K, V> tableSource = new KTableSource<>(materialized);

final ProcessorParameters<K, V, ?, ?> processorParameters = new ProcessorParameters<>(tableSource, processorName);

Expand All @@ -197,12 +194,12 @@ public <K, V> GlobalKTable<K, V> globalTable(final String topic,
.isGlobalKTable(true)
.withSourceName(sourceName)
.withConsumedInternal(consumed)
.withMaterializedInternal(materialized)
.withProcessorParameters(processorParameters)
.build();

addGraphNode(root, tableSourceNode);

final String storeName = materialized.storeName();
return new GlobalKTableImpl<>(new KTableSourceValueGetterSupplier<>(storeName), materialized.queryableStoreName());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -660,10 +660,7 @@ public KTable<K, V> toTable(final Named named,
subTopologySourceNodes = this.subTopologySourceNodes;
}

final KTableSource<K, V> tableSource = new KTableSource<>(
materializedInternal.storeName(),
materializedInternal.queryableStoreName()
);
final KTableSource<K, V> tableSource = new KTableSource<>(materializedInternal);
final ProcessorParameters<K, V, ?, ?> processorParameters = new ProcessorParameters<>(tableSource, name);
final GraphNode tableNode = new StreamToTableNode<>(
name,
Expand Down Expand Up @@ -1173,7 +1170,7 @@ private <VO, VR> KStream<K, VR> doStreamTableJoin(final KTable<K, VO> table,
bufferStoreName = Optional.of(name + "-Buffer");
final RocksDBTimeOrderedKeyValueBuffer.Builder<Object, Object> storeBuilder =
new RocksDBTimeOrderedKeyValueBuffer.Builder<>(bufferStoreName.get(), joinedInternal.gracePeriod(), name);
builder.addStateStore(new StoreBuilderWrapper(storeBuilder));
builder.addStateStore(StoreBuilderWrapper.wrapStoreBuilder(storeBuilder));
}

final ProcessorSupplier<K, V, K, ? extends VR> processorSupplier = new KStreamKTableJoin<>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ private void assertUniqueStoreNames(final WindowBytesStoreSupplier supplier,
private static <K, V> StoreFactory joinWindowStoreBuilderFromSupplier(final WindowBytesStoreSupplier storeSupplier,
final Serde<K> keySerde,
final Serde<V> valueSerde) {
return new StoreBuilderWrapper(Stores.windowStoreBuilder(
return StoreBuilderWrapper.wrapStoreBuilder(Stores.windowStoreBuilder(
storeSupplier,
keySerde,
valueSerde
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -590,7 +590,7 @@ public KTable<K, V> suppress(final Suppressed<? super K> suppressed) {
final ProcessorGraphNode<K, Change<V>> node = new TableSuppressNode<>(
name,
new ProcessorParameters<>(suppressionSupplier, name),
new StoreBuilderWrapper(storeBuilder)
StoreBuilderWrapper.wrapStoreBuilder(storeBuilder)
);
node.setOutputVersioned(false);

Expand Down Expand Up @@ -1227,10 +1227,7 @@ private <VR, KO, VO> KTable<K, VR> doJoinOnForeignKey(final KTable<KO, VO> forei
materializedInternal.withKeySerde(keySerde);
}

final KTableSource<K, VR> resultProcessorSupplier = new KTableSource<>(
materializedInternal.storeName(),
materializedInternal.queryableStoreName()
);
final KTableSource<K, VR> resultProcessorSupplier = new KTableSource<>(materializedInternal);

final StoreFactory resultStore =
new KeyValueStoreMaterializer<>(materializedInternal);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,24 @@
*/
package org.apache.kafka.streams.kstream.internals;

import java.util.Set;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.processor.internals.StoreFactory;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.internals.KeyValueStoreWrapper;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Objects;

import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
import static org.apache.kafka.streams.state.VersionedKeyValueStore.PUT_RETURN_CODE_NOT_PUT;
import static org.apache.kafka.streams.state.internals.KeyValueStoreWrapper.PUT_RETURN_CODE_IS_LATEST;
Expand All @@ -40,15 +43,16 @@ public class KTableSource<KIn, VIn> implements ProcessorSupplier<KIn, VIn, KIn,
private static final Logger LOG = LoggerFactory.getLogger(KTableSource.class);

private final String storeName;
private final StoreFactory storeFactory;
private String queryableName;
private boolean sendOldValues;

public KTableSource(final String storeName, final String queryableName) {
Objects.requireNonNull(storeName, "storeName can't be null");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should probably keep this check


this.storeName = storeName;
this.queryableName = queryableName;
public KTableSource(
final MaterializedInternal<KIn, VIn, KeyValueStore<Bytes, byte[]>> materialized) {
this.storeName = materialized.storeName();
this.queryableName = materialized.queryableStoreName();
this.sendOldValues = false;
this.storeFactory = new KeyValueStoreMaterializer<>(materialized);
}

public String queryableName() {
Expand All @@ -60,6 +64,15 @@ public Processor<KIn, VIn, KIn, Change<VIn>> get() {
return new KTableSourceProcessor();
}

@Override
public Set<StoreBuilder<?>> stores() {
if (materialized()) {
return Set.of(new StoreFactory.FactoryWrappingStoreBuilder<>(storeFactory));
} else {
return null;
}
}

// when source ktable requires sending old values, we just
// need to set the queryable name as the store name to enforce materialization
public void enableSendingOldValues() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,19 @@ public MaterializedInternal(final Materialized<K, V, S> materialized) {
public MaterializedInternal(final Materialized<K, V, S> materialized,
final InternalNameProvider nameProvider,
final String generatedStorePrefix) {
this(materialized, nameProvider, generatedStorePrefix, false);
}

public MaterializedInternal(final Materialized<K, V, S> materialized,
final InternalNameProvider nameProvider,
final String generatedStorePrefix,
final boolean forceQueryable) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice refactor, this is way cleaner/easy to follow than randomly overriding the queryableStoreName for things like global stores

super(materialized);

// if storeName is not provided, the corresponding KTable would never be queryable;
// but we still need to provide an internal name for it in case we materialize.
queryable = storeName() != null;
if (!queryable && nameProvider != null) {
queryable = forceQueryable || storeName() != null;
if (storeName() == null && nameProvider != null) {
storeName = nameProvider.newStoreName(generatedStorePrefix);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@
*/
package org.apache.kafka.streams.kstream.internals.graph;

import java.util.Set;
import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.StoreDelegatingProcessorSupplier;
import org.apache.kafka.streams.processor.internals.StoreFactory;

public class GlobalStoreNode<KIn, VIn, S extends StateStore> extends StateStoreNode<S> {
Expand Down Expand Up @@ -52,15 +54,16 @@ public GlobalStoreNode(final StoreFactory storeBuilder,
@Override
public void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
storeBuilder.withLoggingDisabled();
topologyBuilder.addGlobalStore(storeBuilder,
sourceName,
topologyBuilder.addGlobalStore(sourceName,
consumed.timestampExtractor(),
consumed.keyDeserializer(),
consumed.valueDeserializer(),
topic,
processorName,
stateUpdateSupplier,
reprocessOnRestore);
new StoreDelegatingProcessorSupplier<>(
stateUpdateSupplier,
Set.of(new StoreFactory.FactoryWrappingStoreBuilder<>(storeBuilder))
), reprocessOnRestore);

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,10 @@

package org.apache.kafka.streams.kstream.internals.graph;

import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
import org.apache.kafka.streams.kstream.internals.KTableSource;
import org.apache.kafka.streams.kstream.internals.KeyValueStoreMaterializer;
import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.StoreFactory;
import org.apache.kafka.streams.state.KeyValueStore;

import java.util.Collections;
import java.util.Iterator;
Expand All @@ -36,7 +31,6 @@
*/
public class TableSourceNode<K, V> extends SourceGraphNode<K, V> {

private final MaterializedInternal<K, V, ?> materializedInternal;
private final ProcessorParameters<K, V, ?, ?> processorParameters;
private final String sourceName;
private final boolean isGlobalKTable;
Expand All @@ -46,7 +40,6 @@ private TableSourceNode(final String nodeName,
final String sourceName,
final String topic,
final ConsumedInternal<K, V> consumedInternal,
final MaterializedInternal<K, V, ?> materializedInternal,
final ProcessorParameters<K, V, ?, ?> processorParameters,
final boolean isGlobalKTable) {

Expand All @@ -57,7 +50,6 @@ private TableSourceNode(final String nodeName,
this.sourceName = sourceName;
this.isGlobalKTable = isGlobalKTable;
this.processorParameters = processorParameters;
this.materializedInternal = materializedInternal;
}


Expand All @@ -68,7 +60,6 @@ public void reuseSourceTopicForChangeLog(final boolean shouldReuseSourceTopicFor
@Override
public String toString() {
return "TableSourceNode{" +
"materializedInternal=" + materializedInternal +
", processorParameters=" + processorParameters +
", sourceName='" + sourceName + '\'' +
", isGlobalKTable=" + isGlobalKTable +
Expand All @@ -93,12 +84,8 @@ public void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
throw new IllegalStateException("A table source node must have a single topic as input");
}

final StoreFactory storeFactory =
new KeyValueStoreMaterializer<>((MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>>) materializedInternal);

if (isGlobalKTable) {
topologyBuilder.addGlobalStore(
storeFactory,
sourceName,
consumedInternal().timestampExtractor(),
consumedInternal().keyDeserializer(),
Expand All @@ -116,16 +103,16 @@ public void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
consumedInternal().valueDeserializer(),
topicName);

topologyBuilder.addProcessor(processorParameters.processorName(), processorParameters.processorSupplier(), sourceName);
processorParameters.addProcessorTo(topologyBuilder, new String[] {sourceName});

// only add state store if the source KTable should be materialized
// if the KTableSource should not be materialized, stores will be null or empty
final KTableSource<K, V> tableSource = (KTableSource<K, V>) processorParameters.processorSupplier();
if (tableSource.materialized()) {
topologyBuilder.addStateStore(storeFactory, nodeName());

if (tableSource.stores() != null) {
if (shouldReuseSourceTopicForChangelog) {
storeFactory.withLoggingDisabled();
topologyBuilder.connectSourceStoreAndTopic(storeFactory.name(), topicName);
tableSource.stores().forEach(store -> {
store.withLoggingDisabled();
topologyBuilder.connectSourceStoreAndTopic(store.name(), topicName);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Btw I noticed that this is actually called inside #addStateStore, at least as long as the parent processor names are passed in, which should always be the case except for with global tables/stores, and when adding stores to a Topology directly via #addStateStore and connecting them manually using #connectProcessorToStateStores (ie the alternative to implementing ProcessorSupplier#stores)

In both those cases #connectSourceStoreAndTopic is called directly, so AFAICT there's no reason to be invoking topologyBuilder.connectSourceStoreAndTopic all over the place including right here.

Granted, it's idempotent so calling it again is fine, but it makes the already-messy topology building code even more confusing. Might be nice to remove all these extraneous calls (can be done in a separate PR so we can make sure it doesn't break anything)

});
}
}
}
Expand All @@ -138,7 +125,6 @@ public static final class TableSourceNodeBuilder<K, V> {
private String sourceName;
private String topic;
private ConsumedInternal<K, V> consumedInternal;
private MaterializedInternal<K, V, ?> materializedInternal;
private ProcessorParameters<K, V, ?, ?> processorParameters;
private boolean isGlobalKTable = false;

Expand All @@ -155,11 +141,6 @@ public TableSourceNodeBuilder<K, V> withTopic(final String topic) {
return this;
}

public TableSourceNodeBuilder<K, V> withMaterializedInternal(final MaterializedInternal<K, V, ?> materializedInternal) {
this.materializedInternal = materializedInternal;
return this;
}

public TableSourceNodeBuilder<K, V> withConsumedInternal(final ConsumedInternal<K, V> consumedInternal) {
this.consumedInternal = consumedInternal;
return this;
Expand All @@ -185,7 +166,6 @@ public TableSourceNode<K, V> build() {
sourceName,
topic,
consumedInternal,
materializedInternal,
processorParameters,
isGlobalKTable);
}
Expand Down
Loading
Loading