Skip to content

Commit 57be3cd

Browse files
author
Doug Davis
committed
Try to make sure the Receiver starts before we send events
Signed-off-by: Doug Davis <dug@microsoft.com>
1 parent 24f0eb4 commit 57be3cd

File tree

4 files changed

+49
-6
lines changed

4 files changed

+49
-6
lines changed

test/integration/mqtt_paho/mqtt_test.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,18 @@ func TestSendEvent(t *testing.T) {
3737
// start a cloudevents receiver client go to receive the event
3838
eventChan := make(chan receiveEvent)
3939
defer close(eventChan)
40+
41+
// Used to try to make sure the receiver is ready before we start to
42+
// send events
43+
wait := make(chan bool)
44+
4045
go func() {
4146
client, err := cloudevents.NewClient(protocolFactory(ctx, t, topicName))
4247
if err != nil {
4348
eventChan <- receiveEvent{err: err}
4449
return
4550
}
51+
wait <- true
4652
err = client.StartReceiver(ctx, func(event cloudevents.Event) {
4753
eventChan <- receiveEvent{event: event}
4854
})
@@ -52,6 +58,11 @@ func TestSendEvent(t *testing.T) {
5258
}
5359
}()
5460

61+
// Wait until receiver thread starts, and then wait a second to
62+
// give the "StartReceive" call a chance to start (finger's crossed)
63+
<-wait
64+
time.Sleep(time.Second)
65+
5566
// start a cloudevents sender client go to send the event, set the topic on context
5667
client, err := cloudevents.NewClient(protocolFactory(ctx, t, ""))
5768
require.NoError(t, err)

test/integration/mqtt_paho_binding/mqtt_test.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,13 +106,25 @@ func startReceiver(ctx context.Context, topicName string, messageChan chan recei
106106
if err != nil {
107107
messageChan <- receiveMessage{err: err}
108108
}
109+
110+
// Used to try to make sure the receiver is ready before we start to
111+
// get events
112+
wait := make(chan bool)
113+
109114
go func() {
115+
wait <- true
110116
err := receiver.OpenInbound(ctx)
111117
if err != nil {
112118
messageChan <- receiveMessage{err: err}
113119
}
114120
receiver.Close(ctx)
115121
}()
122+
123+
// Wait for other thread to start and run OpenInbound + sleep a sec
124+
// hoping that things will get ready before we call Receive() below
125+
<-wait
126+
time.Sleep(time.Second)
127+
116128
go func() {
117129
msg, result := receiver.Receive(ctx)
118130
messageChan <- receiveMessage{msg, result}

v2/client/test/test.go

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,20 @@ import (
2525
func SendReceive(t *testing.T, protocolFactory func() interface{}, in event.Event, outAssert func(e event.Event), opts ...client.Option) {
2626
t.Helper()
2727
pf := protocolFactory()
28-
c, err := client.New(pf, opts...)
28+
29+
// Create a sender and receiver client since we can't assume it's safe
30+
// to use the same one for both roles
31+
32+
sender, err := client.New(pf, opts...)
33+
require.NoError(t, err)
34+
35+
receiver, err := client.New(pf, opts...)
2936
require.NoError(t, err)
37+
3038
wg := sync.WaitGroup{}
3139
wg.Add(2)
3240

33-
// Give time for Kafka client protocol to get setup
34-
time.Sleep(2 * time.Second)
41+
receiverReady := make(chan bool)
3542

3643
go func() {
3744
ctx, cancel := context.WithCancel(context.TODO())
@@ -42,7 +49,8 @@ func SendReceive(t *testing.T, protocolFactory func() interface{}, in event.Even
4249
wg.Done()
4350
}(inCh)
4451
go func(channel chan event.Event) {
45-
err := c.StartReceiver(ctx, func(e event.Event) {
52+
receiverReady <- true
53+
err := receiver.StartReceiver(ctx, func(e event.Event) {
4654
channel <- e
4755
})
4856
if err != nil {
@@ -53,12 +61,14 @@ func SendReceive(t *testing.T, protocolFactory func() interface{}, in event.Even
5361
outAssert(e)
5462
}()
5563

56-
// Give time for the receiever to start
64+
// Wait for receiver to be setup. Not 100% perefect but the channel + the
65+
// sleep should do it
66+
<-receiverReady
5767
time.Sleep(2 * time.Second)
5868

5969
go func() {
6070
defer wg.Done()
61-
err := c.Send(context.Background(), in)
71+
err := sender.Send(context.Background(), in)
6272
require.NoError(t, err)
6373
}()
6474

v2/protocol/test/test.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,13 @@ func SendReceive(t *testing.T, ctx context.Context, in binding.Message, s protoc
2525
wg := sync.WaitGroup{}
2626
wg.Add(2)
2727

28+
// Used to try to make sure the receiver is ready before we start to
29+
// send events
30+
wait := make(chan bool)
31+
2832
go func() {
2933
defer wg.Done()
34+
wait <- true
3035
out, result := r.Receive(ctx)
3136
if !protocol.IsACK(result) {
3237
require.NoError(t, result)
@@ -36,6 +41,11 @@ func SendReceive(t *testing.T, ctx context.Context, in binding.Message, s protoc
3641
require.NoError(t, finishErr)
3742
}()
3843

44+
// Wait until receiver thread starts, and then wait a second to
45+
// give the "Receive" call a chance to start (finger's crossed)
46+
<-wait
47+
time.Sleep(time.Second)
48+
3949
go func() {
4050
defer wg.Done()
4151
mx := sync.Mutex{}

0 commit comments

Comments
 (0)