Alternative API with KStream on top of Processor #5

Merged
Guozhang Wang (guozhangwang) merged 18 commits into confluentinc:streaming from guozhangwang:S-Processor
Aug 11, 2015

@guozhangwang

Jay Kreps (@jkreps) Neha Narkhede (@nehanarkhede) ymatsuda

This is a second attempt at KStream-on-top-of-Processor: I kept the generics on KafkaProcessor and made addProcessor accept the parent object directly.

Do you allow processors to be added after the topology has been built?

Not sure we need a default Processor implementation.

@jkreps

I like the separation of layers a lot. I left some minor comments. At a high level, I am still stuck on the end-user interface being a little awkward. Currently, I think the interface is to define a class that builds a topology and then pass that class in. I think this is a bit weird.

I expect to do something like this:

    Topology myTopology = new TopologyBuilder(defaultDeser)
        .addProcessor("my-processor", MyProcessor.class, new Source("my-source"))
        .addProcessor("my-other-processor", MyOtherProcessor.class, new Source("other-source", customDeser, customDeser), "my-processor");
    KafkaStreaming streaming = new KafkaStreaming(config, myTopology);
    streaming.run();

I think this is tied back to the question of whether I pass processor instances or classes. I think I may understand part of the confusion on this issue. One thing I am assuming is that the user will pass fully instantiated Objects to the processors that would be available in the init method.

So the implementation of KStream.filter currently looks like this:

    public KStream<K, V> filter(Predicate<K, V> predicate) {
        KStreamFilter<K, V> filter = new KStreamFilter<>(predicate);
        topology.addProcessor(filter, processor);
        return new KStreamImpl<>(topology, filter);
    }
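
To make the instance-based wiring concrete, here is a minimal, self-contained sketch of the idea behind `addProcessor(child, parent)`. Everything in it (`Node`, `Source`, `Filter`, `Collector`, and the static `addProcessor` helper) is a hypothetical stand-in written for illustration, not the PR's actual `KafkaProcessor` or topology classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiPredicate;

// Hypothetical sketch: none of these types are the PR's real classes.
class InstanceTopologySketch {

    /** Stand-in for a processor node that forwards records to its children. */
    abstract static class Node<K, V> {
        private final List<Node<K, V>> children = new ArrayList<>();

        abstract void process(K key, V value);

        void addChild(Node<K, V> child) {
            children.add(child);
        }

        void forward(K key, V value) {
            for (Node<K, V> child : children) {
                child.process(key, value);
            }
        }
    }

    /** Entry point of the chain: passes every record downstream. */
    static class Source<K, V> extends Node<K, V> {
        @Override
        void process(K key, V value) {
            forward(key, value);
        }
    }

    /** Forwards only the records accepted by the predicate. */
    static class Filter<K, V> extends Node<K, V> {
        private final BiPredicate<K, V> predicate;

        Filter(BiPredicate<K, V> predicate) {
            this.predicate = predicate;
        }

        @Override
        void process(K key, V value) {
            if (predicate.test(key, value)) {
                forward(key, value);
            }
        }
    }

    /** Terminal node that records the values it receives. */
    static class Collector<K, V> extends Node<K, V> {
        final List<V> seen = new ArrayList<>();

        @Override
        void process(K key, V value) {
            seen.add(value);
        }
    }

    /** Attaches an already-constructed child instance under its parent. */
    static <K, V> void addProcessor(Node<K, V> child, Node<K, V> parent) {
        parent.addChild(child);
    }

    static List<Integer> run() {
        Source<String, Integer> source = new Source<>();
        Filter<String, Integer> evens = new Filter<String, Integer>((k, v) -> v % 2 == 0);
        Collector<String, Integer> sink = new Collector<>();
        addProcessor(evens, source);
        addProcessor(sink, evens);
        for (int i = 1; i <= 4; i++) {
            source.process("key", i);
        }
        return sink.seen;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [2, 4]
    }
}
```

In this shape the caller constructs each processor itself and the topology only records parent-child links, which is why the parent object has to be passed along with the new processor.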

What I am proposing is that it look instead like this:

    public KStream<K, V> filter(Predicate<K, V> predicate) {
        KStreamFilter<K, V> filter = new KStreamFilter<>(predicate);
        topology.addProcessor(KStreamFilter.class, new Configs("predicate", predicate));
        return this;
    }

The advantage is that the user code can now get rid of the whole Topology class with the builder. I think the order of execution for that API is quite unintuitive.
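
As a rough illustration of the class-plus-configs idea, the sketch below registers a processor class together with a config map and lets the framework instantiate it and call `init`. The `Processor` interface, `Builder`, `Registration`, and the plain `Map<String, Object>` standing in for `Configs` are all assumptions made for this example; none of them come from the PR.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical sketch: these types only illustrate class-based registration.
class ClassTopologySketch {

    /** Assumed processor contract: configured via init, then fed records. */
    interface Processor {
        void init(Map<String, Object> configs);

        void process(String value, List<String> out);
    }

    /** Filter that reads its fully instantiated predicate from the configs in init. */
    public static class FilterProcessor implements Processor {
        private Predicate<String> predicate;

        public FilterProcessor() {
        }

        @Override
        @SuppressWarnings("unchecked")
        public void init(Map<String, Object> configs) {
            predicate = (Predicate<String>) configs.get("predicate");
        }

        @Override
        public void process(String value, List<String> out) {
            if (predicate.test(value)) {
                out.add(value);
            }
        }
    }

    /** A processor class paired with the configs to hand to init. */
    static class Registration {
        final Class<? extends Processor> type;
        final Map<String, Object> configs;

        Registration(Class<? extends Processor> type, Map<String, Object> configs) {
            this.type = type;
            this.configs = configs;
        }
    }

    /** Collects registrations; instances are created only when instantiate() runs. */
    static class Builder {
        private final List<Registration> registrations = new ArrayList<>();

        Builder addProcessor(Class<? extends Processor> type, Map<String, Object> configs) {
            registrations.add(new Registration(type, configs));
            return this;
        }

        List<Processor> instantiate() throws ReflectiveOperationException {
            List<Processor> processors = new ArrayList<>();
            for (Registration r : registrations) {
                Processor p = r.type.getDeclaredConstructor().newInstance();
                p.init(r.configs);
                processors.add(p);
            }
            return processors;
        }
    }

    static List<String> run() {
        Predicate<String> startsWithA = s -> s.startsWith("a");
        Map<String, Object> configs = new HashMap<>();
        configs.put("predicate", startsWithA);

        List<Processor> processors;
        try {
            processors = new Builder().addProcessor(FilterProcessor.class, configs).instantiate();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("could not instantiate processor", e);
        }

        List<String> out = new ArrayList<>();
        for (String value : new String[] {"apple", "banana", "avocado"}) {
            processors.get(0).process(value, out);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [apple, avocado]
    }
}
```

One consequence of this design is that the framework, not the caller, decides when and how many processor instances to create, at the cost of an unchecked cast when reading the predicate back out of the configs.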

Guozhang Wang (guozhangwang) added a commit that referenced this pull request Aug 11, 2015
Alternative API with KStream on top of Processor
Guozhang Wang (guozhangwang) merged commit 68d055c into confluentinc:streaming Aug 11, 2015

@ymatsuda

I am not convinced that we need this complexity.
