Skip to content
16 changes: 13 additions & 3 deletions streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,22 @@
public class StreamsBuilder {

/** The actual topology that is constructed by this StreamsBuilder. */
private final Topology topology = new Topology();
protected final Topology topology;

/** The topology's internal builder. */
final InternalTopologyBuilder internalTopologyBuilder = topology.internalTopologyBuilder;
protected final InternalTopologyBuilder internalTopologyBuilder;

private final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(internalTopologyBuilder);
protected final InternalStreamsBuilder internalStreamsBuilder;

public StreamsBuilder() {
topology = getNewTopology();
internalTopologyBuilder = topology.internalTopologyBuilder;
internalStreamsBuilder = new InternalStreamsBuilder(internalTopologyBuilder);
}

protected Topology getNewTopology() {
return new Topology();
}

/**
* Create a {@link KStream} from the specified topic.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
*/
public class Topology {

final InternalTopologyBuilder internalTopologyBuilder = new InternalTopologyBuilder();
protected final InternalTopologyBuilder internalTopologyBuilder = new InternalTopologyBuilder();

/**
* Sets the {@code auto.offset.reset} configuration when
Expand Down
135 changes: 118 additions & 17 deletions streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,30 +22,63 @@
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Objects;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.MIN_NAMED_TOPOLOGY_VERSION;

/**
* The task ID representation composed as topic group ID plus the assigned partition ID.
*/
public class TaskId implements Comparable<TaskId> {

private static final Logger LOG = LoggerFactory.getLogger(TaskId.class);

/** The ID of the topic group. */
public final int topicGroupId;
/** The ID of the partition. */
public final int partition;
/** The namedTopology that this task belongs to, or null if it does not belong to one */
private final String namedTopology;

public TaskId(final int topicGroupId, final int partition) {
this(topicGroupId, partition, null);
}

public TaskId(final int topicGroupId, final int partition, final String namedTopology) {
this.topicGroupId = topicGroupId;
this.partition = partition;
if (namedTopology != null && namedTopology.length() == 0) {
LOG.warn("Empty string passed in for task's namedTopology, since NamedTopology name cannot be empty, we "
+ "assume this task does not belong to a NamedTopology and downgrade this to null");
this.namedTopology = null;
} else {
this.namedTopology = namedTopology;
}
}

public Optional<String> namedTopology() {
return namedTopology == null ? Optional.empty() : Optional.of(namedTopology);
}

@Override
public String toString() {
return namedTopology != null ? namedTopology + "_" + topicGroupId + "_" + partition : topicGroupId + "_" + partition;
}

public String toTaskDirString() {
return topicGroupId + "_" + partition;
}

/**
* @throws TaskIdFormatException if the taskIdStr is not a valid {@link TaskId}
* Parse the task directory name (of the form topicGroupId_partition) and construct the TaskId with the
* optional namedTopology (may be null)
*
* @throws TaskIdFormatException if the taskIdStr is not a valid {@link TaskId}
*/
public static TaskId parse(final String taskIdStr) {
public static TaskId parseTaskDirectoryName(final String taskIdStr, final String namedTopology) {
final int index = taskIdStr.indexOf('_');
if (index <= 0 || index + 1 >= taskIdStr.length()) {
throw new TaskIdFormatException(taskIdStr);
Expand All @@ -55,7 +88,7 @@ public static TaskId parse(final String taskIdStr) {
final int topicGroupId = Integer.parseInt(taskIdStr.substring(0, index));
final int partition = Integer.parseInt(taskIdStr.substring(index + 1));

return new TaskId(topicGroupId, partition);
return new TaskId(topicGroupId, partition, namedTopology);
} catch (final Exception e) {
throw new TaskIdFormatException(taskIdStr);
}
Expand All @@ -64,50 +97,118 @@ public static TaskId parse(final String taskIdStr) {
/**
* @throws IOException if cannot write to output stream
*/
public void writeTo(final DataOutputStream out) throws IOException {
public void writeTo(final DataOutputStream out, final int version) throws IOException {
out.writeInt(topicGroupId);
out.writeInt(partition);
if (version >= MIN_NAMED_TOPOLOGY_VERSION) {

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although I did strip out the protocol change, I left this in since it doesn't affect anything until we actually bump the protocol version (which I did take out of this PR)

if (namedTopology != null) {
out.writeInt(namedTopology.length());
out.writeChars(namedTopology);
} else {
out.writeInt(0);
}
}
}

/**
* @throws IOException if cannot read from input stream
*/
public static TaskId readFrom(final DataInputStream in) throws IOException {
return new TaskId(in.readInt(), in.readInt());
public static TaskId readFrom(final DataInputStream in, final int version) throws IOException {
final int topicGroupId = in.readInt();
final int partition = in.readInt();
final String namedTopology;
if (version >= MIN_NAMED_TOPOLOGY_VERSION) {
final int numNamedTopologyChars = in.readInt();
final StringBuilder namedTopologyBuilder = new StringBuilder();
for (int i = 0; i < numNamedTopologyChars; ++i) {
namedTopologyBuilder.append(in.readChar());
}
namedTopology = namedTopologyBuilder.toString();
} else {
namedTopology = null;
}
return new TaskId(topicGroupId, partition, getNamedTopologyOrElseNull(namedTopology));
}

public void writeTo(final ByteBuffer buf) {
public void writeTo(final ByteBuffer buf, final int version) {
buf.putInt(topicGroupId);
buf.putInt(partition);
if (version >= MIN_NAMED_TOPOLOGY_VERSION) {
if (namedTopology != null) {
buf.putInt(namedTopology.length());
for (final char c : namedTopology.toCharArray()) {
buf.putChar(c);
}
} else {
buf.putInt(0);
}
}
}

public static TaskId readFrom(final ByteBuffer buf) {
return new TaskId(buf.getInt(), buf.getInt());
public static TaskId readFrom(final ByteBuffer buf, final int version) {
final int topicGroupId = buf.getInt();
final int partition = buf.getInt();
final String namedTopology;
if (version >= MIN_NAMED_TOPOLOGY_VERSION) {
final int numNamedTopologyChars = buf.getInt();
final StringBuilder namedTopologyBuilder = new StringBuilder();
for (int i = 0; i < numNamedTopologyChars; ++i) {
namedTopologyBuilder.append(buf.getChar());
}
namedTopology = namedTopologyBuilder.toString();
} else {
namedTopology = null;
}
return new TaskId(topicGroupId, partition, getNamedTopologyOrElseNull(namedTopology));
}

/**
* @return the namedTopology name, or null if the passed in namedTopology is null or the empty string
*/
private static String getNamedTopologyOrElseNull(final String namedTopology) {
return (namedTopology == null || namedTopology.length() == 0) ?
null :
namedTopology;
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final TaskId taskId = (TaskId) o;

if (o instanceof TaskId) {
final TaskId other = (TaskId) o;
return other.topicGroupId == this.topicGroupId && other.partition == this.partition;
} else {
if (topicGroupId != taskId.topicGroupId || partition != taskId.partition) {
return false;
}

if (namedTopology != null && taskId.namedTopology != null) {
return namedTopology.equals(taskId.namedTopology);
} else {
return namedTopology == null && taskId.namedTopology == null;
}
}

@Override
public int hashCode() {
final long n = ((long) topicGroupId << 32) | (long) partition;
return (int) (n % 0xFFFFFFFFL);
return Objects.hash(topicGroupId, partition, namedTopology);
}

@Override
public int compareTo(final TaskId other) {
final int compare = Integer.compare(this.topicGroupId, other.topicGroupId);
return compare != 0 ? compare : Integer.compare(this.partition, other.partition);
if (namedTopology != null && other.namedTopology != null) {
final int comparingNamedTopologies = namedTopology.compareTo(other.namedTopology);
if (comparingNamedTopologies != 0) {
return comparingNamedTopologies;
}
} else if (namedTopology != null || other.namedTopology != null) {
LOG.error("Tried to compare this = {} with other = {}, but only one had a valid named topology", this, other);
throw new IllegalStateException("Can't compare a TaskId with a namedTopology to one without");
}
final int comparingTopicGroupId = Integer.compare(this.topicGroupId, other.topicGroupId);
return comparingTopicGroupId != 0 ? comparingTopicGroupId : Integer.compare(this.partition, other.partition);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2013,6 +2013,11 @@ private void updateSubscribedTopics(final Set<String> topics, final String logPr
setRegexMatchedTopicToStateStore();
}


public synchronized List<String> fullSourceTopicNames() {
return maybeDecorateInternalSourceTopics(sourceTopicNames);
}

// following functions are for test only
public synchronized Set<String> sourceTopicNames() {
return sourceTopicNames;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ public synchronized void cleanRemovedTasks(final long cleanupDelayMs) {
private void cleanRemovedTasksCalledByCleanerThread(final long cleanupDelayMs) {
for (final File taskDir : listNonEmptyTaskDirectories()) {
final String dirName = taskDir.getName();
final TaskId id = TaskId.parse(dirName);
final TaskId id = TaskId.parseTaskDirectoryName(dirName, null);
if (!lockedTasksToOwner.containsKey(id)) {
try {
if (lock(id)) {
Expand Down Expand Up @@ -417,7 +417,7 @@ private void cleanRemovedTasksCalledByUser() throws Exception {
final AtomicReference<Exception> firstException = new AtomicReference<>();
for (final File taskDir : listAllTaskDirectories()) {
final String dirName = taskDir.getName();
final TaskId id = TaskId.parse(dirName);
final TaskId id = TaskId.parseTaskDirectoryName(dirName, null);
if (!lockedTasksToOwner.containsKey(id)) {
try {
if (lock(id)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -683,7 +683,7 @@ private void tryToLockAllNonEmptyTaskDirectories() {

for (final File dir : stateDirectory.listNonEmptyTaskDirectories()) {
try {
final TaskId id = TaskId.parse(dir.getName());
final TaskId id = TaskId.parseTaskDirectoryName(dir.getName(), null);
if (stateDirectory.lock(id)) {
lockedTaskDirectories.add(id);
if (!tasks.owned(id)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.kafka.streams.errors.TaskAssignmentException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.state.HostInfo;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -206,14 +207,14 @@ private void encodeActiveAndStandbyTaskAssignment(final DataOutputStream out) th
// encode active tasks
out.writeInt(activeTasks.size());
for (final TaskId id : activeTasks) {
id.writeTo(out);
id.writeTo(out, usedVersion);
}

// encode standby tasks
out.writeInt(standbyTasks.size());
for (final Map.Entry<TaskId, Set<TopicPartition>> entry : standbyTasks.entrySet()) {
final TaskId id = entry.getKey();
id.writeTo(out);
id.writeTo(out, usedVersion);

final Set<TopicPartition> partitions = entry.getValue();
writeTopicPartitions(out, partitions);
Expand Down Expand Up @@ -382,7 +383,7 @@ private static void decodeActiveTasks(final AssignmentInfo assignmentInfo,
final int count = in.readInt();
assignmentInfo.activeTasks = new ArrayList<>(count);
for (int i = 0; i < count; i++) {
assignmentInfo.activeTasks.add(TaskId.readFrom(in));
assignmentInfo.activeTasks.add(TaskId.readFrom(in, assignmentInfo.usedVersion));
}
}

Expand All @@ -391,7 +392,7 @@ private static void decodeStandbyTasks(final AssignmentInfo assignmentInfo,
final int count = in.readInt();
assignmentInfo.standbyTasks = new HashMap<>(count);
for (int i = 0; i < count; i++) {
final TaskId id = TaskId.readFrom(in);
final TaskId id = TaskId.readFrom(in, assignmentInfo.usedVersion);
assignmentInfo.standbyTasks.put(id, readTopicPartitions(in));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@
public final class StreamsAssignmentProtocolVersions {
public static final int UNKNOWN = -1;
public static final int EARLIEST_PROBEABLE_VERSION = 3;
public static final int MIN_NAMED_TOPOLOGY_VERSION = 10;
public static final int LATEST_SUPPORTED_VERSION = 9;
//When changing the versions update this test: streams_upgrade_test.py::StreamsUpgradeTest.test_version_probing_upgrade
//Add add a unit test in SubscriptionInfoTest
// When changing the versions:
// 1) Update the version_probing_message and end_of_upgrade_message in streams_upgrade_test.py::StreamsUpgradeTest.test_version_probing_upgrade
// 2) Add a unit test in SubscriptionInfoTest and/or AssignmentInfoTest

private StreamsAssignmentProtocolVersions() {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.processor.internals.namedtopology;

import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.KafkaStreams;

import java.util.Properties;

public class KafkaStreamsNamedTopologyWrapper extends KafkaStreams {

//TODO It should be possible to start up streams with no NamedTopology (or regular Topology) at all, in the meantime we can just pass in an empty NamedTopology
public KafkaStreamsNamedTopologyWrapper(final NamedTopology topology, final Properties props, final KafkaClientSupplier clientSupplier) {
super(topology, props, clientSupplier);
}

public NamedTopology getTopologyByName(final String name) {
throw new UnsupportedOperationException();
}

public void addNamedTopology(final NamedTopology topology) {
throw new UnsupportedOperationException();
}

public void removeNamedTopology(final NamedTopology topology) {
throw new UnsupportedOperationException();
}
}
Loading