CE receiver client does not have option to make callback a blocking call #770
Closed
Description
#574 changed the behavior of the CE v2 client so the callback fn is always asynchronous (one goroutine is spawned for each event).
Lines 251 to 258 in 5decd2e
    // Do not block on the invoker.
    wg.Add(1)
    go func() {
        if err := c.invoker.Invoke(ctx, msg, respFn); err != nil {
            cecontext.LoggerFrom(ctx).Warn("Error while handling a message: ", err)
        }
        wg.Done()
    }()
This indeed addressed #569 but also made each of the following use cases a lot trickier:
- Don't want two events to be processed at the same time
- Want events to be handled in the order they were received
- Want to resume from the last checkpoint in the event of a service restart (e.g. in the Kafka case, the callback fn may commit the offset for the messages that have been received, and the restarted service will pick up any event whose offset hasn't been committed)
I'd propose adding an option WithBlockingInvokerCall to allow the old behavior. I'll create a pull request for it to be considered.
How to reproduce
This is basically the counterpart of the same section in #569.
Run the same example receiver code as in #569: https://gist.github.com/liu-cong/a132fe8629715419c7a58ee20001d071
Then send a few events to the receiver using curl:
    curl -v localhost:8080 \
    -X POST \
    -H "Ce-Sleep: 1m" \
    -H "Ce-Id: 1" \
    -H "Ce-Specversion: 1.0" \
    -H "Ce-Type: type" \
    -H "Ce-Source: source" \
    -H "Content-Type: application/json" \
    -d '{"msg":"Test!"}'
The receive function won't be blocked until the sleep period is done: events sent in the meantime are handled concurrently.