55package websocket
66
77import (
8+ "bytes"
89 "context"
910 "errors"
1011 "fmt"
12+ "io"
1113 "net/url"
1214 "reflect"
1315 "time"
1416
1517 "github.com/google/cel-go/cel"
1618 "github.com/gorilla/websocket"
19+ "go.uber.org/zap/zapcore"
1720 "google.golang.org/protobuf/types/known/structpb"
1821
1922 v2 "github.com/elastic/beats/v7/filebeat/input/v2"
@@ -109,7 +112,15 @@ func (i input) run(env v2.Context, src *source, cursor map[string]interface{}, p
109112 headers := formHeader (cfg )
110113 c , resp , err := websocket .DefaultDialer .DialContext (ctx , url , headers )
111114 if resp != nil && resp .Body != nil {
112- log .Debugw ("websocket connection response" , "body" , resp .Body )
115+ var buf bytes.Buffer
116+ if log .Core ().Enabled (zapcore .DebugLevel ) {
117+ const limit = 1e4
118+ io .CopyN (& buf , resp .Body , limit )
119+ }
120+ if n , _ := io .Copy (io .Discard , resp .Body ); n != 0 && buf .Len () != 0 {
121+ buf .WriteString ("... truncated" )
122+ }
123+ log .Debugw ("websocket connection response" , "body" , & buf )
113124 resp .Body .Close ()
114125 }
115126 if err != nil {
@@ -119,41 +130,26 @@ func (i input) run(env v2.Context, src *source, cursor map[string]interface{}, p
119130 }
120131 defer c .Close ()
121132
122- done := make (chan error )
123-
124- go func () {
125- defer close (done )
126- for {
127- _ , message , err := c .ReadMessage ()
128- if err != nil {
129- metrics .errorsTotal .Inc ()
130- if websocket .IsCloseError (err , websocket .CloseNormalClosure , websocket .CloseGoingAway ) {
131- log .Errorw ("websocket connection closed" , "error" , err )
132- } else {
133- log .Errorw ("failed to read websocket data" , "error" , err )
134- }
135- done <- err
136- return
137- }
138- metrics .receivedBytesTotal .Add (uint64 (len (message )))
139- state ["response" ] = message
140- log .Debugw ("received websocket message" , logp .Namespace ("websocket" ), string (message ))
141- err = i .processAndPublishData (ctx , metrics , prg , ast , state , cursor , pub , log )
142- if err != nil {
143- metrics .errorsTotal .Inc ()
144- log .Errorw ("failed to process and publish data" , "error" , err )
145- done <- err
146- return
133+ for {
134+ _ , message , err := c .ReadMessage ()
135+ if err != nil {
136+ metrics .errorsTotal .Inc ()
137+ if websocket .IsCloseError (err , websocket .CloseNormalClosure , websocket .CloseGoingAway ) {
138+ log .Errorw ("websocket connection closed" , "error" , err )
139+ } else {
140+ log .Errorw ("failed to read websocket data" , "error" , err )
147141 }
142+ return err
143+ }
144+ metrics .receivedBytesTotal .Add (uint64 (len (message )))
145+ state ["response" ] = message
146+ log .Debugw ("received websocket message" , logp .Namespace ("websocket" ), string (message ))
147+ err = i .processAndPublishData (ctx , metrics , prg , ast , state , cursor , pub , log )
148+ if err != nil {
149+ metrics .errorsTotal .Inc ()
150+ log .Errorw ("failed to process and publish data" , "error" , err )
151+ return err
148152 }
149- }()
150-
151- // blocks until done is closed, context is cancelled or an error is received
152- select {
153- case err := <- done :
154- return err
155- case <- ctx .Done ():
156- return ctx .Err ()
157153 }
158154}
159155
0 commit comments