@@ -28,12 +28,15 @@ import (
2828 "github.com/elastic/beats/v7/libbeat/logp"
2929)
3030
31+ const unlimited = 0
32+
3133// lineReader reads lines from underlying reader, decoding the input stream
3234// using the configured codec. The reader keeps track of bytes consumed
3335// from raw input stream for every decoded line.
3436type LineReader struct {
3537 reader io.Reader
3638 bufferSize int
39+ maxBytes int // max bytes per line limit to avoid OOM with malformatted files
3740 nl []byte
3841 decodedNl []byte
3942 inBuffer * streambuf.Buffer
@@ -62,6 +65,7 @@ func NewLineReader(input io.Reader, config Config) (*LineReader, error) {
6265 return & LineReader {
6366 reader : input ,
6467 bufferSize : config .BufferSize ,
68+ maxBytes : config .MaxBytes ,
6569 decoder : config .Codec .NewDecoder (),
6670 nl : nl ,
6771 decodedNl : terminator ,
@@ -121,17 +125,17 @@ func (r *LineReader) advance() error {
121125 // Initial check if buffer has already a newLine character
122126 idx := r .inBuffer .IndexFrom (r .inOffset , r .nl )
123127
124- // fill inBuffer until newline sequence has been found in input buffer
128+ // Fill inBuffer until newline sequence has been found in input buffer
125129 for idx == - 1 {
126- // increase search offset to reduce iterations on buffer when looping
130+ // Increase search offset to reduce iterations on buffer when looping
127131 newOffset := r .inBuffer .Len () - len (r .nl )
128132 if newOffset > r .inOffset {
129133 r .inOffset = newOffset
130134 }
131135
132136 buf := make ([]byte , r .bufferSize )
133137
134- // try to read more bytes into buffer
138+ // Try to read more bytes into buffer
135139 n , err := r .reader .Read (buf )
136140
137141 // Appends buffer also in case of err
@@ -140,16 +144,39 @@ func (r *LineReader) advance() error {
140144 return err
141145 }
142146
143- // empty read => return buffer error (more bytes required error)
147+ // Empty read => return buffer error (more bytes required error)
144148 if n == 0 {
145149 return streambuf .ErrNoMoreBytes
146150 }
147151
148152 // Check if buffer has newLine character
149153 idx = r .inBuffer .IndexFrom (r .inOffset , r .nl )
154+
155+ // If max bytes limit per line is set, then drop the lines that are longer
156+ if r .maxBytes != 0 {
157+ // If newLine is found, drop the lines longer than maxBytes
158+ for idx != - 1 && idx > r .maxBytes {
159+ r .logger .Warnf ("Exceeded %d max bytes in line limit, skipped %d bytes line" , r .maxBytes , idx )
160+ err = r .inBuffer .Advance (idx + len (r .nl ))
161+ r .inBuffer .Reset ()
162+ r .inOffset = 0
163+ idx = r .inBuffer .IndexFrom (r .inOffset , r .nl )
164+ }
165+
166+ // If newLine is not found and the incoming data buffer exceeded max bytes limit, then skip until the next newLine
167+ if idx == - 1 && r .inBuffer .Len () > r .maxBytes {
168+ skipped , err := r .skipUntilNewLine (buf )
169+ if err != nil {
170+ r .logger .Error ("Error skipping until new line, err:" , err )
171+ return err
172+ }
173+ r .logger .Warnf ("Exceeded %d max bytes in line limit, skipped %d bytes line" , r .maxBytes , skipped )
174+ idx = r .inBuffer .IndexFrom (r .inOffset , r .nl )
175+ }
176+ }
150177 }
151178
152- // found encoded byte sequence for newline in buffer
179+ // Found encoded byte sequence for newline in buffer
153180 // -> decode input sequence into outBuffer
154181 sz , err := r .decode (idx + len (r .nl ))
155182 if err != nil {
@@ -158,20 +185,63 @@ func (r *LineReader) advance() error {
158185 sz = idx + len (r .nl )
159186 }
160187
161- // consume transformed bytes from input buffer
188+ // Consume transformed bytes from input buffer
162189 err = r .inBuffer .Advance (sz )
163190 r .inBuffer .Reset ()
164191
165- // continue scanning input buffer from last position + 1
192+ // Continue scanning input buffer from last position + 1
166193 r .inOffset = idx + 1 - sz
167194 if r .inOffset < 0 {
168- // fix inOffset if newline has encoding > 8bits + firl line has been decoded
195+ // Fix inOffset if newline has encoding > 8bits + firl line has been decoded
169196 r .inOffset = 0
170197 }
171198
172199 return err
173200}
174201
202+ func (r * LineReader ) skipUntilNewLine (buf []byte ) (int , error ) {
203+ // The length of the line skipped
204+ skipped := r .inBuffer .Len ()
205+
206+ // Clean up the buffer
207+ err := r .inBuffer .Advance (skipped )
208+ r .inBuffer .Reset ()
209+
210+ // Reset inOffset
211+ r .inOffset = 0
212+
213+ if err != nil {
214+ return 0 , err
215+ }
216+
217+ // Read until the new line is found
218+ for idx := - 1 ; idx == - 1 ; {
219+ n , err := r .reader .Read (buf )
220+
221+ // Check bytes read for newLine
222+ if n > 0 {
223+ idx = bytes .Index (buf [:n ], r .nl )
224+
225+ if idx != - 1 {
226+ r .inBuffer .Append (buf [idx + len (r .nl ) : n ])
227+ skipped += idx
228+ } else {
229+ skipped += n
230+ }
231+ }
232+
233+ if err != nil {
234+ return skipped , err
235+ }
236+
237+ if n == 0 {
238+ return skipped , streambuf .ErrNoMoreBytes
239+ }
240+ }
241+
242+ return skipped , nil
243+ }
244+
175245func (r * LineReader ) decode (end int ) (int , error ) {
176246 var err error
177247 buffer := make ([]byte , 1024 )
0 commit comments