storagenode,client: introduce LogStreamAppender #433
Description
Motivation
Varlog is a storage system for logs that is both ordered and distributed. To ensure that the storage system is scalable and tolerant to faults, the storage layer in Varlog is partitioned. These partitions are called LogStreams, and each LogStream orders log entries sequentially in what we call "Local Order". The log entries are also arranged sequentially across LogStreams in what we call "Global Order".
Certain applications, such as Kov (Kafka on Varlog), require only "Local Order" and can use the Varlog client. However, this client has limitations: each operation blocks until the prior result is received, and it offers only synchronous patterns. Supporting asynchronous Append operations is not trivial. See #441.
To address this, a new asynchronous client has been proposed that supports only the Append operation against a single log stream: the LogStreamAppender.
Design
The LogStreamAppender operates on a single LogStream: it can append new log entries only to that specific LogStream. In exchange for this restriction, it can take advantage of asynchronous patterns and therefore does not block user code.
```go
// LogStreamAppenderOption configures a LogStreamAppender.
type LogStreamAppenderOption interface {
}

// WithPipelineSize sets the request pipeline size. The default pipeline size
// is two. Any value below one is raised to one, and any value above eight is
// capped at eight.
func WithPipelineSize(pipelineSize int) LogStreamAppenderOption {
}

// WithDefaultCallback sets the default callback function. The default
// callback can be overridden by the callback argument of the AppendBatch
// method.
func WithDefaultCallback(callback BatchCallback) LogStreamAppenderOption {
}
```
```go
type Varlog interface {
	// NewLogStreamAppender returns a new LogStreamAppender. The argument ctx
	// governs the entire lifetime of the returned LogStreamAppender.
	NewLogStreamAppender(ctx context.Context, tpid types.TopicID, lsid types.LogStreamID, opts ...LogStreamAppenderOption) (LogStreamAppender, error)
}
```
```go
// BatchCallback is a callback function that notifies the result of
// AppendBatch.
type BatchCallback func([]varlogpb.LogEntryMeta, error)

// LogStreamAppender is a client that can append only to a particular log
// stream.
type LogStreamAppender interface {
	// AppendBatch appends dataBatch to the given log stream asynchronously.
	// Calls to this method do not block until the pipeline of the
	// LogStreamAppender is full. Once the pipeline is full, subsequent calls
	// block, and they proceed again as responses arrive from the storage
	// node.
	//
	// On completion of AppendBatch, the callback provided by the user is
	// invoked. All callbacks registered to the same LogStreamAppender are
	// called sequentially by a single goroutine, so each callback should be
	// lightweight. If a callback must do heavy work, it is better to hand
	// that work off to separate worker goroutines.
	//
	// The only error AppendBatch returns is ErrClosed, which is returned
	// when the LogStreamAppender is already closed. It returns nil even if
	// the underlying stream is disconnected, notifying such errors via the
	// callback instead.
	AppendBatch(dataBatch [][]byte, callback BatchCallback) error

	// Close closes the LogStreamAppender client. Once the client is closed,
	// calling AppendBatch fails immediately. If an AppendBatch call is still
	// waiting for room in the pipeline, Close blocks. Close also waits for
	// all pending callbacks to be invoked.
	Close()
}
```

NewLogStreamAppender returns a new LogStreamAppender client, which provides an asynchronous Append API for the particular log stream specified by the arguments tpid and lsid.
The returned client can append only to that specific log stream. When a single client issues multiple operations from one goroutine, the appended log entries preserve the operation order.
Different clients connected to the same log stream cannot rely on such ordering. Even if two clients, c1 and c2, connected to the same log stream interleave their Append calls within a single goroutine, "Local Order" across the two clients' operations cannot be guaranteed.
Challenges
These are the challenges we need to address:
- What is the potential performance improvement we can achieve with this enhancement?
- How do we accurately measure the performance gain?
- When utilizing an internal buffer, how can we determine the optimal parameters?
- What are some ideal uses for our clients?
Tasks
- Change Append RPC from unary to stream. It should be transparent to users regarding client API usage and performance. feat(storagenode): change append rpc from unary to stream #449
- Change the behavior of the Append RPC handler in the storage node to support streaming RPC invocations. This should also be transparent to users, but it must be ready to support the "LogStreamAppender". feat(storagenode): add pipelined Append RPC handler #457
- Implement "LogStreamAppender". feat(client): add LogStreamAppender #459