@@ -505,7 +505,7 @@ func TestAsyncProducerBrokerBounce(t *testing.T) {
505505 // When: a broker connection gets reset by a broker (network glitch, restart, you name it).
506506 leader .Close () // producer should get EOF
507507 leader = NewMockBrokerAddr (t , 2 , leaderAddr ) // start it up again right away for giggles
508- seedBroker .Returns (metadataResponse ) // tell it to go to broker 2 again
508+ leader .Returns (metadataResponse ) // tell it to go to broker 2 again
509509
510510 // Then: a produced message goes through the new broker connection.
511511 producer .Input () <- & ProducerMessage {Topic : "my_topic" , Key : nil , Value : StringEncoder (TestMessage )}
@@ -591,13 +591,13 @@ func TestAsyncProducerMultipleRetries(t *testing.T) {
591591 metadataLeader2 .AddBroker (leader2 .Addr (), leader2 .BrokerID ())
592592 metadataLeader2 .AddTopicPartition ("my_topic" , 0 , leader2 .BrokerID (), nil , nil , nil , ErrNoError )
593593
594- seedBroker .Returns (metadataLeader2 )
594+ leader1 .Returns (metadataLeader2 )
595595 leader2 .Returns (prodNotLeader )
596- seedBroker .Returns (metadataLeader1 )
596+ leader2 .Returns (metadataLeader1 )
597597 leader1 .Returns (prodNotLeader )
598- seedBroker .Returns (metadataLeader1 )
598+ leader1 .Returns (metadataLeader1 )
599599 leader1 .Returns (prodNotLeader )
600- seedBroker .Returns (metadataLeader2 )
600+ leader1 .Returns (metadataLeader2 )
601601
602602 prodSuccess := new (ProduceResponse )
603603 prodSuccess .AddTopicPartition ("my_topic" , 0 , ErrNoError )
@@ -653,13 +653,13 @@ func TestAsyncProducerMultipleRetriesWithBackoffFunc(t *testing.T) {
653653 metadataLeader2 .AddTopicPartition ("my_topic" , 0 , leader2 .BrokerID (), nil , nil , nil , ErrNoError )
654654
655655 leader1 .Returns (prodNotLeader )
656- seedBroker .Returns (metadataLeader2 )
656+ leader1 .Returns (metadataLeader2 )
657657 leader2 .Returns (prodNotLeader )
658- seedBroker .Returns (metadataLeader1 )
658+ leader2 .Returns (metadataLeader1 )
659659 leader1 .Returns (prodNotLeader )
660- seedBroker .Returns (metadataLeader1 )
660+ leader1 .Returns (metadataLeader1 )
661661 leader1 .Returns (prodNotLeader )
662- seedBroker .Returns (metadataLeader2 )
662+ leader1 .Returns (metadataLeader2 )
663663 leader2 .Returns (prodSuccess )
664664
665665 expectResults (t , producer , 1 , 0 )
@@ -739,16 +739,17 @@ func TestAsyncProducerBrokerRestart(t *testing.T) {
739739 leader := NewMockBroker (t , 2 )
740740
741741 var leaderLock sync.Mutex
742-
743- // The seed broker only handles Metadata request
744- seedBroker .setHandler (func (req * request ) (res encoderWithHeader ) {
742+ metadataRequestHandlerFunc := func (req * request ) (res encoderWithHeader ) {
745743 leaderLock .Lock ()
746744 defer leaderLock .Unlock ()
747745 metadataLeader := new (MetadataResponse )
748746 metadataLeader .AddBroker (leader .Addr (), leader .BrokerID ())
749747 metadataLeader .AddTopicPartition ("my_topic" , 0 , leader .BrokerID (), nil , nil , nil , ErrNoError )
750748 return metadataLeader
751- })
749+ }
750+
751+ // The seed broker only handles Metadata request in bootstrap
752+ seedBroker .setHandler (metadataRequestHandlerFunc )
752753
753754 var emptyValues int32 = 0
754755
@@ -770,14 +771,27 @@ func TestAsyncProducerBrokerRestart(t *testing.T) {
770771 }
771772 }
772773
773- leader . setHandler ( func (req * request ) (res encoderWithHeader ) {
774+ failedProduceRequestHandlerFunc := func (req * request ) (res encoderWithHeader ) {
774775 countRecordsWithEmptyValue (req )
775776
776777 time .Sleep (50 * time .Millisecond )
777778
778779 prodSuccess := new (ProduceResponse )
779780 prodSuccess .AddTopicPartition ("my_topic" , 0 , ErrNotLeaderForPartition )
780781 return prodSuccess
782+ }
783+
784+ succeededProduceRequestHandlerFunc := func (req * request ) (res encoderWithHeader ) {
785+ countRecordsWithEmptyValue (req )
786+
787+ prodSuccess := new (ProduceResponse )
788+ prodSuccess .AddTopicPartition ("my_topic" , 0 , ErrNoError )
789+ return prodSuccess
790+ }
791+
792+ leader .SetHandlerFuncByMap (map [string ]requestHandlerFunc {
793+ "ProduceRequest" : failedProduceRequestHandlerFunc ,
794+ "MetadataRequest" : metadataRequestHandlerFunc ,
781795 })
782796
783797 config := NewTestConfig ()
@@ -816,12 +830,9 @@ func TestAsyncProducerBrokerRestart(t *testing.T) {
816830 leaderLock .Lock ()
817831 leader = NewMockBroker (t , 2 )
818832 leaderLock .Unlock ()
819- leader .setHandler (func (req * request ) (res encoderWithHeader ) {
820- countRecordsWithEmptyValue (req )
821-
822- prodSuccess := new (ProduceResponse )
823- prodSuccess .AddTopicPartition ("my_topic" , 0 , ErrNoError )
824- return prodSuccess
833+ leader .SetHandlerFuncByMap (map [string ]requestHandlerFunc {
834+ "ProduceRequest" : succeededProduceRequestHandlerFunc ,
835+ "MetadataRequest" : metadataRequestHandlerFunc ,
825836 })
826837
827838 wg .Wait ()
@@ -938,7 +949,7 @@ func TestAsyncProducerRetryWithReferenceOpen(t *testing.T) {
938949 producer .Input () <- & ProducerMessage {Topic : "my_topic" , Key : nil , Value : StringEncoder (TestMessage )}
939950
940951 // tell partition 0 to go to that broker again
941- seedBroker .Returns (metadataResponse )
952+ leader .Returns (metadataResponse )
942953
943954 // succeed this time
944955 prodSuccess = new (ProduceResponse )
@@ -994,14 +1005,11 @@ func TestAsyncProducerFlusherRetryCondition(t *testing.T) {
9941005
9951006 time .Sleep (50 * time .Millisecond )
9961007
997- leader .SetHandlerByMap (map [string ]MockResponse {
998- "ProduceRequest" : NewMockProduceResponse (t ).
999- SetVersion (0 ).
1000- SetError ("my_topic" , 0 , ErrNoError ),
1001- })
1002-
10031008 // tell partition 0 to go to that broker again
1004- seedBroker .Returns (metadataResponse )
1009+ leader .Returns (metadataResponse )
1010+ prodSuccess := new (ProduceResponse )
1011+ prodSuccess .AddTopicPartition ("my_topic" , 0 , ErrNoError )
1012+ leader .Returns (prodSuccess )
10051013
10061014 // succeed this time
10071015 expectResults (t , producer , 5 , 0 )
@@ -1010,6 +1018,9 @@ func TestAsyncProducerFlusherRetryCondition(t *testing.T) {
10101018 for i := 0 ; i < 5 ; i ++ {
10111019 producer .Input () <- & ProducerMessage {Topic : "my_topic" , Key : nil , Value : StringEncoder (TestMessage ), Partition : 0 }
10121020 }
1021+ prodSuccess = new (ProduceResponse )
1022+ prodSuccess .AddTopicPartition ("my_topic" , 0 , ErrNoError )
1023+ leader .Returns (prodSuccess )
10131024 expectResults (t , producer , 5 , 0 )
10141025
10151026 // shutdown
@@ -1051,7 +1062,7 @@ func TestAsyncProducerRetryShutdown(t *testing.T) {
10511062 prodNotLeader .AddTopicPartition ("my_topic" , 0 , ErrNotLeaderForPartition )
10521063 leader .Returns (prodNotLeader )
10531064
1054- seedBroker .Returns (metadataLeader )
1065+ leader .Returns (metadataLeader )
10551066
10561067 prodSuccess := new (ProduceResponse )
10571068 prodSuccess .AddTopicPartition ("my_topic" , 0 , ErrNoError )
0 commit comments