Skip to content

Commit f5c7061

Browse files
author
Doug Davis
committed
Try to fix race again - don't reuse clients for sender/receiver
Signed-off-by: Doug Davis <dug@microsoft.com>
1 parent d7f845b commit f5c7061

File tree

1 file changed

+17
-13
lines changed

1 file changed

+17
-13
lines changed

test/integration/kafka_sarama_binding/kafka_test.go

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import (
1010
"os"
1111
"strings"
1212
"testing"
13-
"time"
1413

1514
"github.com/IBM/sarama"
1615
"github.com/google/uuid"
@@ -91,25 +90,30 @@ func testClient(t testing.TB) sarama.Client {
9190
}
9291

9392
func testSenderReceiver(t testing.TB) (func(), bindings.Sender, bindings.Receiver, string) {
94-
client := testClient(t)
95-
9693
topicName := "test-ce-client-" + uuid.New().String()
97-
p, err := kafka_sarama.NewProtocolFromClient(client, topicName, topicName, kafka_sarama.WithReceiverGroupId(TEST_GROUP_ID))
94+
95+
// Create a 'client' and 'protocol" for the Receiver side
96+
clientR := testClient(t)
97+
protocolR, err := kafka_sarama.NewProtocolFromClient(clientR, topicName, topicName, kafka_sarama.WithReceiverGroupId(TEST_GROUP_ID))
9898
require.NoError(t, err)
99-
require.NotNil(t, p)
99+
require.NotNil(t, protocolR)
100+
101+
// Create a 'client' and 'protocol" for the Sender side
102+
clientS := testClient(t)
103+
protocolS, err := kafka_sarama.NewProtocolFromClient(clientS, topicName, topicName, kafka_sarama.WithReceiverGroupId(TEST_GROUP_ID))
104+
require.NoError(t, err)
105+
require.NotNil(t, protocolS)
100106

101107
go func() {
102-
require.NoError(t, p.OpenInbound(context.TODO()))
108+
require.NoError(t, protocolR.OpenInbound(context.TODO()))
103109
}()
104110

105-
// Not perfect but we need to give OpenInbound() as chance to start
106-
// as it's a race condition. I couldn't find something on 'p' to wait for
107-
time.Sleep(6 * time.Second)
108-
109111
return func() {
110-
require.NoError(t, p.Close(context.TODO()))
111-
require.NoError(t, client.Close())
112-
}, p, p, topicName
112+
require.NoError(t, protocolR.Close(context.TODO()))
113+
require.NoError(t, protocolS.Close(context.TODO()))
114+
require.NoError(t, clientR.Close())
115+
require.NoError(t, clientS.Close())
116+
}, protocolS, protocolR, topicName
113117
}
114118

115119
func BenchmarkSendReceive(b *testing.B) {

0 commit comments

Comments
 (0)