@@ -1190,7 +1190,7 @@ func TestAssigningSubscriberAddRemovePartitions(t *testing.T) {
11901190 cmtStream3 := test .NewRPCVerifier (t )
11911191 cmtStream3 .Push (initCommitReq (subscriptionPartition {Path : subscription , Partition : 3 }), initCommitResp (), nil )
11921192 cmtStream3 .Push (commitReq (34 ), commitResp (1 ), nil )
1193- cmtStream3 .Push (commitReq (35 ), commitResp (1 ), nil )
1193+ cmt2Barrier := cmtStream3 .PushWithBarrier (commitReq (35 ), commitResp (1 ), nil )
11941194 verifiers .AddCommitStream (subscription , 3 , cmtStream3 )
11951195
11961196 // Partition 6
@@ -1203,7 +1203,7 @@ func TestAssigningSubscriberAddRemovePartitions(t *testing.T) {
12031203
12041204 cmtStream6 := test .NewRPCVerifier (t )
12051205 cmtStream6 .Push (initCommitReq (subscriptionPartition {Path : subscription , Partition : 6 }), initCommitResp (), nil )
1206- cmtStream6 .Push (commitReq (67 ), commitResp (1 ), nil )
1206+ cmt3Barrier := cmtStream6 .PushWithBarrier (commitReq (67 ), commitResp (1 ), nil )
12071207 verifiers .AddCommitStream (subscription , 6 , cmtStream6 )
12081208
12091209 // Partition 8
@@ -1214,7 +1214,7 @@ func TestAssigningSubscriberAddRemovePartitions(t *testing.T) {
12141214
12151215 cmtStream8 := test .NewRPCVerifier (t )
12161216 cmtStream8 .Push (initCommitReq (subscriptionPartition {Path : subscription , Partition : 8 }), initCommitResp (), nil )
1217- cmtStream8 .Push (commitReq (89 ), commitResp (1 ), nil )
1217+ cmt5Barrier := cmtStream8 .PushWithBarrier (commitReq (89 ), commitResp (1 ), nil )
12181218 verifiers .AddCommitStream (subscription , 8 , cmtStream8 )
12191219
12201220 mockServer .OnTestStart (verifiers )
@@ -1243,8 +1243,11 @@ func TestAssigningSubscriberAddRemovePartitions(t *testing.T) {
12431243 msg4Barrier .Release ()
12441244 receiver .ValidateMsgs (partitionMsgs (3 , msg2 ))
12451245
1246- // Ensure the second assignment ack is received by the server to avoid test
1247- // flakiness.
1246+ // Ensure requests are received by the server to avoid test flakiness.
1247+ sub .FlushCommits ()
1248+ cmt2Barrier .Release ()
1249+ cmt3Barrier .Release ()
1250+ cmt5Barrier .Release ()
12481251 assignmentBarrier2 .Release ()
12491252
12501253 // Stop should flush all commit cursors.
0 commit comments