-
Notifications
You must be signed in to change notification settings - Fork 242
req.Body not reset in doWithRetry #773
Description
Scenario:
When using a net/http HTTP server as remote/receiver (instead of the receiver from this package), ceClient has issues when retrying a non-successful HTTP response.
Error: "msg":"Invalid result type, not HTTP Result: Post \"http://always-fail.default.127.0.0.1.sslip.io\": http: ContentLength=34 with Body length 0 (type: *protocol.Receipt)"}
This happens when the remote server returns a message in the response, e.g. „server down“ (not CE) but not when just returning a status code and empty response body.
The issue is caused by func (p *Protocol) doWithRetry() because it does not reset the req.Body during subsequent attempts.
Expected behavior:
Return encoding error, but correctly reset req.Body, e.g. return "failed to convert response into event: unknown Message encoding\n500: (3x)"
Client code:
/*
Copyright 2021 The CloudEvents Authors
SPDX-License-Identifier: Apache-2.0
*/
package main
import (
"context"
"fmt"
"net/http"
"time"
"github.com/cloudevents/sdk-go/observability/opencensus/v2/client"
"go.uber.org/zap"
"knative.dev/eventing/pkg/kncloudevents"
"knative.dev/pkg/logging"
cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/protocol"
cehttp "github.com/cloudevents/sdk-go/v2/protocol/http"
)
func main() {
ctx := cloudevents.ContextWithTarget(context.Background(), "http://always-fail.default.127.0.0.1.sslip.io")
topts := []cehttp.Option{cehttp.WithIsRetriableFunc(func(statusCode int) bool {
retry, _ := kncloudevents.SelectiveRetry(ctx, &http.Response{StatusCode: statusCode}, nil)
return retry
})}
ceClient, err := client.NewClientHTTP(topts, nil)
if err != nil {
panic(err)
}
e := cloudevents.NewEvent()
e.SetType("com.cloudevents.sample.sent")
e.SetSource("https://github.com/cloudevents/sdk-go/v2/samples/httpb/sender")
_ = e.SetData(cloudevents.ApplicationJSON, map[string]interface{}{
"id": 1,
"message": "Hello, World!",
})
ctx = cloudevents.ContextWithRetriesLinearBackoff(ctx, 10*time.Millisecond, 3)
response, result := ceClient.Request(ctx, e)
if !isSuccess(ctx, result) {
logging.FromContext(ctx).Errorw("Failed to deliver", zap.Error(result))
return
}
fmt.Printf("got: %v", response)
}
func isSuccess(ctx context.Context, result protocol.Result) bool {
var retriesResult *cehttp.RetriesResult
if cloudevents.ResultAs(result, &retriesResult) {
var httpResult *cehttp.Result
if cloudevents.ResultAs(retriesResult.Result, &httpResult) {
retry, _ := kncloudevents.SelectiveRetry(ctx, &http.Response{StatusCode: httpResult.StatusCode}, nil)
return !retry
}
logging.FromContext(ctx).Warnf("Invalid result type, not HTTP Result: %v (type: %T)", retriesResult.Result, retriesResult.Result)
return false
}
logging.FromContext(ctx).Warnf("Invalid result type, not RetriesResult")
return false
}HTTP server code (note: just using net/http)
package main
import (
"context"
"errors"
"net/http"
"time"
"github.com/go-chi/chi/v5"
"github.com/go-chi/chi/v5/middleware"
"knative.dev/pkg/logging"
"knative.dev/pkg/signals"
)
const (
addr = ":8080"
timeout = 5 * time.Second
)
func main() {
ctx := signals.NewContext()
r := chi.NewRouter()
r.Use(middleware.Logger)
r.Post("/", func(w http.ResponseWriter, r *http.Request) {
// w.WriteHeader(http.StatusInternalServerError) <- just doing this works!
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
})
srv := http.Server{
Addr: addr,
Handler: r,
ReadTimeout: timeout,
WriteTimeout: timeout,
}
go func() {
<-ctx.Done()
logging.FromContext(ctx).Info("shutting down")
if err := srv.Shutdown(context.Background()); err != nil {
logging.FromContext(ctx).Warnf("could not shutdown: %v", err)
}
}()
logging.FromContext(ctx).Infow("listening", "address", addr)
if err := srv.ListenAndServe(); !errors.Is(err, http.ErrServerClosed) {
logging.FromContext(ctx).Fatal(err)
}
}Patch
diff --git a/v2/protocol/http/protocol_retry.go b/v2/protocol/http/protocol_retry.go
index fb7bcd2..9050469 100644
--- a/v2/protocol/http/protocol_retry.go
+++ b/v2/protocol/http/protocol_retry.go
@@ -6,8 +6,10 @@
package http
import (
+ "bytes"
"context"
"errors"
+ "io"
"net/http"
"net/url"
"time"
@@ -53,6 +55,17 @@ func (p *Protocol) doWithRetry(ctx context.Context, params *cecontext.RetryParam
retry := 0
results := make([]protocol.Result, 0)
+ var (
+ body []byte
+ err error
+ )
+ if req.Body != nil {
+ body, err = io.ReadAll(req.Body)
+ if err != nil {
+ panic(err)
+ }
+ }
+
for {
msg, result := p.doOnce(req)
@@ -90,6 +103,13 @@ func (p *Protocol) doWithRetry(ctx context.Context, params *cecontext.RetryParam
}
DoBackoff:
+ if req.Body != nil {
+ req.Body = io.NopCloser(bytes.NewReader(body))
+ req.GetBody = func() (io.ReadCloser, error) {
+ return io.NopCloser(bytes.NewReader(body)), nil
+ }
+ }
+
// Wait for the correct amount of backoff time.
// total tries = retry + 1