@@ -10,7 +10,6 @@ import (
1010 "os"
1111 "strings"
1212 "testing"
13- "time"
1413
1514 "github.com/IBM/sarama"
1615 "github.com/google/uuid"
@@ -91,25 +90,30 @@ func testClient(t testing.TB) sarama.Client {
9190}
9291
9392func testSenderReceiver (t testing.TB ) (func (), bindings.Sender , bindings.Receiver , string ) {
94- client := testClient (t )
95-
9693 topicName := "test-ce-client-" + uuid .New ().String ()
97- p , err := kafka_sarama .NewProtocolFromClient (client , topicName , topicName , kafka_sarama .WithReceiverGroupId (TEST_GROUP_ID ))
94+
95+ // Create a 'client' and 'protocol" for the Receiver side
96+ clientR := testClient (t )
97+ protocolR , err := kafka_sarama .NewProtocolFromClient (clientR , topicName , topicName , kafka_sarama .WithReceiverGroupId (TEST_GROUP_ID ))
9898 require .NoError (t , err )
99- require .NotNil (t , p )
99+ require .NotNil (t , protocolR )
100+
101+ // Create a 'client' and 'protocol" for the Sender side
102+ clientS := testClient (t )
103+ protocolS , err := kafka_sarama .NewProtocolFromClient (clientS , topicName , topicName , kafka_sarama .WithReceiverGroupId (TEST_GROUP_ID ))
104+ require .NoError (t , err )
105+ require .NotNil (t , protocolS )
100106
101107 go func () {
102- require .NoError (t , p .OpenInbound (context .TODO ()))
108+ require .NoError (t , protocolR .OpenInbound (context .TODO ()))
103109 }()
104110
105- // Not perfect but we need to give OpenInbound() as chance to start
106- // as it's a race condition. I couldn't find something on 'p' to wait for
107- time .Sleep (6 * time .Second )
108-
109111 return func () {
110- require .NoError (t , p .Close (context .TODO ()))
111- require .NoError (t , client .Close ())
112- }, p , p , topicName
112+ require .NoError (t , protocolR .Close (context .TODO ()))
113+ require .NoError (t , protocolS .Close (context .TODO ()))
114+ require .NoError (t , clientR .Close ())
115+ require .NoError (t , clientS .Close ())
116+ }, protocolS , protocolR , topicName
113117}
114118
115119func BenchmarkSendReceive (b * testing.B ) {
0 commit comments