Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-18026: migrate KTableSource to use ProcesserSupplier#stores #17903

Open
wants to merge 1 commit into
base: trunk
Choose a base branch
from

Conversation

agavra
Copy link
Contributor

@agavra agavra commented Nov 21, 2024

Description

This PR is part of the implementation for KIP-1112 (KAFKA-18026). In order to have DSL operators be properly wrapped by the interface suggestion in 1112, we need to make sure they all use the ConnectedStoreProvider#stores method to connect stores instead of manually calling addStateStore.

Review Guide

  1. Checkout KTableSource which now implements the stores() method
  2. GlobalKTables no longer hack materialized by setting the source name on the builder, instead we pipe a forceQueryable into MaterializedInternal so that we directly indicate that it should be queryable/materialized instead of relying on a side effect of setting the store name
  3. StoreDelegatingProcessorSupplier is used only in the case of GlobalKTables because the stores are expected to be passed in separately in the public API -- this PR doesn't change that
  4. FactoryWrappingStoreBuilder is used so that already created store factories can be typecast as StoreBuilders.

NOTE: For point (3), I think this might be a bug in the current implementation that we may want to change in 4.0 since it's backwards incompatible. If you pass in a ProcessorSupplier to addGlobalTable, the stores() method on that will be ignored.

Testing

This is a refactor only, there is no new behaviors.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@ableegoldman ableegoldman changed the title [KAFKA-18026] migrate KTableSource to use ProcesserSupplier#stores KAFKA-18026: migrate KTableSource to use ProcesserSupplier#stores Nov 22, 2024
Copy link
Contributor

@ableegoldman ableegoldman left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gentle reminder: it's "KAFKA-18026: ..." not "[KAFKA-18026] " when naming your PRs in AK 😉

@@ -384,7 +384,8 @@ public synchronized <K, V> GlobalKTable<K, V> globalTable(final String topic,
final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal =
new MaterializedInternal<>(
Materialized.with(consumedInternal.keySerde(), consumedInternal.valueSerde()),
internalStreamsBuilder, topic + "-");
internalStreamsBuilder, topic + "-",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can you fix this line to put params on new lines?

private String queryableName;
private boolean sendOldValues;

public KTableSource(final String storeName, final String queryableName) {
Objects.requireNonNull(storeName, "storeName can't be null");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should probably keep this check

public MaterializedInternal(final Materialized<K, V, S> materialized,
final InternalNameProvider nameProvider,
final String generatedStorePrefix,
final boolean forceQueryable) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice refactor, this is way cleaner/easy to follow than randomly overriding the queryableStoreName for things like global stores

topologyBuilder.connectSourceStoreAndTopic(storeFactory.name(), topicName);
tableSource.stores().forEach(store -> {
store.withLoggingDisabled();
topologyBuilder.connectSourceStoreAndTopic(store.name(), topicName);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Btw I noticed that this is actually called inside #addStateStore, at least as long as the parent processor names are passed in, which should always be the case except for with global tables/stores, and when adding stores to a Topology directly via #addStateStore and connecting them manually using #connectProcessorToStateStores (ie the alternative to implementing ProcessorSupplier#stores)

In both those cases #connectSourceStoreAndTopic is called directly, so AFAICT there's no reason to be invoking topologyBuilder.connectSourceStoreAndTopic all over the place including right here.

Granted, it's idempotent so calling it again is fine, but it makes the already-messy topology building code even more confusing. Might be nice to remove all these extraneous calls (can be done in a separate PR so we can make sure it doesn't break anything)


@Override
public StoreBuilder<T> withCachingDisabled() {
throw new IllegalStateException("Should not try to modify StoreBuilder wrapper");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not 100% sure it's necessary, but I think we might as well delegate to the StoryFactory here as well. If the StoryFactory has/needs this method for whatever reason then it seems theoretically possible for it to be callled during the topology building process (whereas imo it's highly unlikely for withLoggingEnabled or withCachingEnabled to be called on this, especially if those methods don't even exist on StoryFactory)

@ableegoldman
Copy link
Contributor

btw @agavra I merged the first KIP-1112 PR so can you add a StreamsBuilderTest to verify the processor wrapping for this operator?

(also there's a question for you about method names in that PR)

@ableegoldman
Copy link
Contributor

@agavra also I enabled the PR build (apparently you now need a committer to approve this :/ ) and it looks like the spotless check failed due to import ordering . If you look at the output it should a command you can run to fix it automatically so you don't have to rearrange everything yourself

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants