Skip to content

Commit 5d438bb

Browse files
committed
fix(storagenode): support partial success/failure for append
The Append RPC function allows batch logging but may occasionally only partially succeed. The storage node previously overlooked partial success, but this PR fixes to consider partial success.
1 parent 5b27a18 commit 5d438bb

2 files changed

Lines changed: 80 additions & 9 deletions

File tree

internal/storagenode/logstream/append.go

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import (
44
"context"
55
"time"
66

7+
"go.uber.org/zap"
8+
79
"github.com/kakao/varlog/internal/batchlet"
810
snerrors "github.com/kakao/varlog/internal/storagenode/errors"
911
"github.com/kakao/varlog/pkg/verrors"
@@ -155,16 +157,24 @@ func (lse *Executor) waitForCompletionOfAppends(ctx context.Context, dataBatchLe
155157
result := make([]snpb.AppendResult, dataBatchLen)
156158
for i := range awgs {
157159
cerr := awgs[i].wait(ctx)
158-
if err == nil && cerr != nil {
159-
err = cerr
160+
if cerr != nil {
161+
result[i].Error = cerr.Error()
162+
if err == nil {
163+
err = cerr
164+
}
165+
continue
160166
}
161-
if cerr == nil {
162-
result[i].Meta.TopicID = lse.tpid
163-
result[i].Meta.LogStreamID = lse.lsid
164-
result[i].Meta.GLSN = awgs[i].glsn
165-
result[i].Meta.LLSN = awgs[i].llsn
166-
awgs[i].release()
167+
if err != nil {
168+
lse.logger.Panic("Results of batch requests of Append RPC must not be interleaved with success and failure", zap.Error(err))
167169
}
170+
result[i].Meta.TopicID = lse.tpid
171+
result[i].Meta.LogStreamID = lse.lsid
172+
result[i].Meta.GLSN = awgs[i].glsn
173+
result[i].Meta.LLSN = awgs[i].llsn
174+
awgs[i].release()
175+
}
176+
if result[0].Meta.GLSN.Invalid() {
177+
return nil, err
168178
}
169-
return result, err
179+
return result, nil
170180
}

internal/storagenode/storagenode_test.go

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -601,6 +601,67 @@ func TestStorageNode_Append(t *testing.T) {
601601
}()
602602
},
603603
},
604+
{
605+
name: "AppendBatch",
606+
testf: func(t *testing.T, addr string, lc *client.LogClient) {
607+
lss, lastGLSN := TestSealLogStreamReplica(t, cid, snid, tpid, lsid, types.InvalidGLSN, addr)
608+
require.Equal(t, varlogpb.LogStreamStatusSealed, lss)
609+
require.True(t, lastGLSN.Invalid())
610+
611+
TestUnsealLogStreamReplica(t, cid, snid, tpid, lsid, []varlogpb.LogStreamReplica{
612+
{
613+
StorageNode: varlogpb.StorageNode{
614+
StorageNodeID: snid,
615+
Address: addr,
616+
},
617+
TopicLogStream: varlogpb.TopicLogStream{
618+
TopicID: tpid,
619+
LogStreamID: lsid,
620+
},
621+
},
622+
}, addr)
623+
624+
batch := [][]byte{[]byte("msg1"), []byte("msg2"), []byte("msg3")}
625+
var wg sync.WaitGroup
626+
wg.Add(1)
627+
go func() {
628+
defer wg.Done()
629+
res, err := lc.Append(context.Background(), tpid, lsid, batch)
630+
require.NoError(t, err)
631+
require.Len(t, res, len(batch))
632+
require.Empty(t, res[0].Error)
633+
require.False(t, res[0].Meta.GLSN.Invalid())
634+
require.NotEmpty(t, res[1].Error)
635+
require.True(t, res[1].Meta.GLSN.Invalid())
636+
require.NotEmpty(t, res[2].Error)
637+
require.True(t, res[2].Meta.GLSN.Invalid())
638+
}()
639+
640+
require.Eventually(t, func() bool {
641+
reportcommitter.TestCommit(t, addr, snpb.CommitRequest{
642+
StorageNodeID: snid,
643+
CommitResult: snpb.LogStreamCommitResult{
644+
TopicID: tpid,
645+
LogStreamID: lsid,
646+
CommittedLLSNOffset: 1,
647+
CommittedGLSNOffset: 1,
648+
CommittedGLSNLength: 1,
649+
Version: 1,
650+
HighWatermark: 1,
651+
},
652+
})
653+
reports := reportcommitter.TestGetReport(t, addr)
654+
require.Len(t, reports, 1)
655+
return reports[0].Version == types.Version(1)
656+
}, time.Second, 10*time.Millisecond)
657+
658+
lss, lastGLSN = TestSealLogStreamReplica(t, cid, snid, tpid, lsid, 1, addr)
659+
require.Equal(t, varlogpb.LogStreamStatusSealed, lss)
660+
require.Equal(t, types.GLSN(1), lastGLSN)
661+
662+
wg.Wait()
663+
},
664+
},
604665
}
605666

606667
for _, tc := range tcs {

0 commit comments

Comments
 (0)