While debugging an issue in knative-gcp with @tommyreddad, we found that the CE v2 HTTP client can only process up to GOMAXPROCS requests concurrently. As a result, if some receiver functions take a long time to finish, they block the processing of all other events.
The cause is in client.go, which spawns GOMAXPROCS goroutines. Each goroutine runs an infinite loop that receives a message and calls the invoker to process it. Because the invoker is called synchronously, each goroutine processes its events one at a time, so at most GOMAXPROCS events are in flight at once.
    for i := 0; i < c.pollGoroutines; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for {
                var msg binding.Message
                var respFn protocol.ResponseFn
                var err error

                if c.responder != nil {
                    msg, respFn, err = c.responder.Respond(ctx)
                } else if c.receiver != nil {
                    msg, err = c.receiver.Receive(ctx)
                    respFn = noRespFn
                }

                if err == io.EOF { // Normal close
                    return
                }

                if err != nil {
                    cecontext.LoggerFrom(ctx).Warnf("Error while receiving a message: %s", err)
                    continue
                }

                if err := c.invoker.Invoke(ctx, msg, respFn); err != nil {
                    cecontext.LoggerFrom(ctx).Warnf("Error while handling a message: %s", err)
                }
            }
        }()
    }
Another potential performance bottleneck is that the incoming channel in the HTTP protocol is unbuffered, so every send blocks until a receiver is ready. I believe a buffered channel would be better.
    p := &Protocol{
        incoming: make(chan msgErr),
        Port:     -1,
    }
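The difference between the two kinds of channel can be shown with plain Go, independent of the SDK. In this sketch, `trySend` and `countImmediateSends` are hypothetical helpers written for the illustration. They count how many sends complete immediately when no receiver is running.

```go
package main

import "fmt"

// trySend reports whether v could be sent on ch without blocking.
func trySend(ch chan int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

// countImmediateSends creates a channel with the given capacity and returns
// how many of `attempts` sends succeed while nothing is receiving from it.
func countImmediateSends(capacity, attempts int) int {
	ch := make(chan int, capacity)
	n := 0
	for i := 0; i < attempts; i++ {
		if trySend(ch, i) {
			n++
		}
	}
	return n
}

func main() {
	fmt.Println(countImmediateSends(0, 5)) // unbuffered: prints 0
	fmt.Println(countImmediateSends(3, 5)) // capacity 3: prints 3
}
```

A buffer only absorbs bursts up to its capacity. Once it is full, senders block again, so buffering the incoming channel would smooth short spikes but would not by itself remove the GOMAXPROCS limit described above.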
How to reproduce
Run the example receiver code: https://gist.github.com/liu-cong/a132fe8629715419c7a58ee20001d071
It retrieves the Ce-Sleep attribute from the event and sleeps for that duration.
Then send a few events to the receiver using curl (note that we set Ce-Sleep to 1 minute):
    curl -v localhost:8080 \
      -X POST \
      -H "Ce-Sleep: 1m" \
      -H "Ce-Id: 1" \
      -H "Ce-Specversion: 1.0" \
      -H "Ce-Type: type" \
      -H "Ce-Source: source" \
      -H "Content-Type: application/json" \
      -d '{"msg":"Test!"}'
Once GOMAXPROCS events are in flight, the receive function for the next event is blocked until one of the sleep periods ends.
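Sending enough events by hand with curl is tedious, so the same requests can be sent concurrently from a small Go program. This is only a sketch using the standard library: `newSleepEvent` and the event count `n` are hypothetical, and the URL assumes the receiver from the gist is listening on localhost:8080 as in the curl command.

```go
package main

import (
	"fmt"
	"net/http"
	"strings"
	"sync"
	"time"
)

// newSleepEvent builds the same request as the curl command above:
// a binary-mode CloudEvent carrying a Ce-Sleep extension attribute.
func newSleepEvent(url, id, sleep string) (*http.Request, error) {
	req, err := http.NewRequest(http.MethodPost, url, strings.NewReader(`{"msg":"Test!"}`))
	if err != nil {
		return nil, err
	}
	req.Header.Set("Ce-Sleep", sleep)
	req.Header.Set("Ce-Id", id)
	req.Header.Set("Ce-Specversion", "1.0")
	req.Header.Set("Ce-Type", "type")
	req.Header.Set("Ce-Source", "source")
	req.Header.Set("Content-Type", "application/json")
	return req, nil
}

func main() {
	const n = 16 // hypothetical: pick a value larger than GOMAXPROCS on the receiver
	var wg sync.WaitGroup
	for i := 1; i <= n; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			req, err := newSleepEvent("http://localhost:8080", fmt.Sprint(id), "1m")
			if err != nil {
				fmt.Println("build request:", err)
				return
			}
			start := time.Now()
			resp, err := http.DefaultClient.Do(req)
			if err != nil {
				fmt.Printf("event %d: %v\n", id, err)
				return
			}
			resp.Body.Close()
			fmt.Printf("event %d: %s after %s\n", id, resp.Status, time.Since(start).Round(time.Second))
		}(i)
	}
	wg.Wait()
}
```

If the limit described above applies, the events beyond the first GOMAXPROCS should report a noticeably longer elapsed time than the others.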
The client.go snippet above is sdk-go/v2/client/client.go, lines 211 to 241 in 3db01d4. The Protocol snippet is sdk-go/v2/protocol/http/protocol.go, lines 73 to 76 in 3db01d4.