
Snapshot streaming#2458

Merged
stevvooe merged 3 commits into moby:master from
anshulpundir:snap2
Dec 1, 2017

Contributor

@anshulpundir anshulpundir commented Nov 20, 2017

Based on proposal #2416

Signed-off-by: Anshul Pundir <anshul.pundir@docker.com>

api/raft.proto Outdated
// RaftMessageStream returns a gRPC stream that can be used to stream raft messages
// to be processed on a raft member.
// It is called from the RaftMember willing to send a message to its destination ('To' field)
rpc RaftMessageStream(stream ProcessRaftMessageRequest) returns (ProcessRaftMessageResponse) {
Contributor

This needs a response type wrapper.

Contributor

This seems confusing. The doc says that it returns a stream, but it only seems to take a stream as input. Who is the caller and callee in this scenario?

func DefaultRaftConfig() api.RaftConfig {
return api.RaftConfig{
KeepOldSnapshots: 0,
SnapshotInterval: 10000,
Contributor

Why the big change? Was this for testing?


// Append the received snapshot data.
if recvdMsg.Message.Type == raftpb.MsgSnap {
assembledMessage.Message.Snapshot.Data = append(assembledMessage.Message.Snapshot.Data, recvdMsg.Message.Snapshot.Data...)
Contributor

Doesn't this arrive at the same problem? This buffers the whole thing into a single buffer. How does this address the issue?

Contributor Author

The problem was being able to stream large snapshots without arbitrarily large gRPC message limits and timeouts. This addresses that problem. Memory constraint is not the problem we're trying to solve, at least not yet.

Contributor

@anshulpundir Fair enough. :)

This could double (or more!) memory usage.

Here's how we can decide: Do we have to load the entire snapshot into memory to apply it? If yes, then this is fine, but if not, we should buffer this to disk (tmp is fine).

Contributor Author

@anshulpundir anshulpundir Nov 21, 2017

Buffering to disk won't be any better, would it? To apply the snapshot we'd still need to load it into memory before passing it to etcd/raft.

Also, I think MemoryStorage::ApplySnapshot() doesn't actually do a memory copy, so we should still just need 1x memory.

Contributor

There are three components:

  1. "The bytes" of the snapshot itself.
  2. The structure that gets applied to the store.
  3. The store itself (not raft, but the database).

With the current model, I think the minimum is a 2x bump. Ideally, we can stream it in and avoid 1 and 2. Buffering to disk avoids 1, if 2 is necessary.

Looking at the code, there are two components: the entries and the snapshot data, a []byte. From the looks of it, the snapshot is applied always as a []byte, so we can never avoid 2. If a snapshot is applied in this fashion, there may exist a snapshot that has a similar size, which can contribute to bloat.

So, I think I agree with you: this code is probably fine as is, as long as this doesn't get copied (and it doesn't look like it does). Following that, I don't think there is an easy way to avoid the 2x memory bump by applying a snapshot in a stream.

Collaborator

i might be mistaken, but I don't think this code is any worse off than the regular raft message rpc call, except for the allocations and copies required to expand the array from the successive appends.

Collaborator

that said, maybe it's advantageous to take the couple of extra bytes to send the total message length with each stream request, so we can on the first message allocate an appropriately sized byte slice.

Contributor Author

@anshulpundir anshulpundir Nov 27, 2017

I did consider that, but streaming snapshots should not happen that frequently, so this is probably OK for now. We can always make the change to send the size later, if needed.
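
For reference, the trade-off in this thread (letting append grow the buffer versus sending the total length so the receiver can allocate once) could look roughly like the sketch below. The `chunk` type, its `TotalSize` field, and `assemble` are hypothetical names for illustration; the PR streams raft messages and does not carry a total size.

```go
package main

import "fmt"

// chunk is a hypothetical stand-in for one streamed message. The real
// stream carries raft messages and has no TotalSize field.
type chunk struct {
	TotalSize int    // assumed: full snapshot length, repeated on every chunk
	Data      []byte // this chunk's part of the snapshot
}

// assemble concatenates chunks in order. If the first chunk advertises a
// total size, the buffer is allocated once up front; otherwise append
// grows (and possibly reallocates and copies) the buffer as needed.
func assemble(chunks []chunk) []byte {
	var buf []byte
	for i, c := range chunks {
		if i == 0 && c.TotalSize > 0 {
			buf = make([]byte, 0, c.TotalSize)
		}
		buf = append(buf, c.Data...)
	}
	return buf
}

func main() {
	out := assemble([]chunk{
		{TotalSize: 5, Data: []byte("abc")},
		{TotalSize: 5, Data: []byte("de")},
	})
	fmt.Println(string(out), len(out), cap(out))
}
```

With the size known up front the capacity never changes after the first allocation, which is the saving being weighed against the extra bytes per request.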

// Function to check if streaming is needed.
// Returns true if the message type is MsgSnap/MsgApp
// and size larger than MaxRaftMsgSize.
func needsStreaming(m *raftpb.Message) bool {
Contributor

I would advocate for always using streaming unless the other end doesn't support it.

Contributor Author

@anshulpundir anshulpundir Nov 22, 2017

Can you please elaborate on this?

I thought a bit more about this. If the message is smaller than GrpcMaxMsgSize, it's less efficient to first try to stream and then fall back to a basic send if unsupported.

Having said that, I do see the value in always streaming, and eventually deprecating the other endpoint, unless there is a performance overhead to streaming a single message.

Can you please also elaborate on this?

@stevvooe

Collaborator

I also like the idea of always using streaming. It gives the streaming version of the RPC more real-world coverage instead of only using it in unusual corner cases. It would let us eventually deprecate the non-streaming version.

I don't think the mixed-version scenario is important to optimize.

Contributor Author

I agree with the point about giving this more real-world coverage. I'll make this change.

@stevvooe stevvooe changed the title from "Snapshot streaming" to "[WIP] Snapshot streaming" Nov 20, 2017
Contributor

@stevvooe stevvooe left a comment

Reviews inline.

This still seems to buffer the entire snapshot into the RSS of the swarm process. It would be best to buffer into a local file or apply the snapshot to a temp location as it comes in.

codecov bot commented Nov 20, 2017

Codecov Report

Merging #2458 into master will decrease coverage by 2.09%.
The diff coverage is 93.67%.

@@            Coverage Diff            @@
##           master    #2458     +/-   ##
=========================================
- Coverage   63.74%   61.64%   -2.1%     
=========================================
  Files          64      128     +64     
  Lines       11793    21076   +9283     
=========================================
+ Hits         7517    12993   +5476     
- Misses       3664     6680   +3016     
- Partials      612     1403    +791

"errors"
"fmt"
"github.com/coreos/etcd/raft/raftpb"
"github.com/docker/swarmkit/manager/state/raft/transport"
Collaborator

Can you put these below with the other non-stdlib imports?

}
}

func GetSnapshotMessage(from, to uint64, size int) *raftpb.Message {
Collaborator

Put this in manager/state/raft/testutils instead of duplicating it in two places.

// Get the max payload size.
payloadSize := raftMessagePayloadSize(m)

// split the snpashot into smaller messages.
Collaborator

typo

// split the snpashot into smaller messages.
for snapDataIndex := 0; snapDataIndex < size; {
remainingSize := size - snapDataIndex
chunkSize := int(math.Min(float64(remainingSize), float64(payloadSize)))
Collaborator

Using floating point math here doesn't feel right.

Contributor Author

holy shit I know right!

You'd prefer just doing if/else? @aaronlehmann

Collaborator

instead of math.Min why not just do min by hand for integers, so there's no round trip through floats?

chunkSize = payloadSize
if remainingSize < payloadSize {
    chunkSize = remainingSize
}
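
As a self-contained illustration of the suggestion above, here is a minimal sketch of an integer min helper. The name `minInt` is hypothetical and not part of the PR. The second half of `main` shows one concrete reason the float round trip is worth avoiding: float64 cannot represent every 64-bit integer exactly.

```go
package main

import "fmt"

// minInt returns the smaller of two ints without converting to float64.
func minInt(a, b int) int {
	if a < b {
		return a
	}
	return b
}

func main() {
	remainingSize, payloadSize := 10, 4
	chunkSize := minInt(remainingSize, payloadSize)
	fmt.Println(chunkSize)

	// float64 has a 53-bit significand, so integers above 2^53 may not
	// survive a conversion to float64 and back.
	var big int64 = 1<<53 + 1
	fmt.Println(int64(float64(big)) == big)
}
```

Snapshot sizes are nowhere near 2^53 bytes, so this is about clarity and intent rather than a real overflow risk in this code path.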

for chunkIndex := 0; chunkIndex < chunkSize; chunkIndex++ {
raftMsg.Snapshot.Data[chunkIndex] = m.Snapshot.Data[snapDataIndex]
snapDataIndex++
}
Collaborator

Use the builtin function copy instead of this byte-by-byte loop.

Collaborator

i don't believe we're mutating the bytes, so there's probably a way to avoid copying and leverage the same underlying storage array for all of the slices.

this would need to be documented, so that someone didn't come later on and break that assumption.

Collaborator

i think i misunderstood this code actually...

Contributor Author

Avoiding copy here would be ideal, but I'm not sure if there is a way to do this. I'll make the change if it is.

Contributor Author

I think it should be possible to avoid copy @dperny

Collaborator

yeah, we don't need to do any copying at all. instead of this loop, just do

raftMsg.Snapshot.Data = m.Snapshot.Data[snapDataIndex:(snapDataIndex+chunkSize)]

And then you can change the for loop above to be something like

var chunkSize int
for snapDataIndex := 0; snapDataIndex < size; snapDataIndex = snapDataIndex + chunkSize {
    // do the processing in here
}
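
Putting the two snippets above together, a zero-copy split might look like the following sketch. It works on a plain byte slice rather than a raftpb.Message, and `splitChunks` is a hypothetical name, so treat it as an illustration of the slicing idea and not as the code that was merged.

```go
package main

import "fmt"

// splitChunks returns consecutive sub-slices of data, each at most
// payloadSize bytes long. No bytes are copied: every chunk aliases the
// backing array of data, so neither data nor the chunks may be mutated
// while the chunks are in use.
func splitChunks(data []byte, payloadSize int) [][]byte {
	if payloadSize <= 0 {
		return nil
	}
	var chunks [][]byte
	for start := 0; start < len(data); start += payloadSize {
		end := start + payloadSize
		if end > len(data) {
			end = len(data)
		}
		chunks = append(chunks, data[start:end])
	}
	return chunks
}

func main() {
	data := []byte("0123456789")
	chunks := splitChunks(data, 4)
	for _, c := range chunks {
		fmt.Printf("%s ", c)
	}
	fmt.Println()

	// The first chunk shares storage with the original slice.
	fmt.Println(&chunks[0][0] == &data[0])
}
```

The aliasing is the assumption that needs documenting: if a later change starts mutating a chunk, it will silently mutate the original snapshot data too.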

@anshulpundir
Contributor Author

Also ping @dperny @nishanttotla for review

Collaborator

@dperny dperny left a comment

Lot of comments. Not all of these are blockers. I'll defer to your judgement on them.

if err == io.EOF {
break
} else if err != nil {
log.G(context.Background()).WithError(err).Error("error while reading from stream")
Collaborator

nit: stream.Context(), instead of context.Background(), unless there's some reason I'm not aware of to do so.

if recvdMsg.Message.Type == raftpb.MsgSnap {
assembledMessage.Message.Snapshot.Data = append(assembledMessage.Message.Snapshot.Data, recvdMsg.Message.Snapshot.Data...)
} else {
log.G(context.Background()).Errorf("Ignoring unexpected message type received on stream: %d", recvdMsg.Message.Type)
Collaborator

same as above, use stream context.

const (
// GrpcMaxMsgSize is the max allowed gRPC message size.
GrpcMaxMsgSize = 4 << 20
Collaborator

why represent this as 4 << 20? just curious.

Contributor Author

@anshulpundir anshulpundir Nov 27, 2017

Someone suggested it as being cleaner/easier to read than 4 * 1024 * 1024.

Collaborator

lol i disagree strongly with someone but that's bikeshedding.

Collaborator

so this part looks fine to me.

Contributor Author

@anshulpundir anshulpundir Nov 28, 2017

It's a little shorter, but I think it's fine either way.
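
For anyone else wondering about the notation: shifting left by 20 multiplies by 2^20, so both spellings name the same constant. A quick check, using the hypothetical constant names `shifted` and `multiplied`:

```go
package main

import "fmt"

const (
	shifted    = 4 << 20         // 4 * 2^20
	multiplied = 4 * 1024 * 1024 // 4 MiB written out
)

func main() {
	fmt.Println(shifted, multiplied, shifted == multiplied)
}
```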

// Returns the max allowable payload based on MaxRaftMsgSize and
// the struct size for the given raftpb.Message and
func raftMessagePayloadSize(m *raftpb.Message) int {
return GrpcMaxMsgSize - raftMessageStructSize(m)
Collaborator

dumb question: is GrpcMaxMsgSize actually the optimal message size? Or would a smaller message perform better?

Contributor Author

@anshulpundir anshulpundir Nov 27, 2017

GrpcMaxMsgSize is probably fine for now. I can't think of any reason to treat streamed messages differently than the regular messages.

assert.Error(t, err, "Received unexpected error EOF")

_, err = stream.CloseAndRecv()
errStr := fmt.Sprintf("rpc error: code = Internal desc = grpc: received message length %d exceeding the max size %d", msg.Size(), transport.GrpcMaxMsgSize)
Collaborator

instead of building an error string, you might want to use google.golang.org/grpc/status.FromError, and compare the code and message separately?

Contributor Author

Sounds reasonable


// We should have the complete snapshot. Verify and process.
if err == io.EOF {
_, err = n.ProcessRaftMessage(context.Background(), assembledMessage)
Collaborator

Unsure if I think we should use the stream context here or not.

Collaborator

maybe it's better not to discard the first return value, and instead use it as the return value for stream.SendAndClose below?

Contributor Author

Unsure if I think we should use the stream context here or not.

Yea, I'm unclear on this also. I'll think more about this.


)

// Build a snapshot message where each byte in the data is of the value (index % sizeof(byte))
func getSnapshotMessage(from uint64, to uint64) raftpb.Message {
Contributor Author

Trying to use this from testutils causes a circular dependency and moving this test to a transport_test package was just too problematic since unexported fields have been used extensively in the tests. Leaving as is for now, will find a way to address this in the future.

Collaborator

why do you need to use this from testutils?

Contributor Author

Keeping it in one place instead of duplicating it. It's also used in raft_test.go

Contributor Author

anshulpundir commented Nov 28, 2017

Working on making the change to always stream. Just to clarify: this change will only split snapshot messages larger than the max gRPC message size. Splitting other message types, e.g. append entries, may be supported later.

@anshulpundir anshulpundir force-pushed the snap2 branch 2 times, most recently from 2d95f3a to b7c27fa Compare November 29, 2017 19:31
Contributor Author

anshulpundir commented Nov 29, 2017

Addressed all review comments. ping @dperny @stevvooe @aaronlehmann for another pass

Collaborator

@dperny dperny left a comment

Couple of new comments but no blockers. LGTM.

// if a snapshot is being sent, set timeout to LargeSendTimeout because
// sending snapshots can take more time than other messages sent between peers.
// The same applies to AppendEntries as well, where messages can get large.
// TODO(anshul) remove when streaming change ready to merge.
Collaborator

what's this?

Contributor Author

The large timeout/grpc message size is no longer needed once we have snapshot streaming. I guess that can be a separate PR.

chunkSize = payloadSize
}

raftMsg := *m
Collaborator

to be clear, you're derefing m to make a copy of it, yeah?

Contributor Author

Yes, since I want to keep the entire message the same except for the fields to be split.

msgs := splitSnapshotData(ctx, &raftMsg)
assert.Equal(t, numMsgs, len(msgs), "unexpected number of messages")

raftMsg.Snapshot.Data = make([]byte, raftMessagePayloadSize)
Collaborator

nit, not a blocker. change if you want to, or don't. either way.

check := func(size, expectedMessages int) {
    raftMsg.Snapshot.Data = make([]byte, size)
    msgs := splitSnapshotData(ctx, &raftMsg)
    assert.Equal(t, expectedMessages, len(msgs), "unexpected number of raft messages")
}

numMsgs := int(math.Ceil(float64(snapshotSize) / float64(raftMessagePayloadSize)))
check(raftMessagePayloadSize, numMsgs)
check(raftMessagePayloadSize-1, 1)
check(raftMessagePayloadSize*2, 2)
check(0, 0)
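
The expected message counts in a table like this can also be computed without going through floats, using integer ceiling division. A minimal sketch follows; `numChunks` and the payload value are hypothetical and only illustrate the arithmetic.

```go
package main

import "fmt"

// numChunks returns how many messages of at most payloadSize bytes are
// needed to carry size bytes, using integer ceiling division.
func numChunks(size, payloadSize int) int {
	if size <= 0 || payloadSize <= 0 {
		return 0
	}
	return (size + payloadSize - 1) / payloadSize
}

func main() {
	const payload = 1000 // hypothetical payload size, for illustration only
	for _, size := range []int{0, payload - 1, payload, payload + 1, payload * 2} {
		fmt.Println(size, numChunks(size, payload))
	}
}
```

The boundary cases (one byte under, exactly at, and one byte over the payload size) are the ones most likely to catch an off-by-one in the splitting loop.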

api/raft.proto Outdated
// RaftMessageStream accepts a stream of raft messages to be processed on a raft member,
// returning a ProcessRaftMessageResponse when processing of the streamed messages is complete.
// It is called from the Raft leader, which uses it to stream messages to a raft member.
rpc RaftMessageStream(stream ProcessRaftMessageRequest) returns (ProcessRaftMessageResponse) {
Contributor

These need to have rpc specific types. You can't just reuse the existing request type.

Contributor Author

@anshulpundir anshulpundir Nov 30, 2017

Offline conversation with @stevvooe: this is generally a good practice, to help separate out the two RPCs and enable them to evolve independently.

// GetSnapshotMessage creates and returns a raftpb.Message of type MsgSnap
// where the snapshot data is of the given size and the value of each byte
// is (index of the byte) % 256.
func GetSnapshotMessage(from, to uint64, size int) *raftpb.Message {
Contributor

This is not a getter. This should be NewSnapshotMessage.

)

const (
// GrpcMaxMsgSize is the max allowed gRPC message size.
Contributor

GRPCMaxMsgSize.

log.G(stream.Context()).WithError(err).Error("error while reading from stream")
return err
}

Contributor

Validate that each message belongs to the same index and are to be concatenated.
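
One way to read this request, sketched under assumptions: every streamed part carries the raft index of the message it belongs to, and the receiver rejects the stream as soon as a part disagrees with the first one. The `part` type, `reassemble`, and `errIndexMismatch` are hypothetical names and do not reflect the merged implementation.

```go
package main

import (
	"errors"
	"fmt"
)

// part is a hypothetical stand-in for one streamed raft message; only the
// fields needed for the check are modelled here.
type part struct {
	Index uint64 // assumed: raft index of the message this part belongs to
	Data  []byte
}

var errIndexMismatch = errors.New("streamed part has a different index")

// reassemble concatenates parts in order, rejecting the stream if any
// part carries an index different from the first part's index.
func reassemble(parts []part) ([]byte, error) {
	var out []byte
	for i, p := range parts {
		if i > 0 && p.Index != parts[0].Index {
			return nil, errIndexMismatch
		}
		out = append(out, p.Data...)
	}
	return out, nil
}

func main() {
	data, err := reassemble([]part{{7, []byte("ab")}, {7, []byte("cd")}})
	fmt.Println(string(data), err)

	_, err = reassemble([]part{{7, []byte("ab")}, {8, []byte("cd")}})
	fmt.Println(err)
}
```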


// RaftMessageStream accepts a stream of raft messages to be processed on a raft member,
// returning a ProcessRaftMessageResponse when processing of the streamed messages is complete.
// It is called from the Raft leader, which uses it to stream messages to a raft member.
Contributor

Explain here that a single stream call corresponds to a single, disassembled message.

@anshulpundir anshulpundir force-pushed the snap2 branch 4 times, most recently from 614d598 to a6ceb09 Compare December 1, 2017 00:47
Signed-off-by: Anshul Pundir <anshul.pundir@docker.com>
Signed-off-by: Anshul Pundir <anshul.pundir@docker.com>
Signed-off-by: Anshul Pundir <anshul.pundir@docker.com>
Contributor

stevvooe commented Dec 1, 2017

LGTM
