Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -618,11 +619,22 @@ private static int parseAcks(String acksString) {
public void initTransactions() {
throwIfNoTransactionManager();
throwIfProducerClosed();
maybeAllocateTransactionalId();

TransactionalRequestResult result = transactionManager.initializeTransactions();
sender.wakeup();
result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS);
}

private void maybeAllocateTransactionalId() {
String allocatedTransactionalId = transactionManager.transactionalId();
if (allocatedTransactionalId == null || allocatedTransactionalId.isEmpty()) {
String threadProducerId = "thread-producer-" + UUID.randomUUID().toString();
log.info("Allocating thread producer id: {}", threadProducerId);
transactionManager.setTransactionalId(threadProducerId);
}
}

/**
* Should be called before the start of each new transaction. Note that prior to the first invocation
* of this method, you must invoke {@link #initTransactions()} exactly one time.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public class TransactionManager {
private static final int NO_LAST_ACKED_SEQUENCE_NUMBER = -1;

private final Logger log;
private final String transactionalId;
private String transactionalId;
private final int transactionTimeoutMs;

private static class TopicPartitionBookkeeper {
Expand Down Expand Up @@ -272,6 +272,10 @@ public TransactionManager(LogContext logContext, String transactionalId, int tra
this(new LogContext(), null, 0, 100L);
}

public void setTransactionalId(String transactionalId) {
this.transactionalId = transactionalId;
}

public synchronized TransactionalRequestResult initializeTransactions() {
return handleCachedTransactionRequestResult(() -> {
transitionTo(State.INITIALIZING);
Expand Down Expand Up @@ -986,8 +990,9 @@ private TxnOffsetCommitHandler txnOffsetCommitHandler(TransactionalRequestResult
offsetAndMetadata.metadata(), offsetAndMetadata.leaderEpoch());
pendingTxnOffsetCommits.put(entry.getKey(), committedOffset);
}

TxnOffsetCommitRequest.Builder builder = new TxnOffsetCommitRequest.Builder(transactionalId, consumerGroupId,
producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, pendingTxnOffsetCommits);
producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, pendingTxnOffsetCommits);
return new TxnOffsetCommitHandler(result, builder);
}

Expand Down
12 changes: 12 additions & 0 deletions streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,13 @@ public class StreamsConfig extends AbstractConfig {
@SuppressWarnings("WeakerAccess")
public static final String EXACTLY_ONCE = "exactly_once";

/**
* Config to use thread level producer
*/
@SuppressWarnings("WeakerAccess")
public static final String USE_EOS_THREAD_PRODUCER_CONFIG = "use.eos.thread.producer";
private static final String USE_EOS_THREAD_PRODUCER_DOC = "Flag to use thread level producer.";

/** {@code application.id} */
@SuppressWarnings("WeakerAccess")
public static final String APPLICATION_ID_CONFIG = "application.id";
Expand Down Expand Up @@ -568,6 +575,11 @@ public class StreamsConfig extends AbstractConfig {
in(NO_OPTIMIZATION, OPTIMIZE),
Importance.MEDIUM,
TOPOLOGY_OPTIMIZATION_DOC)
.define(USE_EOS_THREAD_PRODUCER_CONFIG,
Type.BOOLEAN,
false,
Importance.MEDIUM,
USE_EOS_THREAD_PRODUCER_DOC)

// LOW

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
class AssignedStandbyTasks extends AssignedTasks<StandbyTask> {

AssignedStandbyTasks(final LogContext logContext) {
super(logContext, "standby task");
super(logContext, "standby task", null, null, "dummy-group-id");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@
*/
package org.apache.kafka.streams.processor.internals;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.processor.TaskId;

Expand All @@ -35,9 +37,14 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements Restorin
private final Map<TaskId, StreamTask> restoring = new HashMap<>();
private final Set<TopicPartition> restoredPartitions = new HashSet<>();
private final Map<TopicPartition, StreamTask> restoringByPartition = new HashMap<>();

AssignedStreamsTasks(final LogContext logContext) {
super(logContext, "stream task");
private final Producer<byte[], byte[]> threadProducer;

AssignedStreamsTasks(final LogContext logContext,
final Producer<byte[], byte[]> threadProducer,
final Time time,
final String consumerGroupId) {
super(logContext, "stream task", threadProducer, time, consumerGroupId);
this.threadProducer = threadProducer;
}

@Override
Expand Down Expand Up @@ -136,41 +143,8 @@ void addToRestoring(final StreamTask task) {
* or if the task producer got fenced (EOS)
*/
int maybeCommitPerUserRequested() {
int committed = 0;
RuntimeException firstException = null;

for (final Iterator<StreamTask> it = running().iterator(); it.hasNext(); ) {
final StreamTask task = it.next();
try {
if (task.commitRequested() && task.commitNeeded()) {
task.commit();
committed++;
log.debug("Committed active task {} per user request in", task.id());
}
} catch (final TaskMigratedException e) {
log.info("Failed to commit {} since it got migrated to another thread already. " +
"Closing it as zombie before triggering a new rebalance.", task.id());
final RuntimeException fatalException = closeZombieTask(task);
if (fatalException != null) {
throw fatalException;
}
it.remove();
throw e;
} catch (final RuntimeException t) {
log.error("Failed to commit StreamTask {} due to the following error:",
task.id(),
t);
if (firstException == null) {
firstException = t;
}
}
}

if (firstException != null) {
throw firstException;
}

return committed;
return commitInternal(log, threadProducer,
task -> task.commitRequested() && task.commitNeeded());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@
*/
package org.apache.kafka.streams.processor.internals;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.errors.LockException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskMigratedException;
Expand All @@ -42,15 +45,24 @@ abstract class AssignedTasks<T extends Task> {
private final Map<TaskId, T> created = new HashMap<>();
private final Map<TaskId, T> suspended = new HashMap<>();
private final Set<TaskId> previousActiveTasks = new HashSet<>();
private final Producer<byte[], byte[]> eosProducer;
private final Time time;
private final String consumerGroupId;

// IQ may access this map.
final Map<TaskId, T> running = new ConcurrentHashMap<>();
private final Map<TopicPartition, T> runningByPartition = new HashMap<>();

AssignedTasks(final LogContext logContext,
final String taskTypeName) {
final String taskTypeName,
final Producer<byte[], byte[]> eosProducer,
final Time time,
final String consumerGroupId) {
this.taskTypeName = taskTypeName;
this.log = logContext.logger(getClass());
this.eosProducer = eosProducer;
this.time = time;
this.consumerGroupId = consumerGroupId;
}

void addNewTask(final T task) {
Expand Down Expand Up @@ -276,13 +288,36 @@ Set<TaskId> previousTaskIds() {
* or if the task producer got fenced (EOS)
*/
int commit() {
return commitInternal(
log,
eosProducer,
Task::commitNeeded
);
}

public interface TaskStatus {
boolean needsCommit(Task task);
}

protected int commitInternal(final Logger log,
final Producer<byte[], byte[]> eosProducer,
final TaskStatus taskStatus) {
int committed = 0;
RuntimeException firstException = null;

final Map<TopicPartition, OffsetAndMetadata> pendingOffsets = new HashMap<>();
final List<Task> externalCommitTasks = new ArrayList<>();

for (final Iterator<T> it = running().iterator(); it.hasNext(); ) {
final T task = it.next();
try {
if (task.commitNeeded()) {
task.commit();
if (taskStatus.needsCommit(task)) {
if (eosProducer != null) {
pendingOffsets.putAll(task.getPendingOffsets());
externalCommitTasks.add(task);
} else {
task.commit();
}
committed++;
}
} catch (final TaskMigratedException e) {
Expand All @@ -296,9 +331,7 @@ int commit() {
throw e;
} catch (final RuntimeException t) {
log.error("Failed to commit {} {} due to the following error:",
taskTypeName,
task.id(),
t);
taskTypeName, task.id(), t);
if (firstException == null) {
firstException = t;
}
Expand All @@ -309,6 +342,17 @@ int commit() {
throw firstException;
}

if (!pendingOffsets.isEmpty()) {
final long startNs = time.nanoseconds();
eosProducer.sendOffsetsToTransaction(pendingOffsets, consumerGroupId);
eosProducer.commitTransaction();
final long commitLatency = time.nanoseconds() - startNs;
for (final Task task : externalCommitTasks) {
task.markCommitDone(commitLatency);
}
eosProducer.beginTransaction();
}

return committed;
}

Expand Down
Loading