Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Streams context forward multiple topics? #5

Open
imperio-wxm opened this issue Nov 16, 2017 · 5 comments
Open

Streams context forward multiple topics? #5

imperio-wxm opened this issue Nov 16, 2017 · 5 comments

Comments

@imperio-wxm
Copy link

Usually we set one processor forward to one sink.
There is any way to forward data to multiple topics?

@Override
public void process(byte[] key, byte[] value) {
     context.forward(key, value);
}

I have one source,but i want sent data to two topics,such as:

// message = {"type":"1",........}
Topic-1sourceTopic-2Topic-3

if(type == "1") 
     send data to Topic-2
else if(type == "2")
    send data to Topic-3
@bbejeck
Copy link
Owner

bbejeck commented May 6, 2018

Hi @imperio-wxm,

Sorry for the seriously delayed response. The best way for you to do this is with KStream#branch or in the ProcessorAPI where you can forward to specific child nodes by name. Either way, the output topics need to be defined in the topology ahead of time. Let me know if you need more info.

HTH
Bill

@psawmora
Copy link

psawmora commented May 1, 2019

@bbejeck What if we want to create a new record in addition to what we have and forward both to different child processors ? It's like,

 new_record = createNewRecord(old_record);
 context.forward(original_record, to(child_1));
 context.forward(new_record, to(child_2));

Is this a good practice ?

@HungUnicorn
Copy link

HungUnicorn commented May 9, 2019

I think that's the API(Kafka) restriction and wouldn't be a good practice.
Instead of doing the above, you could have two topologies as children, and let them obtain the output of old_record

ParentTopo -> NewRecordTopo(createNewRecord) -> sink1
           -> OldRecordTopo -> sink2

This could achieve your goal. @bbejeck maybe could give more insight about why Processor context cannot forward to two processors.

@imperio-wxm
Copy link
Author

Hi, @bbejeck
Sorry, my reply is late.

I think @psawmora is right.

I do like this, and it works, for a long time I didn't find any problems:

switch(type) {
     case type1:
          context.forward(newKey1, newValue1, TOPIC_1);
          break;
     case type2:
          context.forward(newKey2, newValue2, TOPIC_2);
          break:
     .......
}

I wrote a generic Processor to specifically split the topic. Topological image of a tree.
But source topic can only be one.
Is this a good practice ?

@psawmora
Copy link

@imperio-wxm

But have you tried forwarding the same flow to two different processors at the same time ?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

4 participants