Skip to content

Commit 310da90

Browse files
Matthew Dowdellembano1
authored andcommitted
Support ACK when receiving malformed events
My team faced much the same issue as outlined in #757; malformed events are sent to a Kafka topic and clients endlessly fail to read the event. While this is hard to induce when the sender uses the Go CloudEvents SDK, there are a good amount of Python clients across our services which unfortunately lack validation that might prevent this. I've elected to make this behaviour configurable via client options, as suggested in #757. This would be appropriate to use when no `protocol.Responder` implementation is available, as is the case with the `kafka_sarama` module. I explored wrapping the existing `protocol.Receiver` implementation to allow it to behave like `protocol.Responder`, but that ended up being a lot of code compared to the light touch that could be applied here. Signed-off-by: Matthew Dowdell <matthew.dowdell@hpe.com>
1 parent 85db5b9 commit 310da90

File tree

6 files changed

+162
-5
lines changed

6 files changed

+162
-5
lines changed

v2/client/client.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ type ceClient struct {
9898
eventDefaulterFns []EventDefaulter
9999
pollGoroutines int
100100
blockingCallback bool
101+
ackMalformedEvent bool
101102
}
102103

103104
func (c *ceClient) applyOptions(opts ...Option) error {
@@ -202,7 +203,13 @@ func (c *ceClient) StartReceiver(ctx context.Context, fn interface{}) error {
202203
return fmt.Errorf("client already has a receiver")
203204
}
204205

205-
invoker, err := newReceiveInvoker(fn, c.observabilityService, c.inboundContextDecorators, c.eventDefaulterFns...)
206+
invoker, err := newReceiveInvoker(
207+
fn,
208+
c.observabilityService,
209+
c.inboundContextDecorators,
210+
c.eventDefaulterFns,
211+
c.ackMalformedEvent,
212+
)
206213
if err != nil {
207214
return err
208215
}

v2/client/client_test.go

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323

2424
"github.com/google/go-cmp/cmp"
2525

26+
"github.com/cloudevents/sdk-go/v2/binding"
2627
"github.com/cloudevents/sdk-go/v2/client"
2728
"github.com/cloudevents/sdk-go/v2/event"
2829
"github.com/cloudevents/sdk-go/v2/protocol"
@@ -399,6 +400,71 @@ func TestClientContext(t *testing.T) {
399400
wg.Wait()
400401
}
401402

403+
func TestClientStartReceiverWithAckMalformedEvent(t *testing.T) {
404+
testCases := []struct {
405+
name string
406+
opts []client.Option
407+
expectedAck bool
408+
}{
409+
{
410+
name: "without ack",
411+
},
412+
{
413+
name: "with ack",
414+
opts: []client.Option{client.WithAckMalformedEvent()},
415+
expectedAck: true,
416+
},
417+
}
418+
419+
for _, tc := range testCases {
420+
t.Run(tc.name, func(t *testing.T) {
421+
// make sure the receiver goroutine is closed
422+
ctx, cancel := context.WithCancel(context.Background())
423+
defer cancel()
424+
425+
receiver := &mockReceiver{
426+
finished: make(chan struct{}),
427+
}
428+
429+
// only need 1 goroutine to exercise this
430+
tc.opts = append(tc.opts, client.WithPollGoroutines(1))
431+
432+
c, err := client.New(receiver, tc.opts...)
433+
if err != nil {
434+
t.Errorf("failed to construct client: %v", err)
435+
}
436+
437+
go c.StartReceiver(ctx, func(ctx context.Context, e event.Event) protocol.Result {
438+
t.Error("receiver callback called unexpectedly")
439+
return nil
440+
})
441+
442+
// wait for receive to occur
443+
time.Sleep(time.Millisecond)
444+
445+
ctx, cancelTimeout := context.WithTimeout(ctx, time.Second)
446+
defer cancelTimeout()
447+
448+
select {
449+
case <-receiver.finished:
450+
// continue to rest of the test
451+
case <-ctx.Done():
452+
t.Errorf("timeoued out waiting for receiver to complete")
453+
}
454+
455+
if tc.expectedAck {
456+
if protocol.IsNACK(receiver.result) {
457+
t.Errorf("receiver did not receive ACK: %v", receiver.result)
458+
}
459+
} else {
460+
if protocol.IsACK(receiver.result) {
461+
t.Errorf("receiver did not receive NACK: %v", receiver.result)
462+
}
463+
}
464+
})
465+
}
466+
}
467+
402468
type requestValidation struct {
403469
Host string
404470
Headers http.Header
@@ -488,3 +554,38 @@ func isImportantHeader(h string) bool {
488554
}
489555
return true
490556
}
557+
558+
type mockMessage struct{}
559+
560+
func (m *mockMessage) ReadEncoding() binding.Encoding {
561+
return binding.EncodingUnknown
562+
}
563+
564+
func (m *mockMessage) ReadStructured(ctx context.Context, writer binding.StructuredWriter) error {
565+
return nil
566+
}
567+
func (m *mockMessage) ReadBinary(ctx context.Context, writer binding.BinaryWriter) error { return nil }
568+
func (m *mockMessage) Finish(err error) error { return nil }
569+
570+
type mockReceiver struct {
571+
mu sync.Mutex
572+
count int
573+
result error
574+
finished chan struct{}
575+
}
576+
577+
func (m *mockReceiver) Receive(ctx context.Context) (binding.Message, error) {
578+
m.mu.Lock()
579+
defer m.mu.Unlock()
580+
581+
if m.count > 0 {
582+
return nil, io.EOF
583+
}
584+
585+
m.count++
586+
587+
return binding.WithFinish(&mockMessage{}, func(err error) {
588+
m.result = err
589+
close(m.finished)
590+
}), nil
591+
}

v2/client/http_receiver.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import (
1414
)
1515

1616
func NewHTTPReceiveHandler(ctx context.Context, p *thttp.Protocol, fn interface{}) (*EventReceiver, error) {
17-
invoker, err := newReceiveInvoker(fn, noopObservabilityService{}, nil) //TODO(slinkydeveloper) maybe not nil?
17+
invoker, err := newReceiveInvoker(fn, noopObservabilityService{}, nil, nil, false) //TODO(slinkydeveloper) maybe not nil?
1818
if err != nil {
1919
return nil, err
2020
}

v2/client/invoker.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,18 @@ type Invoker interface {
2323

2424
var _ Invoker = (*receiveInvoker)(nil)
2525

26-
func newReceiveInvoker(fn interface{}, observabilityService ObservabilityService, inboundContextDecorators []func(context.Context, binding.Message) context.Context, fns ...EventDefaulter) (Invoker, error) {
26+
func newReceiveInvoker(
27+
fn interface{},
28+
observabilityService ObservabilityService,
29+
inboundContextDecorators []func(context.Context, binding.Message) context.Context,
30+
fns []EventDefaulter,
31+
ackMalformedEvent bool,
32+
) (Invoker, error) {
2733
r := &receiveInvoker{
2834
eventDefaulterFns: fns,
2935
observabilityService: observabilityService,
3036
inboundContextDecorators: inboundContextDecorators,
37+
ackMalformedEvent: ackMalformedEvent,
3138
}
3239

3340
if fn, err := receiver(fn); err != nil {
@@ -44,6 +51,7 @@ type receiveInvoker struct {
4451
observabilityService ObservabilityService
4552
eventDefaulterFns []EventDefaulter
4653
inboundContextDecorators []func(context.Context, binding.Message) context.Context
54+
ackMalformedEvent bool
4755
}
4856

4957
func (r *receiveInvoker) Invoke(ctx context.Context, m binding.Message, respFn protocol.ResponseFn) (err error) {
@@ -58,13 +66,13 @@ func (r *receiveInvoker) Invoke(ctx context.Context, m binding.Message, respFn p
5866
switch {
5967
case eventErr != nil && r.fn.hasEventIn:
6068
r.observabilityService.RecordReceivedMalformedEvent(ctx, eventErr)
61-
return respFn(ctx, nil, protocol.NewReceipt(false, "failed to convert Message to Event: %w", eventErr))
69+
return respFn(ctx, nil, protocol.NewReceipt(r.ackMalformedEvent, "failed to convert Message to Event: %w", eventErr))
6270
case r.fn != nil:
6371
// Check if event is valid before invoking the receiver function
6472
if e != nil {
6573
if validationErr := e.Validate(); validationErr != nil {
6674
r.observabilityService.RecordReceivedMalformedEvent(ctx, validationErr)
67-
return respFn(ctx, nil, protocol.NewReceipt(false, "validation error in incoming event: %w", validationErr))
75+
return respFn(ctx, nil, protocol.NewReceipt(r.ackMalformedEvent, "validation error in incoming event: %w", validationErr))
6876
}
6977
}
7078

v2/client/options.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,3 +126,16 @@ func WithBlockingCallback() Option {
126126
return nil
127127
}
128128
}
129+
130+
// WithAckMalformedevents causes malformed events received within StartReceiver to be acknowledged
131+
// rather than being permanently not-acknowledged. This can be useful when a protocol does not
132+
// provide a responder implementation and would otherwise cause the receiver to be partially or
133+
// fully stuck.
134+
func WithAckMalformedEvent() Option {
135+
return func(i interface{}) error {
136+
if c, ok := i.(*ceClient); ok {
137+
c.ackMalformedEvent = true
138+
}
139+
return nil
140+
}
141+
}

v2/client/options_test.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,3 +136,31 @@ func TestWith_Defaulters(t *testing.T) {
136136
})
137137
}
138138
}
139+
140+
func TestWithAckMalformedEvent(t *testing.T) {
141+
testCases := []struct {
142+
name string
143+
opts []Option
144+
expected bool
145+
}{
146+
{
147+
name: "unset",
148+
},
149+
{
150+
name: "set",
151+
opts: []Option{WithAckMalformedEvent()},
152+
expected: true,
153+
},
154+
}
155+
156+
for _, tc := range testCases {
157+
t.Run(tc.name, func(t *testing.T) {
158+
client := &ceClient{}
159+
client.applyOptions(tc.opts...)
160+
161+
if client.ackMalformedEvent != tc.expected {
162+
t.Errorf("unexpected ackMalformedEvent; want: %t; got: %t", tc.expected, client.ackMalformedEvent)
163+
}
164+
})
165+
}
166+
}

0 commit comments

Comments
 (0)