DUE TO SPAM, SIGN-UP IS DISABLED. Goto Selfserve wiki signup and request an account.
Authors: Henry Cai, Thomas Thornton
Status
Current state: Withdrawn
Discussion thread: here
JIRA: KAFKA-19225
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
In KIP-405, the community has proposed and implemented the tiered storage for old Kafka log segment files, when the current log segment file is closed, it becomes eligible to be uploaded to cloud's object storage and later when the file age is older than local.retention.ms, it is removed from the local storage to reduce local storage cost. KIP-405 only uploads older log segments but not the most recent active log segments (write-ahead logs). Thus in a typical 3-way replicated Kafka cluster, the 2 follower brokers would still need to replicate the active log segments from the leader broker. It is common practice to set up the 3 brokers in three different AZs to improve the high availability of the cluster. This would cause the replications between leader/follower brokers to be across AZs which is a significant cost (various studies show the across AZ transfer cost typically comprises 50%-60% of the total cluster cost). Since all the active log segments are physically present on three Kafka Brokers, they still comprise significant resource usage on the brokers. The state of the broker is still quite big during node replacement, leading to longer node replacement time. KIP-1150 recently proposes diskless Kafka topic, but leads to increased latency and a significant redesign. In comparison, this proposed KIP maintains identical performance for acks=1 producer path, minimizes design changes to Kafka, and still slashes cost by an estimated 43%.
One of the reasons why Kafka tiered storage is not using object storage for active log segments is due to the latency and cost of uploading active log segments to traditional object storage (e.g. Amazon S3). A typical S3 object upload usually takes 100ms and the cost of uploading will quickly add up astronomically if we need to upload many small log segments to S3 to keep up with the incoming write traffic to a Kafka topic.
However, with the new development and pricing discount of S3 express one Zone bucket (S3E1Z) and EBS, there is a possible path to store active log segment files to the cloud.
Proposed Changes
In this KIP, we propose to extend the tiered storage support for active log segment (write-ahead logs) as well. The active log segments will be eligible to be uploaded onto object storage once they get to a certain size or pass a certain retention time.
Overall architecture and Cloud Storage for Active Log Segment
The overall architecture is mainly centered around using background tasks to upload a section of active log segments from the leader broker to the object storage and download them onto the follower broker. As a result the follower broker no longer directly reads the data from the leader broker during FetchRequest/Response flow. Instead the data flows from the leader broker to the object storage and then to the follower broker without paying for across-AZ transfer cost. Note: for most cloud storage providers there will still be some costs (e.g., PUT/GET cost per request, transfer surcharge for performant object storage), but these are typically a fraction of the original across-AZ transfer cost.
For the active log segments, due to the fast and frequent updating nature of those write ahead logs, we would need to upload them into a fast object storage. In terms of object cloud storage choice, there are fast storage types that provide single digit ms upload latency and with about half of the cost of file uploading. On the other hand, EBS would also be a good candidate for storing active Kafka log segments since EBS also offers single digit ms latency access and offers free transfer cost if the IOPS is under a certain threshold. EBS requires additional set up to allow for multiple hosts to read the same volume (example), while S3E1Z works out of the box. S3E1Z and EBS are the choice in AWS world, GCS and Azure have similar competing products as well. In the case of cloud providers where cross-AZ data transfer is free (e.g., Azure), there won't be any cross AZ savings, but there would still be broker savings. Since this approach only needs two replicas, the typical three replicas can be reduced.
With the advancement of S3E1Z bucket and EBS, we can choose to replicate the most recent active log segment (we call them WriteAheadLog shortnamed WAL Log) onto those fast short-term cloud storage. Using S3E1Z bucket to store active log segment files is not a new idea and in fact several commercial cloud native Kafka offerings (Confluent Freight, WarpStream, AutoMQ) are using this bucket type to implement stateless Kafka brokers.
These cloud storage (S3E1Z or EBS) are internally multi-way replicated, once the kafka log data is uploaded into these cloud storage, there is not much need to replicate further into other Kafka follower brokers. Therefore in theory, you can reduce the replication factor of a Kafka topic to 1 to save the cost of 2 Kafka brokers. In practice and for the initial version of this KIP, you might want to set up one or more Kafka follower brokers as a hot-standby or serve as an extra read replica for the downstream consumers in the same AZ (as the follower broker) to save across AZ traffic cost for downstream consumers. But the replication from cloud storage to those extra Kafka follower brokers are free. For a future enhancement of this KIP, we can support a mode which ends the replication when the data is uploaded to object storage and elect a new leader from any brokers when the old leader crashes. The new leader would need to sync up with the object storage for the last few seconds of data when it starts up as the new leader (and read the rest of the data async in the background). Additionally, the follower uses the leader epoch to filter out any messages sent by the old leader. Only metadata from the latest leader is processed.
Note although this proposal is promoting to use fast cloud storage (S3E1Z or EBS) to store active log segment, the interfaces proposed in this proposal can also be easily implemented on a traditional cloud storage (e.g. S3) with a high latency.
Key Classes and Constructs
We are proposing to reuse and extend the data structures and constructs introduced in KIP-405 to support active log segments uploading. Most of the new classes in this KIP follow their counterparts from KIP-405 with the convention of adding WAL (WriteAheadLog) in the classes name or config parameters to indicate they are to support active write-ahead log segments. We briefly mention a few classes in this section to introduce the new design, the details of the class design are in the next section Public Interfaces.
Configs
If a topic is configured with remote.wal.storage.enable=true (similar to remote.storage.enable in KIP-405), we will trigger the background uploading/downloading of the active log segment for the topic
RemoteWalStorageManager
Similar to RemoteStorageManager in KIP-405, we introduce an interface RemoteWalStorageManager to support uploading/downloading active WAL log segments. The main methods of the class are copyLogSegmentData and fetchLogSegments.
RemoteLogManager.RLMWalTask
Similar to RemoteLogManager.RLMTask in KIP-405, RLMWalTask is a background task which is responsible to upload a section of active log segments on leader broker and download the active log segments on follower broker;
The section of active log segment being copied is org.apache.kafka.common.record.FileRecords which represents the batch records from the last offset it was uploaded to the current log end of the log segment.
RemoteWalCombinedLogSegmentMetadata and metadata topic __remote_wal_log_metadata
Similar to RemoteLogSegmentMetadata which is published to topic __remote_log_metadata topic when each log segment is uploaded onto object storage in KIP-405, we are publishing a RemoteWalCombinedLogSegmentMetadata to topic __remote_wal_log_metadata when the log segment is uploaded onto object storage. The reason it is a combined log segment is for performance reasons. If we only upload one log segment for one topic partition, we would incur a huge cost of S3 transfer and the log segment will be too small. So in the implementation we would combine the log segments from multiple topic partitions and do the upload.
We chose to create a separate metadata topic __remote_wal_log_metadata because the uploading of active log segment is much more frequent comparing to closed log segment. We envision we will upload a log segment in the size of 300-500KB every 10-20ms, due to the frequency of the metadata publishing and the ensuing size of the topic, it is recommended to set the retention of the topic to small value. The minimum value of the topic retention needs to be bigger than all tiered storage topic's local.retention.ms because when the time exceeds tiered storage topic local.retention.ms, the closed log segments of the topic will be removed from local storage since they are assumed to uploaded to slow tiered storage already. Once they are on tiered storage object storage there is no need to keep active log segments anymore in the fast object storage.
Reads are done on a byte range of a combined object (metadata message contains the byte range), and read costs are only billed only on that byte range. Byte range requests are supported across the major cloud storage providers.
RemoteLogManager.RLMWalCombinerTask
Since we are combining log segments from multiple partitions, RLMWalTask will actually submit the candidate batch records from a topic partition into a queue and RLMWalCombinerTask will read those records from the queue and combine them into Combined LogSegment and upload to object storage.
Producer Path to The Leader Broker
When a Kafka broker receives a ProduceRequest from producer, KafkaApis.handleProduceRequest is called, a MemoryRecords byte buffer will be allocated to hold the produce batch and be appended to the active log segments of the specified topic partition.
If the topic is configured with remote.wal.enable=true, the async task RemoteLogManager.RLMWalTask will be periodically woken up to copy the remaining records in the current active Log segment files to cloud storage and publish a RemoteLogSegmentMetadata to metadata topic __remote_wal_log_metadata;
Producer Acknowledgement
Acks=1
When the producer produces using acks=1, the append of produce records onto the log segment file on the local disk will mark the receiving of produce records and a ProduceResponse reply will be sent out in this request/reply cycle. The async copy of active log segments onto Object storage will not affect the message acknowledgement.
Since we added the object storage layer in the active log segment producer path, if a leader is lost, then the data that is lost is equivalent to the amount of data that is waiting to be written to object storage. This is similar to the original Kafka design where the amount of data lost is the data on the leader broker but not yet replicated to the followers. The amount of data lost can be mitigated by uploading to object storage more frequently.
Acks=-1 / Acks=all
When the producer produces using acks=-1, the append of produce records to local log segment will not mark the completion of the message receiving, instead a DelayedProduce purgatory action will be triggered (this is the current Kafka implementation). There are two options for when to mark the produce request as successful:
- The leader broker will wait until all in-sync follower replicas have reached the given offset before the acknowledgement signal can be sent back. The async upload of active log segments onto object storage and the async download of the active log segment will be indirectly involved in the acks=-1 reply cycle; they will add one more hop (the object storage layer) in the cycle.
- The leader broker will wait only until the data is replicated to the object storage layer. Since the data is already in the object storage layer, it is multi-way replicated. There will be some additional time if the leader is lost for the follower to catch up from reading from object storage, but should at most be a few seconds of data. The result will be comparable performance to the current acks=-1 latency.
There is additional analysis of these two options in Performance Considerations below.
The role of object storage for replication factor
The WAL log segment files on fast cloud storage can serve as the remote replica for the given topic partition. In theory, the replication factor of the topic can be 1 (i.e. only the leader broker is configured). When the leader broker is crashed, a new leader broker can be configured and re-bootstrapped with the records coming from the fast cloud storage.
The above bootstrapping process might take some time to finish, for the use cases which are sensitive to time delay the user might want to configure one or more follower brokers as a hot standby. Alternatively as a future enhancement of this KIP, we can optimize the bootstrap time of the new leader by having it sync with the object storage on the last few seconds of the data when it becomes the new leader (and sync the rest of the data slowly async in the background). In a well operated Kafka pipeline, the consumer is usually only behind on the tail of the Kafka queue by a few seconds, once the new leaders get these few seconds of data from object storage the consumer can resume the consumption flow.
Replication and Download on follower broker
When the topic’s replication factor is > 1, the follower broker will send the normal FetchRequest to the lead broker to replicate data. However the leader broker will just respond with empty MemoryRecords as the reply. The follower broker will retrieve log segment metadata from __remote_wal_log_metadata topic and ask RemoteLogManager to transfer the records from the corresponding WAL log segment files. This data transfer will not incur the across AZ network transfer cost. The main purpose of FollowerFetchReqeust/FollowerFetchResponse is now just to update the offsets and high watermark between leader and follower.
The current API does support sending metadata with empty records object. However, the interface may be expanded to have a cleaner way of supporting this (e.g., only return metadata & directly update replicated offset).
Life Cycle of the active log segments on the cloud storage
- The active log segment are created and uploaded onto object storage on time interval from the leader broker;
- The active log segment on the fast object storage (e,g, S3E1Z) stays to be consumed by all follower brokers;
- At the same time, KIP-405 will also be working to upload older log segments onto slower long-term object storage (e.g. S3) when local.retention.ms passed for the log segments; When the main log segments uploads finishes for KIP-405, the active log segments on fast object storage can be deleted by the log cleaner thread;
- Since we are combining multiple log segments from multiple topic partitions during the active log segment uploads, the log cleaner will actually need to wait until segments from all topic partitions are uploaded to long term object storage before deleting the combined log segment file. The retention of the log segment file is equal to the max of all topics’ local.retention.ms.
Performance Considerations
Reliance on PageCache
Readers will notice that we are not proposing a diskless implementation as in KIP-1150, instead we are using the cloud storage as an intermediate data hop before the data is replicated onto follower brokers. Both the leader broker and follower broker still build a list of log segment files and consumer clients still fetch those log segments which are cached in Page Cache. This design maintains the same performance as traditional Kafka by relying on its high performance throughput core tenet: page cache.
Same performance for acks=1 producer path and close performance for acks=-1
The background upload of active log segments to object storage (and subsequent download) is performed asynchronously behind the scenes, and can be tuned for larger batch sizes. For acks=1 producers (which is still a majority of use cases in many companies), the data transfer to object storage has no impact on producer acknowledgement, the producer will be able to receive the reply as soon as the data is written into the page cache of the lead broker.
For acks=-1 producer path, there is a slight extra delay because data is transferred onto object storage first before it is downloaded onto the follower broker. This delay can be reduced by tuning the batch size of the upload. And for some use cases, the user might be able to treat data synced to object storage as acks=-1 completed since the data on object storage is multi-way replicated already. For this as a future enhancement we can add a mode to acknowledge back to the producer as soon as the data is uploaded onto object storage. This would give us the same performance with current Kafka implementation. The extra work we would need to do to complete this mode is to have the new leader (elected from the follower broker during leadership change) to pause and catch up with the object storage when it becomes the new leader. ..
Performance tuning of uploading Active Log Segments onto Cloud Storage
Uploading log segments onto S3 is a tradeoff, faster shorter upload will reduce the latency but will pay for the cost of more uploads and smaller log segments. To achieve a reasonable transfer cost and file size, we would need to combine the log segments from multiple topic partitions and increase the batch size of the upload (and thus increase the latency).
Using a traditional cloud storage (e.g. S3) for active log segments
The proposal of using cloud storage for active log segment can also be implemented on traditional object storage (e.g. S3), you can implement both RemoteStorageManager and RemoteWalStorageManager on the same class. Although S3 has higher latency than S3E1Z, S3 provides across-AZ durability which might be required for some use cases. For an implementation using S3 as active log segment storage, you would need to configure the system to upload the segment in lower frequency with higher batch size.
AZ Availability
In the proposed design, a typical setup would involve a leader broker in AZ1, a follower broker in AZ2, and an S3 Express One Zone (S3E1Z) bucket in AZ1. Co-locating the leader and the S3E1Z bucket in the same AZ minimizes upload latency. This configuration provides robust data redundancy across two AZs with what is effectively five replicas of the data (given that S3E1Z is internally multi-replicated within its AZ). For users concerned about a simultaneous failure of both AZ1 and AZ2, an additional follower can be placed in a third AZ to achieve complete three-AZ coverage, still benefiting from the elimination of cross-AZ replication costs.
Leader Failover and Recovery in an AZ Outage
A critical scenario to consider is the failure of an entire Availability Zone, which would take down both the leader broker and its co-located S3E1Z bucket. In this rare event, the cluster remains available by electing a new leader in a different AZ. Once a new leader (e.g., in AZ2) is elected, replica reconciliation takes place. This is similar to standard Kafka. If followers have more events than the high watermark, they will need to truncate them. Below is an example to illustrate this:
Suppose there is a remote WAL enabled topic with replication factor 3. The current leader of a partition is B1. B1 has received data up to offset 10, written data to S3E1Z up to offset 10, and published metadata up to offset 10. One follower, B2, has replicated up to offset 5, and follower B3, up to offset 7. Suppose B1 dies (e.g., AZ goes down), and B2 is elected as the new leader. B2 overwrites the LEO/high-watermark based on its own LEO, which is 5. B2 will tell its followers to reset their LEO to 5. So B3, which had an LEO of 7, will discard 2 messages and reset 5 to match B2, the new leader. In addition, B2 will also publish metadata DELETE events for any metadata entries it receives that are after its LEO (e.g., for messages 6-10), since they are no longer valid.
In the case that B3 was elected, with the higher end offset of offset 7. It will perform replica reconciliation and discard its data that is greater than the high watermark of 5. This prevents any sacrifices in availability, and is equivalent to standard Kafka.
Data Durability
In acks=all flow, we maintain the same data durability as today by requiring the data gets to the follower before we can acknowledge back to the producer;
In acks=1 flow, we have similar data durability as today by waiting to update the high watermark until the data arrives on the follower broker and the consumer won't be able to read the new message until the high watermark moves passing the new record. There is an extra latency for the message flows from object storage to the follower broker (in the order of 10ms for bigger batches), this extra latency would mean the consumer needs to wait longer and it also mean the producer would have a longer window for data loss. In today's code, we can lose about 10ms' data for the data on the way from the leader to the follower when the leader crashes, in this proposed KIP we can lose maybe 20ms' data for the data on the way from the leader to the object storage and then to the follower broker when the leader crashes. Although the data loss window doubles but it only happens on leader broker crash which is a rare event, most of the acks=1 data flow can tolerate this occasional data loss.
Public Interfaces
We are proposing to reuse and extend the data structures and constructs introduced in KIP-405 to support active log segments uploading. Most of the new classes in this KIP follow their counterparts from KIP-405 with the convention of adding WAL (WriteAheadLog) in the classes name or config parameters to indicate they are to support active write-ahead log segments.
org.apache.kafka.common.config.TopicConfig
- If a topic is configured with remote.wal.storage.enable=true, we will trigger the background uploading/downloading of the active log segment for the topic
org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
- remote.log.wal.storage.manager.impl.prefix: config prefix for RemoteWalStorageManager configuration parameters
- remote.log.wal.storage.system.enable: boolean parameter to enable the remote wal storage systems
- remote.log.wal.storage.manager.class.name: classname for RemoteWalStorageManager
- remote.log.wal.storage.manager.class.path: classpath to load RemoteWalStorageManager
- remote.wal.log.manager.thread.pool.size: thread pool size for RLMWalTask treads
- remote.wal.log.manager.combiner.thread.pool.size: thread pool size for RLMWalCombiner task threads
- remote.wal.log.manager.combiner.queue.size: queue size for RemoteWalCombiner task
- remote.wal.log.manager.combiner.task.interval.ms: controls how frequent the combiner task runs
- remote.wal.log.manager.combiner.task.upload.bytes: control how big the upload packet is going to be;
org.apache.kafka.server.log.remote.storage.RemoteWalCombinedLogSegmentMetadata
- Similar to RemoteLogSegmentMetadata which is published to topic __remote_log_metadata topic when each log segment is uploaded onto object storage in KIP-405, we are publishing a RemoteWalCombinedLogSegmentMetadata onto metadata topic: __remote_wal_log_metadata when the log segment is uploaded onto object storage. The reason it is a combined log segment is for performance reasons. If we only upload one log segment for one topic partition, we would incur a huge cost of S3 transfer and the log segment will be too small. So in the implementation we would combine the log segments from multiple topic partitions and do the upload.
public class RemoteWalCombinedLogSegmentMetadata extends RemoteLogMetadata {
/**
* Universally unique remote log segment id.
*/
private final Uuid combinedSegmentId;
private final List<RemoteLogSegmentMetadataEntry> remoteLogSegmentMetadataEntries;
/**
* Custom metadata.
* Custom metadata can be used to encode the object path in object storage and/or the bucket name in the object storage. This is especially useful if you use multiple buckets in the object storage.
*/
private final Optional<CustomMetadata> customMetadata;
/**
* It indicates the state in which the action is executed on this segment.
*/
private final RemoteLogSegmentState state;
/**
* Metadata entry for each log segment
*/
public static class RemoteLogSegmentMetadataEntry {
RemoteLogSegmentId remoteLogSegmentId;
long startFilePosition; // start position in the file for this segment
long endFilePosition; // end position in the file for this segment
}
}
org.apache.kafka.server.log.remote.storage.RemoteWalCombinedLogSegmentMetadataUpdate
Similar to class RemoteLogSegmentMetadataUpdate class in KIP-405, this MetadataUpdate class captures the start and end file position for each log segment in the combined log segments and is constructed when RemoteWalStorageManager#copyLogSegments is called.
public class RemoteWalCombinedLogSegmentMetadataUpdate extends RemoteLogMetadata {
/**
* Universally unique remote log segment id.
*/
private final Uuid combinedSegmentId;
private final List<RemoteLogSegmentMetadataEntry> remoteLogSegmentMetadataEntries;
/**
* Custom metadata.
*/
private final Optional<CustomMetadata> customMetadata;
/**
* It indicates the state in which the action is executed on this segment.
*/
private final RemoteLogSegmentState state;
org.apache.kafka.server.log.remote.storage.RemoteWalStorageManager
Similar to RemoteStorageManager in KIP-405, we introduce an interface RemoteWalStorageManager to support uploading/downloading active WAL log segments. The main methods of the class are copyLogSegmentData and fetchLogSegments.
package org.apache.kafka.server.log.remote.storage;
/**
* Copies the given List of {@link Records} provided for the given {@code remoteWalCombinedLogSegmentMetadata}.
* {@code remoteWalCombinedLogSegmentMetadata} contains the metadata for a list of segments to be copied. The list
* of {@link Records} is the list of {@link org.apache.kafka.common.record.FileRecords} data objects be copied for
* each log segment.
* <p>
* This operation is expected to be idempotent. If a copy operation is retried and there is existing content already written,
* it should be overwritten, and do not throw {@link RemoteStorageException}
*
* @param remoteWalCombinedLogSegmentMetadata metadata about the combined remote wal log segment.
* @param recordsList a list of {@link Records} to be copied
* @return RemoteWalCombinedLogSegmentMetadataUpdate which contains the metadata for the copied segments
* @throws RemoteStorageException if there are any errors in storing the data of the segment.
*/
public interface RemoteWalStorageManager extends Configurable, Closeable {
RemoteWalCombinedLogSegmentMetadataUpdate copyLogSegmentData(
RemoteWalCombinedLogSegmentMetadata remoteWalCombinedLogSegmentMetadata, List<Records> recordsList)
throws RemoteStorageException;
/**
* Returns the remote log segment data file/object as InputStream for the given {@link RemoteWalLogSegmentMetadata}
* starting from the given startPosition. The stream will end at the smaller of endPosition and the end of the
* remote log segment data file/object.
*
* @param remoteLogSegmentMetadata metadata about the remote log segment.
* @param startPosition start position of log segment to be read, inclusive.
* @param endPosition end position of log segment to be read, inclusive.
* @return input stream of the requested log segment data.
* @throws RemoteStorageException if there are any errors while fetching the desired segment.
* @throws RemoteResourceNotFoundException the requested log segment is not found in the remote storage.
*/
InputStream fetchLogSegment(RemoteWalLogSegmentMetadata remoteLogSegmentMetadata,int startPosition,int endPosition) throws RemoteStorageException;
}
kafka.log.remote.RemoteLogManager.RLMWalTask
Similar to RemoteLogManager.RLMTask in KIP-405, RLMWalTask is a background task which is responsible to upload a section of active log segments on leader broker and download the active log segments on follower broker;
The section of active log segment being copied is from the last offset it was uploaded to the current log end of the log segment. The section of batch records is represented by Records (and FileRecords are the actual implementation)
Since we are combining log segments from multiple topic partitions, RLMWalTask (one for each topic partition) is submitting the records for the given topic partition through in-memory queue remoteSegmentQ, the next task RLMWalCombineTask is going to combine the records from multiple topic partitions and submit them as one CombinedLogSegment to object storage.
class RLMWalTask extends CancellableRunnable {
private final TopicIdPartition topicIdPartition;
private final Logger logger;
private final BlockingQueue<MetaQueueItem> remoteSegmentQ;
public RLMWalTask(TopicIdPartition topicIdPartition,
BlockingQueue<MetaQueueItem> remoteSegmentQ) {
static class MetaQueueItem {
private final RemoteWalLogSegmentMetadata metadata;
private final Records records;
kafka.log.remote.RemoteLogManager.RLMWalCombineTask
Since we are combining log segments from multiple topic partitions, RLMWalTask (one for each topic partition) is submitting the records for the given topic partition through in-memory queue remoteSegmentQ, the next task RLMWalCombineTask is going to combine the records from multiple topic partitions and submit them as one CombinedLogSegment to object storage.
class RLMWalCombineTask extends CancellableRunnable {
private final int combinerSize = 1;
private final BlockingQueue<MetaQueueItem> remoteMetadataQ;
private final int customMetadataSizeLimit;
private final Logger logger;
public RLMWalCombineTask(BlockingQueue<MetaQueueItem> remoteMetadataQ, int customMetadataSizeLimit) {
When RLMWalCombineTask is uploading records to the object storage, it is also publishing multiple events to __remote_wal_log_metadata topic. It is submitting one RemoteWalCombinedLogSegMetadata event followed by a RemoteWalLogSegmentMetadata event for each topic partition. The reason multiple events are submitted is because each ConsumerTask from KIP-405 is only listening for events for its own topic partition (it listens for a specific Metadata Partition in __remote_wal_log_metadata topic) and therefore a separate metadata event for each topic partition needs to be published.
And follow the same pattern from RLMTask in KIP-405, a RemoteLogSegmentMetatadata event is first published to metadata topic to mark COPY_SEGMENT_STARTED state and then after object uploads completes a RemoteLogSegmentMetadataUpdate event is published with COPY_SEGMENT_FINISHED state which also contains the extra information (e.g. custom metadata or file start/end position) retrieved from the object storage upload.
Here is a code snippet for the upload events during records upload:
for (MetaQueueItem item : metas) {
RemoteWalLogSegmentMetadata copySegmentStartedRlsm = item.getMetadata().createWithUpdate(combinedSegmentId);
// For each topic partition, publish out the Metadata start event
remoteLogMetadataManager.addRemoteWalLogSegmentMetadata(copySegmentStartedRlsm).get();
remoteLogSegmentMetadataEntries.add(new RemoteWalCombinedLogSegmentMetadata.RemoteLogSegmentMetadataEntry(
copySegmentStartedRlsm.remoteLogSegmentId(), -1, -1));
recordsList.add(item.getRecords());
}
RemoteWalCombinedLogSegmentMetadata startMetadata = new RemoteWalCombinedLogSegmentMetadata(combinedSegmentId,
remoteLogSegmentMetadataEntries, brokerId, time.milliseconds());
// Publish the Combined Metadata Start Event
remoteLogMetadataManager.addRemoteWalCombinedLogSegmentMetadata(startMetadata).get();
// Upload the objects to object storage
RemoteWalCombinedLogSegmentMetadataUpdate metadataUpdate = remoteWalLogStorageManager.copyLogSegmentData(
startMetadata, recordsList);
}
// Publish out the Combined Metadata finished event
remoteLogMetadataManager.updateRemoteWalCombinedLogSegmentMetadata(metadataUpdate).get();
for (RemoteWalCombinedLogSegmentMetadataUpdate.RemoteLogSegmentMetadataEntry entry : metadataUpdate.remoteLogSegmentMetadataEntries()) {
RemoteWalLogSegmentMetadataUpdate copySegmentFinishedRlsm = new RemoteWalLogSegmentMetadataUpdate(
entry.remoteLogSegmentIdSegmentId(), time.milliseconds(), metadataUpdate.customMetadata(),
RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId,
combinedSegmentId, combinedCoordinatorTopicPartition, entry.startFilePosition(), entry.endFilePosition());
//Publish the Metadata finished event for each topic partition
remoteLogMetadataManager.updateRemoteWalLogSegmentMetadata(copySegmentFinishedRlsm).get();
}
Compatibility, Deprecation, and Migration Plan
- We are introducing new classes and new configs to support replicating active log segments onto object storage, no existing features are changed.
Test Plan
We implemented a LocalWalStorageManager to test the E2E flow using a shared local filesystem.
Rejected Alternatives
Comparison to KIP-1150: Diskless Kafka Topic
This KIP was being developed independently without much knowledge of Aiven’s proposal of KIP-1150 (Diskless Kafka Topic). Since KIP-1150 was published earlier, we would draw some comparisons between our proposal and KIP-1150:
- KIP-1150 is a major overhaul of Kafka storage system and KIP-1150 is just the first of the 7-8 KIPs proposed by Aiven;
- KIP-1150 completely removes all local storage, thus removes the core tenant of Kafka storage system: page cache, the main reason behind a fast Kafka;
- KIP-1150 has a long acknowledgement cycle on producer message publishing, the message cannot be acknowledged back to the producer until it is uploaded onto object storage. This performance degradation becomes more severe on acks=1 producer path where the data acknowledgement was used to be fast (as soon as the record is written to lead broker’s page cache). Although Kafka has promoted users to use acks=-1 as default for many years, acks=-1’s performance is significantly slower than acks=1 due to multiple acks signals needing to be collected from all follower brokers. For this reason, many company’s logging pipelines are still using acks=1 since occasional data loss are usually tolerable for logging use cases;
- KIP-1150 understands the diskless topic cannot replace all use cases since the upload onto object storage is going to be slower than local disk performance; In its blog, it targets its main usage for applications running in 200-400 ms latency
In contrast, our implementation is a gradual evolution of KIP-405 tiered storage. We are reusing many data structures and classes from KIP-405 (e.g. RemoteLogManager, RLMTask, RemoteLogSegmentMetadata, metadata topic). The amount of code change and new concepts introduced are more manageable in our proposal.
We understand the importance of page cache in traditional Kafka design and we still based our implementation of using local log segment files and its page cache, the object storage for active log segments are added to remove the across-AZ transfer cost but the local files stay to provide a caching layer for fast data retrieval to clients. For this reason the performance of our system under acks=1 can still maintain the single-digit ms latency;
Our proposal is in a way also trying to reduce the state size of Kafka broker by uploading more recent log segments onto object storage but we are implementing it in a more conservative way.
Note that we mentioned KIP-1150 in the Rejected Alternative section but we don't view this KIP and KIP-1150 are all-or-nothing alternatives, they are optimized for different use cases (this KIP proposal are optimizing for low-latency acks=1 use case while KIP-1150 is optimizing for a longer latency but diskless implementation), we believe some of the ideas or implementations can be learned or adopted by both KIPs.
Comparison to KIP-405: Tiered Storage
This KIP is being developed as an evolution of KIP-405 and it is using many of the constructs introduced in KIP (e.g. RemoteLogManager, RLMTask, metadata topic, ProducerManager, ConsumerManager, MetadataStore), however there are some differences between this KIP and KIP-405:
- While KIP-405 is uploading closed log segments to a slow object storage this KIP is uploading active log segment to a fast object storage.
- In KIP-405 we would need to upload various index files and producer snapshots alongside data logs to object storage, in this proposed KIP we only need to upload the data logs to object storage. In KIP-405, all local logs on all brokers will be removed after local.retention.ms passed we would need to keep a consistent log snapshot (including all index lookup files) in object storage for future lookups and downloads; In this proposed KIP the object storage is mostly used as an intermediate data hop between the leader and the follower, the follower will build up all the indexing files and producer snapshot files while it is appending batch records from object storage to local log by calling UnifiedLog.appendAsFollower (in the same way as today's follower when the follower builds indexes while calling UnifiedLog.appendAsFollower for the records coming from FetchResponse)
- Although both KIP-405 and this proposed KIP is using __remote_log_metadata topic to keep the metadata for each data log segment, the upload frequency is a lot more frequent in this proposed KIP. In KIP-405, we are uploading a closed log segment (usually in the size of 1GB) every 5-10 minutes while in this proposed KIP we are uploading a smaller data segment and much more frequent (e.g. every 10ms). With the cost optimization of combing data from multiple topic partitions, each uploaded combined active log segments are in the size of 100KB-300KB. But the data retention of active log segment's metadata is much shorter so the overall data size of the topic is not very big, the active log segment only need to live on object storage before the data is uploaded to slow object storage through KIP-405 (happens about 5-10 minutes after the message was appended to the local log when the active log segment is closed due to log rotation).
- We are moving the metadata for active log segment into its own metadata topic __remote_wal_log_metadata to set a shorter retention on the metadata topic
- If the topic's local.retention.ms is set to 1 hour, the local data logs for the topic will be removed after 1 hour since KIP-405's design assumes that the data will already be uploaded onto object storage by 1 hour mark. For this reason we can set the retention for active log segment metadata topic to be 1 hour, by the 1 hour mark the active segment's data log on fast object storage will be removed and there is no need to keep its metadata in the metadata topic anymore.
Future enhancement
Cloud native elasticity
This KIP is extending KIP-405: Tiered Storage. KIP-405 moved all closed log segments to object storage and this KIP moves the active log segment to the object storage as well. With all data now living on cloud storage, this opens the door for cloud-native-elasticity. The consumers now can read directly from cloud storage (without connecting to the broker), and we can utilize cloud vendor's fan-out support to easily add a lot more consumer reading volume in a short amount of time (elastic consumer fan-out support). By moving in this direction majority of the traffic (consumer traffic probably comprises 2/3 of the overall traffic) will be happening outside broker, there are much less resources (network bandwidth / memory) we need to allocate to the broker. The cluster will become much easier to scale up as well.
Further reduction of latency broker resources
For simplicity reason, the current proposal is still recommending set up a follower broker as a hot standby in case the existing broker crashes and you need to fast switch to a new leader. This adds more resource requirement on the broker and also add one more data hop in the latency cycle since the data needs to travel to the follower broker before acknowledging back to the producer (for acks=all). But with a bit more work, we can remove the need for this hot standby follower. With the conjunction of KIP-405, all the active and closed log segment are now living on the cloud storage, when the old broker dies any new broker can become the new leader by syncing up the data from object storage. KIP-405 already supports downloading data from closed log segment from cloud, for the active log segment we need to download about 5-10 minutes more of data. This download volume can be even smaller because in a well maintained data pipeline the consumer is usually only behind by a few seconds, this would mean we only need to download the last few seconds of data from the cloud to the new leader for the consumer fetch to resume. The new leader can continue downloading the rest of data asynchronous on the background.
Appendix: Cost Estimate for a typical workload
Workload example
For one of our sample 3-way replicated kafka cluster of 90 i3en.2xlarge nodes, for each node we are getting 30MB/s inbound traffic, which is about 112GB per hour of data needs to be stored; And we are targeting to upload every 10ms which is about 100 IOPS.
For the proposed new architecture, we are targeting to have one writer (leader kafka broker) writes to cloud storage, one reader (one follower kafka broker) to read from the cloud storage and plan to store 1 hour of data in the cloud storage (the storage cost is usually small so storing 2 or 3 hours of data wouldn’t add much cost).
Comparison Table
See the following table for cost comparison. The charge is per month per kafka broker
Baseline | S3E1Z | |
Storage/Instance Cost | $866 | $776 |
Transfer Cost | $1612 | $648 |
Total Cost | $2478 | $1424 |
Latency | Single digit ms | Single digit ms |
Across AZ | Yes | Yes |
The result is ~43% reduction in cost.
The details of the calculation can be provided per request.
Appendix: Performance Data
Configuration
Topic perf_cloud_native: 2-way replicated, one leader (AZ: use1-az4) writes to cloud storage, one reader (AZ: use1-az6) reads from cloud storage (uses KIP-1176 code)
Topic perf_local_disk: 2-way replicated (for fair comparison), standard replication path, leader (AZ: use1-az4) writes to local disk, follower (AZ: use1-az6) reads via cross AZ-transfer
Each topic has 1 partition.
Results
End to End Latency
A custom script was run to determine end-to-end-latency, which is defined here as the difference between system time when a record is consumed and system time when a record is created in memory on the producer side. The other producer/consumer metrics are all standard.
Workload
100 records, 1.6 KB avg record size, max batch size 32kb, acks=1, consumer client.rack=use1-az6 (read from follower)
Metrics
| Metric | perf_local_disk | perf_cloud_native |
| Producer Metrics | ||
| batch-size-avg | 31019 | 31479 |
| batch-size-max | 32569 | 31481 |
| record-queue-time-avg | 8.4 | 8.2 |
| record-queue-time-max | 10 | 10 |
| record-send-total | 100 | 100 |
| record-size-avg | 1625 | 1648 |
| record-size-max | 1625 | 1648 |
| records-per-request-avg | 20 | 20 |
| request-latency-avg | 7 | 6.6 |
| request-latency-max | 9 | 9 |
| byte-total | 155095 | 157395 |
| Consumer Metrics | ||
| bytes-consumed-total | 154790 | 157090 |
| fetch-latency-avg | 470.5294118 | 470.3125 |
| fetch-latency-max | 507 | 505 |
| fetch-size-avg | 38697.5 | 52363.33333 |
| fetch-size-max | 97514 | 157090 |
| fetch-total | 17 | 16 |
| records-consumed-total | 100 | 100 |
| records-per-request-avg | 25 | 33.33333333 |
End-to-End Latency Statistics (ms) | ||
| Mean (Average) | 708.9731 | 919.7159 |
| Min Latency | 523.6274 | 915.0023 |
| Max Latency | 1020.9279 | 942.0901 |
| p50 (Median) | 528.1985 | 919.2508 |
| p75 | 1019.5718 | 921.7726 |
| p90 | 1020.3517 | 924.0798 |
| p99 | 1020.9279 | 942.0901 |
| p99.9 | 1020.9279 | 942.0901 |
Kafka Producer/Consumer Perf Test Scripts
Workload
Used kafka/bin/kafka-producer-perf-test.sh and kafka/bin/kafka-consumer-perf-test.sh.
Producer: --num-records 100000 --record-size 2000 --throughput -1 --producer-props acks=1 retries=2 linger.ms=20 delivery.timeout.ms=300000 request.timeout.ms=30000
Consumer: --messages 100000 --timeout 240000 --consumer.config consumer.properties
The consumer.properties file contained client.rack=use1-az6 (the rack of the follower broker, so we consumed from the follower).
Metrics
| Metric | perf_local_disk | perf_cloud_native |
Producer Metrics | ||
| Records sent | 100,000 | 100,000 |
| Records/sec | 20,542.32 | 20,772.75 |
| Throughput (MB/sec) | 39.18 | 39.62 |
| Avg Latency (ms) | 638.39 | 635.4 |
| Max Latency (ms) | 817 | 760 |
| 50th Percentile | 667 | 666 |
| 95th Percentile | 745 | 710 |
| 99th Percentile | 802 | 746 |
| 99.9th Percentile | 815 | 758 |
Consumer Metrics | ||
| Data Consumed (MB) | 190.73 | 190.73 |
| Throughput (MB/sec) | 44.56 | 40.83 |
| Records Consumed | 100,000 | 100,000 |
| Records/sec | 23,364.49 | 21,404.11 |
| Rebalance Time (ms) | 3,635 | 3,961 |
| Fetch Time (ms) | 645 | 711 |
| Fetch MB/sec | 295.71 | 268.26 |
| Fetch Records/sec | 155,038.76 | 140,646.98 |
Analysis
Overall we see comparable performance between a standard topic and KIP-1176 for both producer & consumer metrics, in particular throughput and latency. The producer path is near identical performance, i.e., an ack to the producer is returned in the same time. Consumer performance is slightly slower as data must travel an additional hop to/from S3E1Z. There are additional tunings we are aware of that could further boost performance (e.g., tuning the interval that the leader/follower remote WAL tasks run).
For latency, there are two latency metrics. E2E latency measures when the consumer sees the data. Producer's request-latency measures when the producer can move on after a message is sent. We can see the request-latency number is very small (<10ms) on both cloud-native and local-disk cases, this is where acks=1 setting excels at and this is also the niche of our KIP.



