
storagenode,client: introduce LogStreamAppender #433

@ijsong

Description


Motivation

Varlog is a storage system for logs that is both ordered and distributed. To make the system scalable and fault-tolerant, the storage layer in Varlog is partitioned. These partitions are called LogStreams, and each LogStream orders its log entries sequentially in what we call "Local Order". Log entries are also ordered sequentially across LogStreams in what we call "Global Order".

Certain applications, such as Kov (Kafka on Varlog), require only "Local Order" and can use the Varlog client. However, this client has limitations: it blocks each operation until the result of the prior one is received and provides only synchronous patterns. Supporting asynchronous Append operations is not trivial. See #441.

To address this, this change proposes a new asynchronous client that supports only the Append operation to a single log stream - LogStreamAppender.

Design

The LogStreamAppender operates on a single LogStream: it can append new log entries only to that specific LogStream. Although it is restricted to one LogStream, it can take advantage of asynchronous patterns and therefore does not block user code.

// LogStreamAppenderOption configures a LogStreamAppender.
type LogStreamAppenderOption interface {
}

// WithPipelineSize sets the request pipeline size. The default pipeline size is
// two. Any value below one will be set to one, and any above eight will be
// limited to eight.
func WithPipelineSize(pipelineSize int) LogStreamAppenderOption {
}

// WithDefaultCallback sets the default callback function. The default
// callback function can be overridden by the callback argument of the
// AppendBatch method.
func WithDefaultCallback(callback BatchCallback) LogStreamAppenderOption {
}

type Varlog interface {
	// NewLogStreamAppender returns a new LogStreamAppender. The argument ctx
	// is used for all lifetimes of the result LogStreamAppender.
	NewLogStreamAppender(ctx context.Context, tpid types.TopicID, lsid types.LogStreamID, opts ...LogStreamAppenderOption) (LogStreamAppender, error)
}

// BatchCallback is a callback function to notify the result of
// AppendBatch.
type BatchCallback func([]varlogpb.LogEntryMeta, error)

// LogStreamAppender is a client that can append only to a particular log
// stream.
type LogStreamAppender interface {
	// AppendBatch appends dataBatch to the given log stream asynchronously.
	// It does not block the caller unless the pipeline of the
	// LogStreamAppender is full. If the pipeline is already full, the call
	// blocks until a response arrives from the storage node.
	// On completion of AppendBatch, the callback provided by the user is
	// invoked. All callbacks registered to the same LogStreamAppender are
	// called sequentially by a single goroutine; therefore, callbacks should
	// be lightweight. If a callback needs to do heavy work, it should hand
	// that work off to separate worker goroutines.
	// The only error AppendBatch returns is ErrClosed, which occurs when the
	// LogStreamAppender is already closed. It returns nil even if the
	// underlying stream is disconnected; such errors are reported via the
	// callback.
	AppendBatch(dataBatch [][]byte, callback BatchCallback) error

	// Close closes the LogStreamAppender client. Once the client is closed,
	// calling AppendBatch fails immediately. If an AppendBatch call is still
	// waiting for room in the pipeline, Close blocks. Close also waits for
	// all pending callbacks to be invoked.
	Close()
}

NewLogStreamAppender returns a new LogStreamAppender client, which provides asynchronous Append APIs for the particular log stream specified by the arguments tpid and lsid.
The returned client can append only to that specific log stream. When a single goroutine issues multiple operations through one client, the appended log entries preserve the operation order.
Ordering is not preserved across different clients connected to the same log stream: even if two clients, c1 and c2, are connected to the same log stream and their Append calls are interleaved in a single goroutine, "Local Order" across the two clients' operations is not guaranteed.

Challenges

These are the challenges we need to address:

  • What is the potential performance improvement we can achieve with this enhancement?
  • How do we accurately measure the performance gain?
  • When utilizing an internal buffer, how can we determine the optimal parameters?
  • What are some ideal uses for our clients?

Tasks
