CE receiver client does not have option to make callback a blocking call #770

@jyu-go

#574 changed the behavior of the CE v2 client so that the callback fn is always invoked asynchronously (one goroutine is spawned for each event).

sdk-go/v2/client/client.go

Lines 251 to 258 in 5decd2e

// Do not block on the invoker.
wg.Add(1)
go func() {
	if err := c.invoker.Invoke(ctx, msg, respFn); err != nil {
		cecontext.LoggerFrom(ctx).Warn("Error while handling a message: ", err)
	}
	wg.Done()
}()

This did address #569, but it also made each of the following use cases a lot trickier:

  1. Don't want two events to be processed at the same time
  2. Want events to be handled in the order they were received
  3. Want to resume from the last checkpoint after a service restart (e.g. in the Kafka case, the fn callback may commit the offset for the messages that have been received, and the restarted service will pick up any event whose offset has not been committed)
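As a rough illustration of why these cases get trickier, the sketch below imitates the goroutine-per-event dispatch with the standard library only (it does not use the SDK) and shows a user-side workaround for the first case: guarding the handler with a mutex. The function name `maxConcurrency` and the `time.Sleep` stand-in for real work are assumptions made for the example.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// maxConcurrency dispatches n events with one goroutine per event, as the
// client snippet above does, and reports the highest number of handler calls
// that were in flight at the same time. When serialize is true, the handler
// takes a mutex so only one call runs at a time.
func maxConcurrency(n int, serialize bool) int {
	var (
		mu       sync.Mutex // guards inFlight and peak
		inFlight int
		peak     int
		gate     sync.Mutex // serializes handler calls when requested
		wg       sync.WaitGroup
	)

	handler := func() {
		if serialize {
			gate.Lock()
			defer gate.Unlock()
		}

		mu.Lock()
		inFlight++
		if inFlight > peak {
			peak = inFlight
		}
		mu.Unlock()

		time.Sleep(time.Millisecond) // stand-in for real event handling

		mu.Lock()
		inFlight--
		mu.Unlock()
	}

	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			handler()
		}()
	}
	wg.Wait()
	return peak
}

func main() {
	fmt.Println("serialized peak:", maxConcurrency(20, true)) // serialized peak: 1
	fmt.Println("unserialized peak:", maxConcurrency(20, false))
}
```

A mutex like this only covers the first case. Neither goroutine scheduling nor `sync.Mutex` guarantees first-in, first-out order, so the second and third cases still need the receive loop itself to wait for the callback.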

I'd propose adding an option WithBlockingInvokerCall to allow the old behavior. I'll create a pull request for it to be considered.
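To make the proposal concrete, here is a minimal sketch of how such an option could switch between the two behaviors. Only the name `WithBlockingInvokerCall` comes from this issue; the `dispatcher` type, `newDispatcher`, `run`, and `processedOrder` are hypothetical stand-ins for the client's receive loop and are not SDK code.

```go
package main

import (
	"fmt"
	"sync"
)

// dispatcher is a toy stand-in for the client's receive loop (hypothetical).
type dispatcher struct {
	blocking bool
}

// Option configures a dispatcher, following the functional-options pattern.
type Option func(*dispatcher)

// WithBlockingInvokerCall uses the name proposed in this issue; the body is
// an assumption about how such an option might be wired up.
func WithBlockingInvokerCall() Option {
	return func(d *dispatcher) { d.blocking = true }
}

func newDispatcher(opts ...Option) *dispatcher {
	d := &dispatcher{}
	for _, opt := range opts {
		opt(d)
	}
	return d
}

// run calls fn once per event. In blocking mode the loop waits for fn to
// return before taking the next event; otherwise each call gets a goroutine.
func (d *dispatcher) run(events []int, fn func(int)) {
	var wg sync.WaitGroup
	for _, ev := range events {
		if d.blocking {
			fn(ev)
			continue
		}
		wg.Add(1)
		go func(ev int) {
			defer wg.Done()
			fn(ev)
		}(ev)
	}
	wg.Wait()
}

// processedOrder records the order in which the callback saw the events.
func processedOrder(opts ...Option) []int {
	var (
		mu  sync.Mutex
		got []int
	)
	newDispatcher(opts...).run([]int{1, 2, 3, 4, 5}, func(ev int) {
		mu.Lock()
		got = append(got, ev)
		mu.Unlock()
	})
	return got
}

func main() {
	fmt.Println(processedOrder(WithBlockingInvokerCall())) // [1 2 3 4 5]
	fmt.Println(processedOrder())                          // order is not guaranteed
}
```

Keeping the asynchronous path as the default would preserve the fix for #569, while callers who need ordering or checkpointing could opt in to the blocking path.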

How to reproduce

This is basically the counterpart of the same section in #569.

Run the same example receiver code as in #569: https://gist.github.com/liu-cong/a132fe8629715419c7a58ee20001d071

Then send a few events to the receiver using curl:

curl -v localhost:8080 \
  -X POST \
  -H "Ce-Sleep: 1m" \
  -H "Ce-Id: 1" \
  -H "Ce-Specversion: 1.0" \
  -H "Ce-Type: type" \
  -H "Ce-Source: source" \
  -H "Content-Type: application/json" \
  -d '{"msg":"Test!"}'

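The receiver is not blocked for the duration of the sleep period: each event is handed to its own goroutine, so events sent during that time are processed concurrently.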
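For sending several events at once, the curl request can also be built in Go. The following is a sketch using only the standard library; `newEventRequest` is a hypothetical helper, and the `httptest` server is a stand-in so the example runs on its own (replace `srv.URL` with `http://localhost:8080` to target the example receiver).

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
	"sync"
)

// newEventRequest builds the same binary-mode CloudEvent request as the curl
// command, with a caller-supplied event ID.
func newEventRequest(target, id string) (*http.Request, error) {
	req, err := http.NewRequest(http.MethodPost, target, strings.NewReader(`{"msg":"Test!"}`))
	if err != nil {
		return nil, err
	}
	req.Header.Set("Ce-Sleep", "1m")
	req.Header.Set("Ce-Id", id)
	req.Header.Set("Ce-Specversion", "1.0")
	req.Header.Set("Ce-Type", "type")
	req.Header.Set("Ce-Source", "source")
	req.Header.Set("Content-Type", "application/json")
	return req, nil
}

func main() {
	// Stand-in receiver that only logs the event ID it was sent.
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Println("received event", r.Header.Get("Ce-Id"))
	}))
	defer srv.Close()

	// Send three events concurrently so they overlap at the receiver.
	var wg sync.WaitGroup
	for i := 1; i <= 3; i++ {
		wg.Add(1)
		go func(id string) {
			defer wg.Done()
			req, err := newEventRequest(srv.URL, id)
			if err != nil {
				fmt.Println("build request:", err)
				return
			}
			resp, err := http.DefaultClient.Do(req)
			if err != nil {
				fmt.Println("send request:", err)
				return
			}
			resp.Body.Close()
		}(fmt.Sprint(i))
	}
	wg.Wait()
}
```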
