-
Notifications
You must be signed in to change notification settings - Fork 14k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-18026: migrate KTableSource to use ProcessorSupplier#stores #17903
base: trunk
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,21 +16,24 @@ | |
*/ | ||
package org.apache.kafka.streams.kstream.internals; | ||
|
||
import java.util.Set; | ||
import org.apache.kafka.common.metrics.Sensor; | ||
import org.apache.kafka.common.utils.Bytes; | ||
import org.apache.kafka.streams.processor.api.Processor; | ||
import org.apache.kafka.streams.processor.api.ProcessorContext; | ||
import org.apache.kafka.streams.processor.api.ProcessorSupplier; | ||
import org.apache.kafka.streams.processor.api.Record; | ||
import org.apache.kafka.streams.processor.api.RecordMetadata; | ||
import org.apache.kafka.streams.processor.internals.StoreFactory; | ||
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; | ||
import org.apache.kafka.streams.state.KeyValueStore; | ||
import org.apache.kafka.streams.state.StoreBuilder; | ||
import org.apache.kafka.streams.state.ValueAndTimestamp; | ||
import org.apache.kafka.streams.state.internals.KeyValueStoreWrapper; | ||
|
||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
import java.util.Objects; | ||
|
||
import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor; | ||
import static org.apache.kafka.streams.state.VersionedKeyValueStore.PUT_RETURN_CODE_NOT_PUT; | ||
import static org.apache.kafka.streams.state.internals.KeyValueStoreWrapper.PUT_RETURN_CODE_IS_LATEST; | ||
|
@@ -40,15 +43,16 @@ public class KTableSource<KIn, VIn> implements ProcessorSupplier<KIn, VIn, KIn, | |
private static final Logger LOG = LoggerFactory.getLogger(KTableSource.class); | ||
|
||
private final String storeName; | ||
private final StoreFactory storeFactory; | ||
private String queryableName; | ||
private boolean sendOldValues; | ||
|
||
public KTableSource(final String storeName, final String queryableName) { | ||
Objects.requireNonNull(storeName, "storeName can't be null"); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We should probably keep this check. |
||
|
||
this.storeName = storeName; | ||
this.queryableName = queryableName; | ||
public KTableSource( | ||
final MaterializedInternal<KIn, VIn, KeyValueStore<Bytes, byte[]>> materialized) { | ||
this.storeName = materialized.storeName(); | ||
this.queryableName = materialized.queryableStoreName(); | ||
this.sendOldValues = false; | ||
this.storeFactory = new KeyValueStoreMaterializer<>(materialized); | ||
} | ||
|
||
public String queryableName() { | ||
|
@@ -60,6 +64,15 @@ public Processor<KIn, VIn, KIn, Change<VIn>> get() { | |
return new KTableSourceProcessor(); | ||
} | ||
|
||
// Exposes the state store of this table source to the topology builder. | ||
// Returns a single-element set wrapping storeFactory in a FactoryWrappingStoreBuilder | ||
// when materialized() is true; otherwise returns null (not an empty set), so callers | ||
// must null-check the result before iterating. | ||
@Override | ||
public Set<StoreBuilder<?>> stores() { | ||
if (materialized()) { | ||
return Set.of(new StoreFactory.FactoryWrappingStoreBuilder<>(storeFactory)); | ||
} else { | ||
return null; | ||
} | ||
} | ||
|
||
// when source ktable requires sending old values, we just | ||
// need to set the queryable name as the store name to enforce materialization | ||
public void enableSendingOldValues() { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -39,12 +39,19 @@ public MaterializedInternal(final Materialized<K, V, S> materialized) { | |
public MaterializedInternal(final Materialized<K, V, S> materialized, | ||
final InternalNameProvider nameProvider, | ||
final String generatedStorePrefix) { | ||
this(materialized, nameProvider, generatedStorePrefix, false); | ||
} | ||
|
||
public MaterializedInternal(final Materialized<K, V, S> materialized, | ||
final InternalNameProvider nameProvider, | ||
final String generatedStorePrefix, | ||
final boolean forceQueryable) { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Nice refactor; this is way cleaner and easier to follow than randomly overriding the queryableStoreName for things like global stores. |
||
super(materialized); | ||
|
||
// if storeName is not provided, the corresponding KTable would never be queryable; | ||
// but we still need to provide an internal name for it in case we materialize. | ||
queryable = storeName() != null; | ||
if (!queryable && nameProvider != null) { | ||
queryable = forceQueryable || storeName() != null; | ||
if (storeName() == null && nameProvider != null) { | ||
storeName = nameProvider.newStoreName(generatedStorePrefix); | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,15 +17,10 @@ | |
|
||
package org.apache.kafka.streams.kstream.internals.graph; | ||
|
||
import org.apache.kafka.common.utils.Bytes; | ||
import org.apache.kafka.streams.kstream.internals.ConsumedInternal; | ||
import org.apache.kafka.streams.kstream.internals.KTableSource; | ||
import org.apache.kafka.streams.kstream.internals.KeyValueStoreMaterializer; | ||
import org.apache.kafka.streams.kstream.internals.MaterializedInternal; | ||
import org.apache.kafka.streams.processor.api.ProcessorSupplier; | ||
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder; | ||
import org.apache.kafka.streams.processor.internals.StoreFactory; | ||
import org.apache.kafka.streams.state.KeyValueStore; | ||
|
||
import java.util.Collections; | ||
import java.util.Iterator; | ||
|
@@ -36,7 +31,6 @@ | |
*/ | ||
public class TableSourceNode<K, V> extends SourceGraphNode<K, V> { | ||
|
||
private final MaterializedInternal<K, V, ?> materializedInternal; | ||
private final ProcessorParameters<K, V, ?, ?> processorParameters; | ||
private final String sourceName; | ||
private final boolean isGlobalKTable; | ||
|
@@ -46,7 +40,6 @@ private TableSourceNode(final String nodeName, | |
final String sourceName, | ||
final String topic, | ||
final ConsumedInternal<K, V> consumedInternal, | ||
final MaterializedInternal<K, V, ?> materializedInternal, | ||
final ProcessorParameters<K, V, ?, ?> processorParameters, | ||
final boolean isGlobalKTable) { | ||
|
||
|
@@ -57,7 +50,6 @@ private TableSourceNode(final String nodeName, | |
this.sourceName = sourceName; | ||
this.isGlobalKTable = isGlobalKTable; | ||
this.processorParameters = processorParameters; | ||
this.materializedInternal = materializedInternal; | ||
} | ||
|
||
|
||
|
@@ -68,7 +60,6 @@ public void reuseSourceTopicForChangeLog(final boolean shouldReuseSourceTopicFor | |
@Override | ||
public String toString() { | ||
return "TableSourceNode{" + | ||
"materializedInternal=" + materializedInternal + | ||
", processorParameters=" + processorParameters + | ||
", sourceName='" + sourceName + '\'' + | ||
", isGlobalKTable=" + isGlobalKTable + | ||
|
@@ -93,12 +84,8 @@ public void writeToTopology(final InternalTopologyBuilder topologyBuilder) { | |
throw new IllegalStateException("A table source node must have a single topic as input"); | ||
} | ||
|
||
final StoreFactory storeFactory = | ||
new KeyValueStoreMaterializer<>((MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>>) materializedInternal); | ||
|
||
if (isGlobalKTable) { | ||
topologyBuilder.addGlobalStore( | ||
storeFactory, | ||
sourceName, | ||
consumedInternal().timestampExtractor(), | ||
consumedInternal().keyDeserializer(), | ||
|
@@ -116,16 +103,16 @@ public void writeToTopology(final InternalTopologyBuilder topologyBuilder) { | |
consumedInternal().valueDeserializer(), | ||
topicName); | ||
|
||
topologyBuilder.addProcessor(processorParameters.processorName(), processorParameters.processorSupplier(), sourceName); | ||
processorParameters.addProcessorTo(topologyBuilder, new String[] {sourceName}); | ||
|
||
// only add state store if the source KTable should be materialized | ||
// if the KTableSource should not be materialized, stores will be null or empty | ||
final KTableSource<K, V> tableSource = (KTableSource<K, V>) processorParameters.processorSupplier(); | ||
if (tableSource.materialized()) { | ||
topologyBuilder.addStateStore(storeFactory, nodeName()); | ||
|
||
if (tableSource.stores() != null) { | ||
if (shouldReuseSourceTopicForChangelog) { | ||
storeFactory.withLoggingDisabled(); | ||
topologyBuilder.connectSourceStoreAndTopic(storeFactory.name(), topicName); | ||
tableSource.stores().forEach(store -> { | ||
store.withLoggingDisabled(); | ||
topologyBuilder.connectSourceStoreAndTopic(store.name(), topicName); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Btw, I noticed that #connectSourceStoreAndTopic is actually called inside #addStateStore, at least as long as the parent processor names are passed in, which should always be the case except with global tables/stores, and when adding stores to a Topology directly via #addStateStore and connecting them manually using #connectProcessorToStateStores (i.e., the alternative to implementing ProcessorSupplier#stores). In both of those cases #connectSourceStoreAndTopic is called directly, so AFAICT there's no reason to be invoking it again here. Granted, it's idempotent, so calling it again is fine, but it makes the already-messy topology-building code even more confusing. It might be nice to remove all these extraneous calls (this can be done in a separate PR so we can make sure it doesn't break anything). |
||
}); | ||
} | ||
} | ||
} | ||
|
@@ -138,7 +125,6 @@ public static final class TableSourceNodeBuilder<K, V> { | |
private String sourceName; | ||
private String topic; | ||
private ConsumedInternal<K, V> consumedInternal; | ||
private MaterializedInternal<K, V, ?> materializedInternal; | ||
private ProcessorParameters<K, V, ?, ?> processorParameters; | ||
private boolean isGlobalKTable = false; | ||
|
||
|
@@ -155,11 +141,6 @@ public TableSourceNodeBuilder<K, V> withTopic(final String topic) { | |
return this; | ||
} | ||
|
||
public TableSourceNodeBuilder<K, V> withMaterializedInternal(final MaterializedInternal<K, V, ?> materializedInternal) { | ||
this.materializedInternal = materializedInternal; | ||
return this; | ||
} | ||
|
||
public TableSourceNodeBuilder<K, V> withConsumedInternal(final ConsumedInternal<K, V> consumedInternal) { | ||
this.consumedInternal = consumedInternal; | ||
return this; | ||
|
@@ -185,7 +166,6 @@ public TableSourceNode<K, V> build() { | |
sourceName, | ||
topic, | ||
consumedInternal, | ||
materializedInternal, | ||
processorParameters, | ||
isGlobalKTable); | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: can you fix this line to put params on new lines?