Keep event ordering #642
Description
Hello,
It might have been discussed, but I couldn't find any information related to data ordering.
With the current SDK, it's not possible to ensure that events are processed in the order they are read.
Here is an example, obtained by modifying the kafka sender-receiver sample:
package main

import (
	"context"
	"fmt"
	"log"
	"math/rand"
	"sync/atomic"
	"time"

	"github.com/Shopify/sarama"

	"github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2"
	cloudevents "github.com/cloudevents/sdk-go/v2"
)

const (
	count = 10
)

func main() {
	saramaConfig := sarama.NewConfig()
	saramaConfig.Version = sarama.V2_0_0_0

	topic := fmt.Sprintf("topic-%v", time.Now().Unix())

	receiver, err := kafka_sarama.NewConsumer([]string{"127.0.0.1:9092"}, saramaConfig, "test-group-id", topic)
	if err != nil {
		log.Fatalf("failed to create receiver: %s", err.Error())
	}
	defer receiver.Close(context.Background())

	c, err := cloudevents.NewClient(receiver, cloudevents.WithPollGoroutines(1))
	if err != nil {
		log.Fatalf("failed to create client, %v", err)
	}

	// Create a done channel to block until we've received (count) messages
	done := make(chan struct{})

	// Start the receiver
	go func() {
		log.Printf("will listen consuming topic %v\n", topic)
		var recvCount int32
		err = c.StartReceiver(context.TODO(), func(ctx context.Context, event cloudevents.Event) {
			receive(ctx, event)
			if atomic.AddInt32(&recvCount, 1) == count {
				done <- struct{}{}
			}
		})
		if err != nil {
			log.Fatalf("failed to start receiver: %s", err)
		} else {
			log.Printf("receiver stopped\n")
		}
	}()

	// Ensure that the consumer is ready before starting to push events
	time.Sleep(4 * time.Second)

	sender, err := kafka_sarama.NewSender([]string{"127.0.0.1:9092"}, saramaConfig, topic)
	if err != nil {
		log.Fatalf("failed to create protocol: %s", err.Error())
	}
	defer sender.Close(context.Background())

	cp, err := cloudevents.NewClient(sender, cloudevents.WithTimeNow(), cloudevents.WithUUIDs(), cloudevents.WithPollGoroutines(1))
	if err != nil {
		log.Fatalf("failed to create client, %v", err)
	}

	// Start sending the events
	for i := 0; i < count; i++ {
		e := cloudevents.NewEvent()
		e.SetType("com.cloudevents.sample.sent")
		e.SetSource("https://github.com/cloudevents/sdk-go/v2/samples/httpb/requester")
		_ = e.SetData(cloudevents.ApplicationJSON, map[string]interface{}{
			"id":      i,
			"message": "Hello, World!",
		})

		if result := cp.Send(context.Background(), e); cloudevents.IsUndelivered(result) {
			log.Printf("failed to send: %v", result)
		} else {
			log.Printf("sent: %d, accepted: %t", i, cloudevents.IsACK(result))
		}
	}

	<-done
}

func receive(ctx context.Context, event cloudevents.Event) {
	// Random processing time
	n := rand.Intn(10)
	time.Sleep(time.Duration(n) * time.Second)
	fmt.Printf("After %vs, %s\n", n, event.DataEncoded)
}

The output being:
2020/12/15 10:00:44 will listen consuming topic topic-1608022844
2020/12/15 10:00:48 sent: 0, accepted: true
2020/12/15 10:00:48 sent: 1, accepted: true
2020/12/15 10:00:48 sent: 2, accepted: true
2020/12/15 10:00:48 sent: 3, accepted: true
2020/12/15 10:00:48 sent: 4, accepted: true
2020/12/15 10:00:48 sent: 5, accepted: true
2020/12/15 10:00:48 sent: 6, accepted: true
2020/12/15 10:00:48 sent: 7, accepted: true
2020/12/15 10:00:48 sent: 8, accepted: true
2020/12/15 10:00:48 sent: 9, accepted: true
After 1s, {"id":2,"message":"Hello, World!"}
After 1s, {"id":7,"message":"Hello, World!"}
After 3s, {"id":9,"message":"Hello, World!"}
After 4s, {"id":8,"message":"Hello, World!"}
After 4s, {"id":6,"message":"Hello, World!"}
After 5s, {"id":4,"message":"Hello, World!"}
After 6s, {"id":0,"message":"Hello, World!"}
After 7s, {"id":3,"message":"Hello, World!"}
After 9s, {"id":1,"message":"Hello, World!"}
After 9s, {"id":5,"message":"Hello, World!"}
I think it would be great to have a way to specify that events should be processed one by one.
Especially with Kafka: if the event with id 9 (cf. the output above) is processed quickly and successfully, marking the message will (I think) set the consumer group offset to 9, even though the processing of messages 8, 6, 4, 0, 3, 1 and 5 is not done and may still fail. After a panic, for example, that would lead to data loss.
I tried to investigate a little bit; I think a limited change here would be enough.
In particular, in terms of concurrency, events are polled by a bounded pool of goroutines, whereas they are processed by an unbounded pool of goroutines. Would it be possible to start N goroutines dedicated to processing the events?
Code example:
wg := sync.WaitGroup{}

type job struct {
	Msg    binding.Message
	RespFn protocol.ResponseFn
}

// Chan to hold the messages to process.
jobs := make(chan job)

// Start N processors.
for i := 0; i < c.processGoroutines; i++ {
	go func() {
		for job := range jobs {
			wg.Add(1)
			if err := c.invoker.Invoke(ctx, job.Msg, job.RespFn); err != nil {
				cecontext.LoggerFrom(ctx).Warnf("Error while handling a message: %s", err)
			}
			wg.Done()
		}
		cecontext.LoggerFrom(ctx).Warn("Processor stopped working")
		// Restart it?
	}()
}

// Start polling.
for i := 0; i < c.pollGoroutines; i++ {
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			var msg binding.Message
			var respFn protocol.ResponseFn
			var err error

			if c.responder != nil {
				msg, respFn, err = c.responder.Respond(ctx)
			} else if c.receiver != nil {
				msg, err = c.receiver.Receive(ctx)
				respFn = noRespFn
			}

			if err == io.EOF { // Normal close
				return
			}
			if err != nil {
				cecontext.LoggerFrom(ctx).Warnf("Error while receiving a message: %s", err)
				continue
			}

			// Add the job to the queue.
			jobs <- job{Msg: msg, RespFn: respFn}
		}
	}()
}

Then, a project could set processGoroutines and pollGoroutines to 1 to ensure event ordering.
I can help with the PR if needed
Thank you !