Skip to content

req.Body not reset in doWithRetry #773

@embano1

Description

@embano1

Scenario:

When a plain net/http server is used as the remote receiver (instead of the receiver from this package), ceClient fails when it retries after a non-successful HTTP response.

Error: "msg":"Invalid result type, not HTTP Result: Post \"http://always-fail.default.127.0.0.1.sslip.io\": http: ContentLength=34 with Body length 0 (type: *protocol.Receipt)"}

This happens when the remote server returns a message in the response body, e.g. "server down" (not a CloudEvent), but not when it returns only a status code with an empty response body.

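The issue is caused by func (p *Protocol) doWithRetry(), because it does not reset req.Body for subsequent attempts: the first attempt consumes the body, so every retry sends an empty body while ContentLength still reports the original size (hence "ContentLength=34 with Body length 0").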

Expected behavior:

Return encoding error, but correctly reset req.Body, e.g. return "failed to convert response into event: unknown Message encoding\n500: (3x)"

Client code:

/*
 Copyright 2021 The CloudEvents Authors
 SPDX-License-Identifier: Apache-2.0
*/

package main

import (
	"context"
	"fmt"
	"net/http"
	"time"

	"github.com/cloudevents/sdk-go/observability/opencensus/v2/client"
	"go.uber.org/zap"
	"knative.dev/eventing/pkg/kncloudevents"
	"knative.dev/pkg/logging"

	cloudevents "github.com/cloudevents/sdk-go/v2"
	"github.com/cloudevents/sdk-go/v2/protocol"
	cehttp "github.com/cloudevents/sdk-go/v2/protocol/http"
)

// main sends a single CloudEvent to a fixed target with linear-backoff
// retries enabled and reports whether the delivery is considered successful.
func main() {
	ctx := cloudevents.ContextWithTarget(context.Background(), "http://always-fail.default.127.0.0.1.sslip.io")

	// The retry decision is delegated to kncloudevents.SelectiveRetry, fed with
	// a synthetic response that carries only the status code. Its second return
	// value is discarded.
	isRetriable := func(statusCode int) bool {
		retry, _ := kncloudevents.SelectiveRetry(ctx, &http.Response{StatusCode: statusCode}, nil)
		return retry
	}
	transportOpts := []cehttp.Option{cehttp.WithIsRetriableFunc(isRetriable)}

	ceClient, err := client.NewClientHTTP(transportOpts, nil)
	if err != nil {
		panic(err)
	}

	event := cloudevents.NewEvent()
	event.SetType("com.cloudevents.sample.sent")
	event.SetSource("https://github.com/cloudevents/sdk-go/v2/samples/httpb/sender")
	payload := map[string]interface{}{
		"id":      1,
		"message": "Hello, World!",
	}
	_ = event.SetData(cloudevents.ApplicationJSON, payload)

	// Retry parameters: linear backoff with a 10ms period and 3 as the retry count.
	ctx = cloudevents.ContextWithRetriesLinearBackoff(ctx, 10*time.Millisecond, 3)
	response, result := ceClient.Request(ctx, event)

	if isSuccess(ctx, result) {
		fmt.Printf("got: %v", response)
		return
	}

	logging.FromContext(ctx).Errorw("Failed to deliver", zap.Error(result))
}

// isSuccess reports whether result describes a delivery that needs no retry.
//
// It returns true only when result can be unwrapped as a *cehttp.RetriesResult
// whose inner Result is a *cehttp.Result with a status code that
// kncloudevents.SelectiveRetry does not flag for retry. Any other shape is
// logged as a warning and treated as a failure.
func isSuccess(ctx context.Context, result protocol.Result) bool {
	var retries *cehttp.RetriesResult
	if !cloudevents.ResultAs(result, &retries) {
		logging.FromContext(ctx).Warnf("Invalid result type, not RetriesResult")
		return false
	}

	inner := retries.Result

	var httpRes *cehttp.Result
	if !cloudevents.ResultAs(inner, &httpRes) {
		logging.FromContext(ctx).Warnf("Invalid result type, not HTTP Result: %v (type: %T)", inner, inner)
		return false
	}

	// Only the status code is handed to SelectiveRetry; its second return value is discarded.
	shouldRetry, _ := kncloudevents.SelectiveRetry(ctx, &http.Response{StatusCode: httpRes.StatusCode}, nil)
	return !shouldRetry
}

HTTP server code (note: just using net/http)

package main

import (
	"context"
	"errors"
	"net/http"
	"time"

	"github.com/go-chi/chi/v5"
	"github.com/go-chi/chi/v5/middleware"
	"knative.dev/pkg/logging"
	"knative.dev/pkg/signals"
)

// Server settings used by main.
const (
	// addr is the listen address of the HTTP server (all interfaces, port 8080).
	addr    = ":8080"
	// timeout is used as both the read and the write timeout of the HTTP server.
	timeout = 5 * time.Second
)

// main runs an HTTP server on addr that answers every POST to "/" with
// 500 Internal Server Error plus a plain-text body, and shuts the server down
// once the context returned by signals.NewContext is done.
func main() {
	ctx := signals.NewContext()

	r := chi.NewRouter()
	r.Use(middleware.Logger)
	r.Post("/", func(w http.ResponseWriter, _ *http.Request) {
		// w.WriteHeader(http.StatusInternalServerError) <- just doing this works!
		http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
	})

	srv := http.Server{
		Addr:         addr,
		Handler:      r,
		ReadTimeout:  timeout,
		WriteTimeout: timeout,
	}

	// shutdownDone is closed once Shutdown has returned, so main can wait for
	// it to finish instead of exiting while it is still in progress.
	shutdownDone := make(chan struct{})
	go func() {
		defer close(shutdownDone)

		<-ctx.Done()
		logging.FromContext(ctx).Info("shutting down")

		// Bound the shutdown so a connection that never goes idle cannot keep
		// the process alive indefinitely.
		shutdownCtx, cancel := context.WithTimeout(context.Background(), timeout)
		defer cancel()

		if err := srv.Shutdown(shutdownCtx); err != nil {
			logging.FromContext(ctx).Warnf("could not shutdown: %v", err)
		}
	}()

	logging.FromContext(ctx).Infow("listening", "address", addr)
	if err := srv.ListenAndServe(); !errors.Is(err, http.ErrServerClosed) {
		logging.FromContext(ctx).Fatal(err)
	}

	// ListenAndServe returns ErrServerClosed as soon as Shutdown is called,
	// not when it completes; wait here so Shutdown is allowed to finish.
	<-shutdownDone
}

Patch

diff --git a/v2/protocol/http/protocol_retry.go b/v2/protocol/http/protocol_retry.go
index fb7bcd2..9050469 100644
--- a/v2/protocol/http/protocol_retry.go
+++ b/v2/protocol/http/protocol_retry.go
@@ -6,8 +6,10 @@
 package http
 
 import (
+	"bytes"
 	"context"
 	"errors"
+	"io"
 	"net/http"
 	"net/url"
 	"time"
@@ -53,6 +55,17 @@ func (p *Protocol) doWithRetry(ctx context.Context, params *cecontext.RetryParam
 	retry := 0
 	results := make([]protocol.Result, 0)
 
+	var (
+		body []byte
+		err  error
+	)
+	if req.Body != nil {
+		body, err = io.ReadAll(req.Body)
+		if err != nil {
+			panic(err)
+		}
+	}
+
 	for {
 		msg, result := p.doOnce(req)
 
@@ -90,6 +103,13 @@ func (p *Protocol) doWithRetry(ctx context.Context, params *cecontext.RetryParam
 		}
 
 	DoBackoff:
+		if req.Body != nil {
+			req.Body = io.NopCloser(bytes.NewReader(body))
+			req.GetBody = func() (io.ReadCloser, error) {
+				return io.NopCloser(bytes.NewReader(body)), nil
+			}
+		}
+
 		// Wait for the correct amount of backoff time.
 
 		// total tries = retry + 1

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions