Skip to content

Commit e4da890

Browse files
authored
test(pubsublite): fix flaky TestAssigningSubscriberAddRemovePartitions (#8496)
Wait for commit requests to reach the server before ending the test. Fixes: https://togithub.com/googleapis/google-cloud-go/issues/8459
1 parent 73a958d commit e4da890

1 file changed

Lines changed: 8 additions & 5 deletions

File tree

pubsublite/internal/wire/subscriber_test.go

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1190,7 +1190,7 @@ func TestAssigningSubscriberAddRemovePartitions(t *testing.T) {
11901190
cmtStream3 := test.NewRPCVerifier(t)
11911191
cmtStream3.Push(initCommitReq(subscriptionPartition{Path: subscription, Partition: 3}), initCommitResp(), nil)
11921192
cmtStream3.Push(commitReq(34), commitResp(1), nil)
1193-
cmtStream3.Push(commitReq(35), commitResp(1), nil)
1193+
cmt2Barrier := cmtStream3.PushWithBarrier(commitReq(35), commitResp(1), nil)
11941194
verifiers.AddCommitStream(subscription, 3, cmtStream3)
11951195

11961196
// Partition 6
@@ -1203,7 +1203,7 @@ func TestAssigningSubscriberAddRemovePartitions(t *testing.T) {
12031203

12041204
cmtStream6 := test.NewRPCVerifier(t)
12051205
cmtStream6.Push(initCommitReq(subscriptionPartition{Path: subscription, Partition: 6}), initCommitResp(), nil)
1206-
cmtStream6.Push(commitReq(67), commitResp(1), nil)
1206+
cmt3Barrier := cmtStream6.PushWithBarrier(commitReq(67), commitResp(1), nil)
12071207
verifiers.AddCommitStream(subscription, 6, cmtStream6)
12081208

12091209
// Partition 8
@@ -1214,7 +1214,7 @@ func TestAssigningSubscriberAddRemovePartitions(t *testing.T) {
12141214

12151215
cmtStream8 := test.NewRPCVerifier(t)
12161216
cmtStream8.Push(initCommitReq(subscriptionPartition{Path: subscription, Partition: 8}), initCommitResp(), nil)
1217-
cmtStream8.Push(commitReq(89), commitResp(1), nil)
1217+
cmt5Barrier := cmtStream8.PushWithBarrier(commitReq(89), commitResp(1), nil)
12181218
verifiers.AddCommitStream(subscription, 8, cmtStream8)
12191219

12201220
mockServer.OnTestStart(verifiers)
@@ -1243,8 +1243,11 @@ func TestAssigningSubscriberAddRemovePartitions(t *testing.T) {
12431243
msg4Barrier.Release()
12441244
receiver.ValidateMsgs(partitionMsgs(3, msg2))
12451245

1246-
// Ensure the second assignment ack is received by the server to avoid test
1247-
// flakiness.
1246+
// Ensure requests are received by the server to avoid test flakiness.
1247+
sub.FlushCommits()
1248+
cmt2Barrier.Release()
1249+
cmt3Barrier.Release()
1250+
cmt5Barrier.Release()
12481251
assignmentBarrier2.Release()
12491252

12501253
// Stop should flush all commit cursors.

0 commit comments

Comments
 (0)