Conversation
api/raft.proto
// RaftMessageStream returns a gRPC stream that can be used to stream raft messages
// to be processed on a raft member.
// It is called from the RaftMember willing to send a message to its destination ('To' field)
rpc RaftMessageStream(stream ProcessRaftMessageRequest) returns (ProcessRaftMessageResponse) {
This needs a response type wrapper.

This seems confusing. The doc says that it returns a stream, but it only seems to take a stream as input. Who is the caller and callee in this scenario?
func DefaultRaftConfig() api.RaftConfig {
	return api.RaftConfig{
		KeepOldSnapshots: 0,
		SnapshotInterval: 10000,

Why the big change? Was this for testing?
manager/state/raft/raft.go
// Append the received snapshot data.
if recvdMsg.Message.Type == raftpb.MsgSnap {
	assembledMessage.Message.Snapshot.Data = append(assembledMessage.Message.Snapshot.Data, recvdMsg.Message.Snapshot.Data...)
Doesn't this arrive at the same problem? This buffers the whole thing into a single buffer. How does this address the issue?

The problem was being able to stream large snapshots without arbitrarily large gRPC message limits and timeouts. This addresses that problem. Memory constraint is not the problem we're trying to solve, at least not yet.

@anshulpundir Fair enough. :)
This could double (or more!) memory usage.
Here's how we can decide: Do we have to load the entire snapshot into memory to apply it? If yes, then this is fine, but if not, we should buffer this to disk (tmp is fine).

Buffering to disk won't be any better, would it? To apply the snapshot, we'd still need to load it into memory before passing it to etcd/raft.
Also, I think MemoryStorage::ApplySnapshot() doesn't actually do a memory copy, so we should still just need 1x memory.

There are three components:
- "The bytes" of the snapshot itself.
- The structure that gets applied to the store.
- The store itself (not raft, but the database).
With the current model, I think the minimum is a 2x bump. Ideally, we can stream it in and avoid 1 and 2. Buffering to disk avoids 1, if 2 is necessary.
Looking at the code, there are two components: the entries and the snapshot data, a []byte. From the looks of it, the snapshot is always applied as a []byte, so we can never avoid 2. If a snapshot is applied in this fashion, there may exist a snapshot of similar size, which can contribute to bloat.
So, I think I agree with you: this code is probably fine as is, as long as this doesn't get copied (and it doesn't look like it does). Following that, I don't think there is an easy way to avoid the 2x memory bump by applying a snapshot in a stream.
i might be mistaken, but I don't think this code is any worse off than the regular raft message rpc call, except for the allocations and copies required to expand the array from the successive appends.

that said, maybe it's advantageous to take the couple of extra bytes to send the total message length with each stream request, so we can on the first message allocate an appropriately sized byte slice.

I did consider that, but streaming snapshots should not happen that frequently, so this is probably OK for now. We can always make the change to send the size later, if needed.
manager/state/raft/transport/peer.go
// Function to check if streaming is needed.
// Returns true if the message type is MsgSnap/MsgApp
// and size larger than MaxRaftMsgSize.
func needsStreaming(m *raftpb.Message) bool {
I would advocate for always using streaming unless the other end doesn't support it.

Can you please elaborate on this?
I thought a bit more about this. If the message is smaller than GrpcMaxMsgSize, it's less efficient to first try to stream and then fall back to a basic send if unsupported.
Having said that, I do see the value in always streaming, and eventually deprecating the other endpoint, unless there is a performance overhead to streaming a single message.
Can you please also elaborate on this?
I also like the idea of always using streaming. It gives the streaming version of the RPC more real-world coverage instead of only using it in unusual corner cases. It would let us eventually deprecate the non-streaming version.
I don't think the mixed-version scenario is important to optimize.

I agree with the point about giving this more real-world coverage. I'll make this change.
stevvooe
left a comment
Reviews inline.
This still seems to buffer the entire snapshot into the RSS of the swarm process. It would be best to buffer into a local file or apply the snapshot to a temp location as it comes in.
Codecov Report
@@ Coverage Diff @@
## master #2458 +/- ##
=========================================
- Coverage 63.74% 61.64% -2.1%
=========================================
Files 64 128 +64
Lines 11793 21076 +9283
=========================================
+ Hits 7517 12993 +5476
- Misses 3664 6680 +3016
- Partials 612 1403 +791
manager/state/raft/raft_test.go
"errors"
"fmt"
"github.com/coreos/etcd/raft/raftpb"
"github.com/docker/swarmkit/manager/state/raft/transport"
Can you put these below with the other non-stdlib imports?
manager/state/raft/raft_test.go
}
}

func GetSnapshotMessage(from, to uint64, size int) *raftpb.Message {
Put this in manager/state/raft/testutils instead of duplicating it in two places.
manager/state/raft/transport/peer.go
// Get the max payload size.
payloadSize := raftMessagePayloadSize(m)

// split the snapshot into smaller messages.
manager/state/raft/transport/peer.go
// split the snapshot into smaller messages.
for snapDataIndex := 0; snapDataIndex < size; {
	remainingSize := size - snapDataIndex
	chunkSize := int(math.Min(float64(remainingSize), float64(payloadSize)))
Using floating point math here doesn't feel right.

holy shit I know right!
you'd prefer just doing if/else? @aaronlehmann

instead of math.Min why not just do min by hand for integers, so there's no round trip through floats?
chunkSize = payloadSize
if remainingSize < payloadSize {
chunkSize = remainingSize
}
manager/state/raft/transport/peer.go
for chunkIndex := 0; chunkIndex < chunkSize; chunkIndex++ {
	raftMsg.Snapshot.Data[chunkIndex] = m.Snapshot.Data[snapDataIndex]
	snapDataIndex++
}
Use the builtin function copy instead of this byte-by-byte loop.

i don't believe we're mutating the bytes, so there's probably a way to avoid copying and leverage the same underlying storage array for all of the slices.
this would need to be documented, so that someone didn't come later on and break that assumption.

i think i misunderstood this code actually...

Avoiding copy here would be ideal, but I'm not sure if there is a way to do this. I'll make the change if it is.

I think it should be possible to avoid copy @dperny

yeah, we don't need to do any copying at all. instead of this loop, just do
raftMsg.Snapshot.Data = m.Snapshot.Data[snapDataIndex:(snapDataIndex+chunkSize)]

And then you can change the for loop above to be something like
var chunkSize int
for snapDataIndex := 0; snapDataIndex < size; snapDataIndex = snapDataIndex + chunkSize {
// do the processing in here
}
Also ping @dperny @nishanttotla for review
dperny
left a comment
Lot of comments. Not all of these are blockers. I'll defer to your judgement on them.
manager/state/raft/raft.go
if err == io.EOF {
	break
} else if err != nil {
	log.G(context.Background()).WithError(err).Error("error while reading from stream")
nit: stream.Context(), instead of context.Background(), unless there's some reason I'm not aware of to do so.
manager/state/raft/raft.go
if recvdMsg.Message.Type == raftpb.MsgSnap {
	assembledMessage.Message.Snapshot.Data = append(assembledMessage.Message.Snapshot.Data, recvdMsg.Message.Snapshot.Data...)
} else {
	log.G(context.Background()).Errorf("Ignoring unexpected message type received on stream: %d", recvdMsg.Message.Type)
same as above, use stream context.
manager/state/raft/transport/peer.go
const (
	// GrpcMaxMsgSize is the max allowed gRPC message size.
	GrpcMaxMsgSize = 4 << 20
why represent this as 4 << 20? just curious.

Someone suggested it as being cleaner/easier to read than 4 * 1024 * 1024.

lol i disagree strongly with someone but that's bikeshedding.

so this part looks fine to me.

It's a little shorter, but I think it's fine either way.
manager/state/raft/transport/peer.go
// Returns the max allowable payload based on MaxRaftMsgSize and
// the struct size for the given raftpb.Message.
func raftMessagePayloadSize(m *raftpb.Message) int {
	return GrpcMaxMsgSize - raftMessageStructSize(m)
dumb question: is GrpcMaxMsgSize actually the optimal message size? Or would a smaller message perform better?

GrpcMaxMsgSize is probably fine for now. I can't think of any reason to treat streamed messages differently than the regular messages.
manager/state/raft/raft_test.go
assert.Error(t, err, "Received unexpected error EOF")

_, err = stream.CloseAndRecv()
errStr := fmt.Sprintf("rpc error: code = Internal desc = grpc: received message length %d exceeding the max size %d", msg.Size(), transport.GrpcMaxMsgSize)
instead of building an error string, you might want to use google.golang.org/grpc/status.FromError, and compare the code and message separately?

Sounds reasonable
manager/state/raft/raft.go
// We should have the complete snapshot. Verify and process.
if err == io.EOF {
	_, err = n.ProcessRaftMessage(context.Background(), assembledMessage)
Unsure if I think we should use the stream context here or not.

maybe it's better not to discard the first return value, and instead use it as the return value for stream.SendAndClose below?

> Unsure if I think we should use the stream context here or not.

Yea, I'm unclear on this also. I'll think more about this.
)

// Build a snapshot message where each byte in the data is of the value (index % sizeof(byte))
func getSnapshotMessage(from uint64, to uint64) raftpb.Message {
Trying to use this from testutils causes a circular dependency, and moving this test to a transport_test package was just too problematic since unexported fields have been used extensively in the tests. Leaving as is for now, will find a way to address this in the future.

why do you need to use this from testutils?

Keeping it in one place instead of duplicating it. It's also used in raft_test.go
Working on making the change to always stream. Just to clarify: this change will only stream snapshot messages larger than the max gRPC message size. Splitting other message types, e.g. append entries, may be supported later.
Addressed all review comments. ping @dperny @stevvooe @aaronlehmann for another pass
dperny
left a comment
Couple of new comments but no blockers. LGTM.
// if a snapshot is being sent, set timeout to LargeSendTimeout because
// sending snapshots can take more time than other messages sent between peers.
// The same applies to AppendEntries as well, where messages can get large.
// TODO(anshul) remove when streaming change ready to merge.
The large timeout/grpc message size is no longer needed once we have snapshot streaming. I guess that can be a separate PR.
	chunkSize = payloadSize
}

raftMsg := *m
to be clear, you're derefing m to make a copy of it, yeah?

Yes, since I want to keep the entire message the same except for the fields to be split.
msgs := splitSnapshotData(ctx, &raftMsg)
assert.Equal(t, numMsgs, len(msgs), "unexpected number of messages")

raftMsg.Snapshot.Data = make([]byte, raftMessagePayloadSize)
nit, not a blocker. change if you want to, or don't. either way.
check := func(size, expectedMessages int) {
	raftMsg.Snapshot.Data = make([]byte, size)
	msgs := splitSnapshotData(ctx, &raftMsg)
	assert.Equal(t, expectedMessages, len(msgs), "unexpected number of raft messages")
}
numMsgs := int(math.Ceil(float64(snapshotSize) / float64(raftMessagePayloadSize)))
check(snapshotSize, numMsgs)
check(raftMessagePayloadSize-1, 1)
check(raftMessagePayloadSize*2, 2)
check(0, 0)
api/raft.proto
// RaftMessageStream accepts a stream of raft messages to be processed on a raft member,
// returning a ProcessRaftMessageResponse when processing of the streamed messages is complete.
// It is called from the Raft leader, which uses it to stream messages to a raft member.
rpc RaftMessageStream(stream ProcessRaftMessageRequest) returns (ProcessRaftMessageResponse) {
These need to have rpc specific types. You can't just reuse the existing request type.

Offline conversation with @stevvooe: this is generally a good practice, to help separate the two RPCs and enable them to evolve independently.
// GetSnapshotMessage creates and returns a raftpb.Message of type MsgSnap
// where the snapshot data is of the given size and the value of each byte
// is (index of the byte) % 256.
func GetSnapshotMessage(from, to uint64, size int) *raftpb.Message {
This is not a getter. This should be NewSnapshotMessage.
log.G(stream.Context()).WithError(err).Error("error while reading from stream")
return err
}
Validate that the messages all belong to the same index and are meant to be concatenated.
// RaftMessageStream accepts a stream of raft messages to be processed on a raft member,
// returning a ProcessRaftMessageResponse when processing of the streamed messages is complete.
// It is called from the Raft leader, which uses it to stream messages to a raft member.
Explain here that a single stream call corresponds to a single, disassembled message.
Signed-off-by: Anshul Pundir <anshul.pundir@docker.com>
LGTM
Based on proposal #2416
Signed-off-by: Anshul Pundir anshul.pundir@docker.com