Skip to content

Keep event ordering #642

@pastequo

Description

@pastequo

Hello,

It might have been discussed, but I couldn't find any information related to data ordering.
With the current SDK, it is not possible to ensure that events are processed in the order in which they are read.

Here is an example, created by modifying the Kafka sender-receiver sample:

package main

import (
	"context"
	"fmt"
	"log"
	"math/rand"
	"sync/atomic"
	"time"

	"github.com/Shopify/sarama"

	"github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2"
	cloudevents "github.com/cloudevents/sdk-go/v2"
)

// count is the number of events to send (and the number the receiver
// waits for before the program exits).
const count = 10

// main demonstrates that event ordering is not preserved by the SDK: it
// sends `count` events to a freshly named Kafka topic, processes them with
// a randomly slow handler, and prints the (out-of-order) completion order.
func main() {
	saramaConfig := sarama.NewConfig()
	saramaConfig.Version = sarama.V2_0_0_0

	// Use a unique topic per run so repeated runs don't consume stale messages.
	topic := fmt.Sprintf("topic-%v", time.Now().Unix())

	receiver, err := kafka_sarama.NewConsumer([]string{"127.0.0.1:9092"}, saramaConfig, "test-group-id", topic)
	if err != nil {
		log.Fatalf("failed to create receiver: %s", err.Error())
	}

	defer receiver.Close(context.Background())

	c, err := cloudevents.NewClient(receiver, cloudevents.WithPollGoroutines(1))
	if err != nil {
		log.Fatalf("failed to create client, %v", err)
	}

	// Create a done channel to block until we've received (count) messages.
	done := make(chan struct{})

	// Start the receiver.
	go func() {
		log.Printf("will listen consuming topic %v\n", topic)
		var recvCount int32
		// Use a goroutine-local err: assigning to the outer `err` here would
		// be a data race with the main goroutine, which keeps reusing it.
		if err := c.StartReceiver(context.TODO(), func(ctx context.Context, event cloudevents.Event) {
			receive(ctx, event)
			if atomic.AddInt32(&recvCount, 1) == count {
				done <- struct{}{}
			}
		}); err != nil {
			log.Fatalf("failed to start receiver: %s", err)
		} else {
			log.Printf("receiver stopped\n")
		}
	}()

	// Ensure that the consumer is ready before starting to push events.
	time.Sleep(4 * time.Second)

	sender, err := kafka_sarama.NewSender([]string{"127.0.0.1:9092"}, saramaConfig, topic)
	if err != nil {
		log.Fatalf("failed to create protocol: %s", err.Error())
	}

	defer sender.Close(context.Background())

	cp, err := cloudevents.NewClient(sender, cloudevents.WithTimeNow(), cloudevents.WithUUIDs(), cloudevents.WithPollGoroutines(1))
	if err != nil {
		log.Fatalf("failed to create client, %v", err)
	}

	// Start sending the events.
	for i := 0; i < count; i++ {
		e := cloudevents.NewEvent()
		e.SetType("com.cloudevents.sample.sent")
		e.SetSource("https://github.com/cloudevents/sdk-go/v2/samples/httpb/requester")
		_ = e.SetData(cloudevents.ApplicationJSON, map[string]interface{}{
			"id":      i,
			"message": "Hello, World!",
		})

		if result := cp.Send(context.Background(), e); cloudevents.IsUndelivered(result) {
			// BUG FIX: log the send result, not the stale client-creation
			// `err` (which is nil at this point and hid the real failure).
			log.Printf("failed to send: %v", result)
		} else {
			log.Printf("sent: %d, accepted: %t", i, cloudevents.IsACK(result))
		}
	}

	<-done
}

// receive simulates a handler with a random processing time (0-9 seconds)
// and then reports the delay together with the event payload, making
// out-of-order completion visible in the log.
func receive(ctx context.Context, event cloudevents.Event) {
	// Random processing time.
	delay := rand.Intn(10)
	time.Sleep(time.Duration(delay) * time.Second)

	fmt.Printf("After %vs, %s\n", delay, event.DataEncoded)
}

The output being:

2020/12/15 10:00:44 will listen consuming topic topic-1608022844
2020/12/15 10:00:48 sent: 0, accepted: true
2020/12/15 10:00:48 sent: 1, accepted: true
2020/12/15 10:00:48 sent: 2, accepted: true
2020/12/15 10:00:48 sent: 3, accepted: true
2020/12/15 10:00:48 sent: 4, accepted: true
2020/12/15 10:00:48 sent: 5, accepted: true
2020/12/15 10:00:48 sent: 6, accepted: true
2020/12/15 10:00:48 sent: 7, accepted: true
2020/12/15 10:00:48 sent: 8, accepted: true
2020/12/15 10:00:48 sent: 9, accepted: true
After 1s, {"id":2,"message":"Hello, World!"}
After 1s, {"id":7,"message":"Hello, World!"}
After 3s, {"id":9,"message":"Hello, World!"}
After 4s, {"id":8,"message":"Hello, World!"}
After 4s, {"id":6,"message":"Hello, World!"}
After 5s, {"id":4,"message":"Hello, World!"}
After 6s, {"id":0,"message":"Hello, World!"}
After 7s, {"id":3,"message":"Hello, World!"}
After 9s, {"id":1,"message":"Hello, World!"}
After 9s, {"id":5,"message":"Hello, World!"}

I think it would be great to have a way to specify that events should be processed 1 by 1.
Especially with Kafka, if the event with id:9 (cf output above) is processed quickly and successfully, marking the message will (I think) set the consumer group offset to 9, even if processing of messages 8, 6, 4, 0, 3, 1, 5 are not done and may fail. Which would lead, after a panic for example, to data loss

I tried to investigate a little bit; I think a limited change here would be enough.

Especially, in terms of concurrency, polling for events is done with a pool of goroutines, whereas processing the events is done with an unbounded pool of goroutines. Would it be possible to start N goroutines dedicated to processing the events?

Code example:

	wg := sync.WaitGroup{}

	type job struct {
		Msg    binding.Message
		RespFn protocol.ResponseFn
	}

	// chan to hold messages waiting to be processed.
	jobs := make(chan job)

	// Start N processors.
	// FIX: wg.Add must run before the goroutine starts (calling Add inside
	// the goroutine races with wg.Wait), and the original wg.Add(1) +
	// defer wg.Done() inside the `range jobs` loop accumulated defers that
	// would only fire when the goroutine exits — pair Add/Done once per
	// goroutine instead.
	for i := 0; i < c.processGoroutines; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for job := range jobs {
				if err := c.invoker.Invoke(ctx, job.Msg, job.RespFn); err != nil {
					cecontext.LoggerFrom(ctx).Warnf("Error while handling a message: %s", err)
				}
			}

			cecontext.LoggerFrom(ctx).Warn("Processor stop working")
			// Restart it ?
		}()
	}

	// Start Polling.
	for i := 0; i < c.pollGoroutines; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				var msg binding.Message
				var respFn protocol.ResponseFn
				var err error

				if c.responder != nil {
					msg, respFn, err = c.responder.Respond(ctx)
				} else if c.receiver != nil {
					msg, err = c.receiver.Receive(ctx)
					respFn = noRespFn
				}

				if err == io.EOF { // Normal close
					return
				}

				if err != nil {
					cecontext.LoggerFrom(ctx).Warnf("Error while receiving a message: %s", err)
					continue
				}

				// Add job to queue.
				jobs <- job{Msg: msg, RespFn: respFn}
			}
		}()
	}

Then, a project could set processGoroutines & pollGoroutines to 1 to ensure event ordering

I can help with the PR if needed

Thank you !

Metadata

Metadata

Assignees

No one assigned

    Labels

    enhancement — New feature or request

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions