KAFKA-18026: migrate KTableSource to use ProcessorSupplier#stores #17903
base: trunk
Gentle reminder: it's "KAFKA-18026: ..." not "[KAFKA-18026] " when naming your PRs in AK 😉
@@ -384,7 +384,8 @@ public synchronized <K, V> GlobalKTable<K, V> globalTable(final String topic,
final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal =
new MaterializedInternal<>(
Materialized.with(consumedInternal.keySerde(), consumedInternal.valueSerde()),
internalStreamsBuilder, topic + "-");
internalStreamsBuilder, topic + "-",
nit: can you fix this line to put params on new lines?
private String queryableName;
private boolean sendOldValues;

public KTableSource(final String storeName, final String queryableName) {
Objects.requireNonNull(storeName, "storeName can't be null");
we should probably keep this check
public MaterializedInternal(final Materialized<K, V, S> materialized,
final InternalNameProvider nameProvider,
final String generatedStorePrefix,
final boolean forceQueryable) {
nice refactor, this is way cleaner/easier to follow than randomly overriding the queryableStoreName for things like global stores
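A rough sketch of the idea behind this comment, using a hypothetical stand-in class rather than the real MaterializedInternal: an explicit flag decides whether the store is queryable, instead of inferring it from whether a store name happens to be set. The class name, its fields, and the generated-name suffix are all assumptions made for illustration.

```java
import java.util.Objects;

// Hypothetical stand-in for illustration; not the real MaterializedInternal.
final class MaterializedSketch {
    private final String storeName;
    private final boolean queryable;

    MaterializedSketch(final String userStoreName,
                       final String generatedStorePrefix,
                       final boolean forceQueryable) {
        Objects.requireNonNull(generatedStorePrefix, "generatedStorePrefix can't be null");
        // Queryable if the user named the store, or if the caller explicitly asks for it.
        this.queryable = forceQueryable || userStoreName != null;
        // Fall back to a generated name (made-up suffix) when no name was provided.
        this.storeName = userStoreName != null
            ? userStoreName
            : generatedStorePrefix + "STATE-STORE-0000000000";
    }

    String storeName() {
        return storeName;
    }

    // Returns the store name only when the store is meant to be queryable, else null.
    String queryableStoreName() {
        return queryable ? storeName : null;
    }
}
```

With this shape, a caller such as a global table can pass forceQueryable = true and keep the generated name, rather than overriding the queryable store name after the fact.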
topologyBuilder.connectSourceStoreAndTopic(storeFactory.name(), topicName);
tableSource.stores().forEach(store -> {
store.withLoggingDisabled();
topologyBuilder.connectSourceStoreAndTopic(store.name(), topicName);
Btw I noticed that this is actually called inside #addStateStore, at least as long as the parent processor names are passed in. That should always be the case except with global tables/stores, and when adding stores to a Topology directly via #addStateStore and connecting them manually using #connectProcessorToStateStores (i.e. the alternative to implementing ProcessorSupplier#stores).
In both those cases #connectSourceStoreAndTopic is called directly, so AFAICT there's no reason to be invoking topologyBuilder.connectSourceStoreAndTopic all over the place, including right here.
Granted, it's idempotent so calling it again is fine, but it makes the already-messy topology building code even more confusing. Might be nice to remove all these extraneous calls (can be done in a separate PR so we can make sure it doesn't break anything).
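To make the idempotency point concrete, here is a minimal sketch that assumes the behavior described in the comment. StoreTopicRegistrySketch is a hypothetical stand-in for the store-to-topic bookkeeping, not the real topology builder, and its method bodies are assumptions.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for illustration; not the real topology builder.
final class StoreTopicRegistrySketch {
    private final Map<String, String> storeToTopic = new HashMap<>();

    // Connecting the same store/topic pair again leaves the registry unchanged,
    // which is why a redundant call is harmless (though confusing to read).
    void connectSourceStoreAndTopic(final String storeName, final String topic) {
        storeToTopic.put(storeName, topic);
    }

    String topicFor(final String storeName) {
        return storeToTopic.get(storeName);
    }

    int size() {
        return storeToTopic.size();
    }
}
```

Calling connectSourceStoreAndTopic("users-store", "users") twice leaves a single entry, so removing the extra call sites would not change the resulting state in this sketch.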

@Override
public StoreBuilder<T> withCachingDisabled() {
throw new IllegalStateException("Should not try to modify StoreBuilder wrapper");
I'm not 100% sure it's necessary, but I think we might as well delegate to the StoreFactory here as well. If the StoreFactory has/needs this method for whatever reason then it seems theoretically possible for it to be called during the topology building process (whereas imo it's highly unlikely for withLoggingEnabled or withCachingEnabled to be called on this, especially if those methods don't even exist on StoreFactory)
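A minimal sketch of what this suggestion could look like, using simplified hypothetical types rather than the real StoreFactory and StoreBuilder: the wrapper forwards the "disable" calls that the factory supports and keeps throwing for calls that have no counterpart on the factory. Both class names and the exact method set are assumptions.

```java
// Hypothetical, simplified stand-ins; not the real StoreFactory or StoreBuilder types.
final class StoreFactorySketch {
    private final String name;
    private boolean cachingEnabled = true;
    private boolean loggingEnabled = true;

    StoreFactorySketch(final String name) {
        this.name = name;
    }

    String name() { return name; }
    void withCachingDisabled() { cachingEnabled = false; }
    void withLoggingDisabled() { loggingEnabled = false; }
    boolean cachingEnabled() { return cachingEnabled; }
    boolean loggingEnabled() { return loggingEnabled; }
}

final class FactoryWrappingBuilderSketch {
    private final StoreFactorySketch factory;

    FactoryWrappingBuilderSketch(final StoreFactorySketch factory) {
        this.factory = factory;
    }

    String name() { return factory.name(); }

    // The factory supports this, so forward the call instead of throwing.
    FactoryWrappingBuilderSketch withCachingDisabled() {
        factory.withCachingDisabled();
        return this;
    }

    FactoryWrappingBuilderSketch withLoggingDisabled() {
        factory.withLoggingDisabled();
        return this;
    }

    // No counterpart on the factory in this sketch, so modification is rejected.
    FactoryWrappingBuilderSketch withCachingEnabled() {
        throw new IllegalStateException("Should not try to modify StoreBuilder wrapper");
    }
}
```

The design choice here is that the wrapper only throws for operations the underlying factory cannot express, so a call made during topology building does not fail unexpectedly.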
btw @agavra I merged the first KIP-1112 PR so can you add a StreamsBuilderTest to verify the processor wrapping for this operator? (also there's a question for you about method names in that PR)
@agavra also I enabled the PR build (apparently you now need a committer to approve this :/ ) and it looks like the spotless check failed due to import ordering. If you look at the output it should show a command you can run to fix it automatically so you don't have to rearrange everything yourself
Description
This PR is part of the implementation for KIP-1112 (KAFKA-18026). In order to have DSL operators be properly wrapped by the interface suggested in KIP-1112, we need to make sure they all use the ConnectedStoreProvider#stores method to connect stores instead of manually calling addStateStore.
Review Guide
1. KTableSource which now implements the stores() method
2. forceQueryable into MaterializedInternal so that we directly indicate that it should be queryable/materialized instead of relying on a side effect of setting the store name
3. StoreDelegatingProcessorSupplier is used only in the case of GlobalKTables because the stores are expected to be passed in separately in the public API -- this PR doesn't change that
4. FactoryWrappingStoreBuilder is used so that already created store factories can be typecast as StoreBuilders.
NOTE: For point (3), I think this might be a bug in the current implementation that we may want to change in 4.0 since it's backwards incompatible. If you pass in a ProcessorSupplier to addGlobalTable, the stores() method on that will be ignored.
Testing
This is a refactor only; there are no new behaviors.
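For readers unfamiliar with the pattern the description refers to, here is a simplified sketch of a supplier declaring its own stores so that whoever adds the processor can register them, instead of a separate manual addStateStore call. Every type below is a hypothetical stand-in for the Kafka Streams interfaces named above, and the empty-set default is an assumption of this sketch, not a statement about the real API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified stand-ins for the Kafka Streams types named in the description.
interface StoreBuilderSketch {
    String name();
}

interface ProcessorSupplierSketch {
    // Mirrors the idea of ConnectedStoreProvider#stores: the supplier declares its stores.
    default Set<StoreBuilderSketch> stores() {
        return Collections.emptySet();
    }
}

final class TopologySketch {
    private final List<String> registeredStores = new ArrayList<>();

    // Adding a processor registers whatever stores the supplier declares,
    // so the caller does not need a separate addStateStore step.
    void addProcessor(final String processorName, final ProcessorSupplierSketch supplier) {
        for (final StoreBuilderSketch store : supplier.stores()) {
            registeredStores.add(store.name());
        }
    }

    List<String> registeredStores() {
        return Collections.unmodifiableList(registeredStores);
    }
}

final class TableSourceSketch implements ProcessorSupplierSketch {
    private final StoreBuilderSketch store;

    TableSourceSketch(final String storeName) {
        this.store = () -> storeName;
    }

    @Override
    public Set<StoreBuilderSketch> stores() {
        return Collections.singleton(store);
    }
}
```

Because the stores travel with the supplier, anything that wraps the supplier can also see and wrap its stores, which is the property the description says the DSL operators need.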