[WIP] New API#6

Merged
Guozhang Wang (guozhangwang) merged 11 commits into confluentinc:streaming from guozhangwang:NewAPI
Aug 21, 2015

Can we have a more descriptive name than PConfig?

Guozhang Wang (@guozhangwang)
I think whatever we replace P with will have to be consistent across PTopology, PTopologyBuilder and PConfig. A few suggestions:

  1. s/P/Stream
  2. s/P/KStreamProcessor
  3. s/P/KafkaProcessor

The problem with 3 is that it is unrelated to streams. But if that class continues to be called KafkaProcessor, then its Config object can't be called something else.

@guozhangwang

Author

ymatsuda Neha Narkhede (@nehanarkhede) Jay Kreps (@jkreps)

Made some changes according to the comments:

  1. Renamed o.a.k.stream to o.a.k.streaming, which includes processor / kstream / state / examples.
  2. Renamed the P* classes to TopologyBuilder, KStreamBuilder, ProcessorTopology and ProcessorMetadata.
  3. Removed ProcessorProperties so that all configs live in StreamingConfig (renamed from ProcessorConfig).
  4. Hid Chooser / StampedRecord / TimestampTracker from the user-facing APIs; we can decide later whether to keep the Chooser / TimestampTracker interfaces moving forward.

@guozhangwang

Author

ymatsuda Neha Narkhede (@nehanarkhede) Jay Kreps (@jkreps)

Added SinkNode to unify context.send() with processor.forward(). Now, if we want to send a key-value pair to a Kafka topic, it is forwarded to a sink node.
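
To make the idea concrete, here is a minimal, self-contained sketch of treating a topic write as an ordinary forward to a terminal node. Every name in it (`SinkNodeSketch`, `Node`, `MapValueNode`, the in-memory `sent` list standing in for a producer) is hypothetical and is not a class from this patch; it only illustrates the shape of the unification.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class SinkNodeSketch {

    /** Hypothetical node type: every node receives key-value pairs the same way. */
    interface Node<K, V> {
        void process(K key, V value);
    }

    /** Hypothetical sink: "sending" to a topic is just processing at a terminal node. */
    static class SinkNode<K, V> implements Node<K, V> {
        final String topic;
        // Stand-in for a producer: records what would have been sent to the topic.
        final List<String> sent = new ArrayList<>();

        SinkNode(String topic) {
            this.topic = topic;
        }

        @Override
        public void process(K key, V value) {
            sent.add(topic + ":" + key + "=" + value);
        }
    }

    /** Hypothetical processor node: transforms the value, then forwards to its children. */
    static class MapValueNode<K, V> implements Node<K, V> {
        private final Function<V, V> mapper;
        private final List<Node<K, V>> children = new ArrayList<>();

        MapValueNode(Function<V, V> mapper) {
            this.mapper = mapper;
        }

        void addChild(Node<K, V> child) {
            children.add(child);
        }

        /** The only way to emit: a child may be another processor or a sink. */
        void forward(K key, V value) {
            for (Node<K, V> child : children) {
                child.process(key, value);
            }
        }

        @Override
        public void process(K key, V value) {
            forward(key, mapper.apply(value));
        }
    }

    public static void main(String[] args) {
        SinkNode<String, String> sink = new SinkNode<>("topic2");
        MapValueNode<String, String> upper = new MapValueNode<>(String::toUpperCase);
        upper.addChild(sink);
        upper.process("k", "hello");
        System.out.println(sink.sent);
    }
}
```

In this sketch the processor never needs a separate send call: attaching a sink as a child is enough for its output to reach the topic.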

Some open questions:

  1. Do we want to hard-code the Chooser / TimestampTracker for now, or still leave them as instantiable interfaces?

  2. I thought about how to avoid passing the Class object in KStream.process() but do not have a good solution. Does anyone have any ideas?

  3. I tried to rename the classes under the old API; one example looks like this:

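import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class WordCount {

    // static, so that main() can instantiate it without an enclosing WordCount instance
    public static class WordCountJob extends KStreamJob {

        @Override
        public void process() {
            KStream<String, String> input = from(new StringDeserializer(), new StringDeserializer(), "topic1");

            // emit one (word, 1) pair per word; assumes flatMap takes a lambda returning the emitted pairs
            KStream<String, Integer> counts = input.flatMap((key, value) -> {
                List<KeyValue<String, Integer>> pairs = new ArrayList<>();
                for (String word : value.split(" "))
                    pairs.add(new KeyValue<>(word, 1));
                return pairs;
            });

            KStream<String, Integer> groupedCounts = counts.aggregate(1);

            groupedCounts.sendTo("topic2");
        }
    }

    public static void main(String[] args) throws Exception {
        KafkaStreaming stream = new KafkaStreaming(new WordCountJob(), new StreamingConfig(new Properties()));
        stream.run();
    }
}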

The difference from Flink, which requires calling env.execute(programName) at the end, and from Spark Streaming, which requires calling StreamingContext.start() at the end, is arguably a nuance. But the wrapper class may make REPL usage hard. Thoughts?

  4. About compiling SQL / etc. to Processor, I think it should be doable via pre-defined Processor modules, as we did in KStream for now. So I personally think it should be OK unless someone finds any corner cases?

@guozhangwang

Author

ymatsuda

The latest patch is a work in progress toward the stateless plan API. Some details:

  1. Class names follow the packaging proposal wiki.
  2. StreamThread contains a map of StreamTasks.
  3. StreamTask has an associated PartitionGroup.
  4. PartitionGroup handles timestamp tracking and the buffered queue size.
  5. StreamTask also keeps a ProcessorContextImpl.
  6. A single ProcessorTopology will be shared among all the StreamThreads.
  7. When a record is processed, it is passed from the StreamTask's queue into the same shared ProcessorTopology, but with a different ProcessorContextImpl depending on the task.
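
The relationship in items 5 to 7 can be sketched roughly as follows. This is an illustration only: `SharedTopologySketch`, `Topology`, `Task` and `TaskContext` are hypothetical stand-ins for ProcessorTopology, StreamTask and ProcessorContextImpl, and the sketch is single-threaded. The point is that the topology holds no mutable state of its own, so one instance can serve every task, with each task supplying its own context.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

public class SharedTopologySketch {

    /** Hypothetical per-task state; stands in for ProcessorContextImpl. */
    static class TaskContext {
        final int taskId;
        final List<String> output = new ArrayList<>();

        TaskContext(int taskId) {
            this.taskId = taskId;
        }
    }

    /** Holds no mutable state, so a single instance can be shared by all tasks. */
    static class Topology {
        void process(TaskContext context, String record) {
            // All side effects go through the context passed in by the calling task.
            context.output.add("task" + context.taskId + ":" + record.toUpperCase());
        }
    }

    /** Hypothetical stand-in for StreamTask: owns a queue and a context, borrows the topology. */
    static class Task {
        final Queue<String> queue = new ArrayDeque<>();
        final TaskContext context;
        private final Topology topology;

        Task(int taskId, Topology topology) {
            this.context = new TaskContext(taskId);
            this.topology = topology;
        }

        /** Processes one buffered record, if any; returns whether a record was processed. */
        boolean processOne() {
            String record = queue.poll();
            if (record == null) {
                return false;
            }
            topology.process(context, record);
            return true;
        }
    }

    public static void main(String[] args) {
        Topology shared = new Topology();
        // Mirrors the idea of a thread holding a map of tasks.
        Map<Integer, Task> tasks = new HashMap<>();
        tasks.put(0, new Task(0, shared));
        tasks.put(1, new Task(1, shared));

        tasks.get(0).queue.add("a");
        tasks.get(1).queue.add("b");
        for (Task task : tasks.values()) {
            while (task.processOne()) {
                // drain the task's queue
            }
        }
        System.out.println(tasks.get(0).context.output + " " + tasks.get(1).context.output);
    }
}
```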

Minor:

  1. Trying to remove Ingestor and directly use Consumer with the pause / resume call.
  2. Remove Chooser / TimestampTracker interfaces.
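
As a rough illustration of the first minor item, the sketch below shows how buffered size could drive pause / resume directly, without a separate ingestor. It is an assumption about the approach, not code from this patch: `PausableConsumer` is a hypothetical stand-in for the consumer's pause / resume calls, `BufferedPartition` and `FakeConsumer` are made up for the example, and the threshold policy is invented for illustration.

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;

public class PauseResumeSketch {

    /** Hypothetical stand-in for the consumer's pause / resume calls. */
    interface PausableConsumer {
        void pause(String partition);
        void resume(String partition);
    }

    /** Test double that only remembers which partitions are currently paused. */
    static class FakeConsumer implements PausableConsumer {
        final Set<String> paused = new HashSet<>();

        @Override
        public void pause(String partition) {
            paused.add(partition);
        }

        @Override
        public void resume(String partition) {
            paused.remove(partition);
        }
    }

    /** Buffers records for one partition and applies backpressure via pause / resume. */
    static class BufferedPartition {
        private final String partition;
        private final int maxBuffered;
        private final PausableConsumer consumer;
        private final Queue<String> buffer = new ArrayDeque<>();

        BufferedPartition(String partition, int maxBuffered, PausableConsumer consumer) {
            this.partition = partition;
            this.maxBuffered = maxBuffered;
            this.consumer = consumer;
        }

        /** Called when a fetched record arrives; pauses fetching once the buffer is full. */
        void add(String record) {
            buffer.add(record);
            if (buffer.size() >= maxBuffered) {
                consumer.pause(partition);
            }
        }

        /** Called by the processing loop; resumes fetching once there is room again. */
        String poll() {
            String record = buffer.poll();
            if (buffer.size() < maxBuffered) {
                consumer.resume(partition);
            }
            return record;
        }
    }

    public static void main(String[] args) {
        FakeConsumer consumer = new FakeConsumer();
        BufferedPartition partition = new BufferedPartition("topic1-0", 2, consumer);

        partition.add("r1");
        partition.add("r2");
        System.out.println("paused after filling: " + consumer.paused.contains("topic1-0"));

        partition.poll();
        System.out.println("paused after draining one: " + consumer.paused.contains("topic1-0"));
    }
}
```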

Most of these classes are still incomplete. If you are OK with it, I can merge them into branch-streaming for now so that you can take over.


4 participants