1818package readjson
1919
2020import (
21+ "io"
2122 "testing"
2223 "time"
2324
@@ -53,7 +54,7 @@ func TestDockerJSON(t *testing.T) {
5354 name : "Wrong JSON" ,
5455 input : [][]byte {[]byte (`this is not JSON` )},
5556 stream : "all" ,
56- expectedError : reader . ErrLineUnparsable ,
57+ expectedError : io . EOF ,
5758 expectedMessage : reader.Message {
5859 Bytes : 16 ,
5960 },
@@ -73,7 +74,7 @@ func TestDockerJSON(t *testing.T) {
7374 name : "Wrong CRI" ,
7475 input : [][]byte {[]byte (`2017-09-12T22:32:21.212861448Z stdout` )},
7576 stream : "all" ,
76- expectedError : reader . ErrLineUnparsable ,
77+ expectedError : io . EOF ,
7778 expectedMessage : reader.Message {
7879 Bytes : 37 ,
7980 },
@@ -82,7 +83,7 @@ func TestDockerJSON(t *testing.T) {
8283 name : "Wrong CRI" ,
8384 input : [][]byte {[]byte (`{this is not JSON nor CRI` )},
8485 stream : "all" ,
85- expectedError : reader . ErrLineUnparsable ,
86+ expectedError : io . EOF ,
8687 expectedMessage : reader.Message {
8788 Bytes : 25 ,
8889 },
@@ -91,7 +92,7 @@ func TestDockerJSON(t *testing.T) {
9192 name : "Missing time" ,
9293 input : [][]byte {[]byte (`{"log":"1:M 09 Nov 13:27:36.276 # User requested shutdown...\n","stream":"stdout"}` )},
9394 stream : "all" ,
94- expectedError : reader . ErrLineUnparsable ,
95+ expectedError : io . EOF ,
9596 expectedMessage : reader.Message {
9697 Bytes : 82 ,
9798 },
@@ -218,7 +219,7 @@ func TestDockerJSON(t *testing.T) {
218219 input : [][]byte {[]byte (`{"log":"1:M 09 Nov 13:27:36.276 # User requested shutdown...\n","stream":"stdout"}` )},
219220 stream : "all" ,
220221 format : "cri" ,
221- expectedError : reader . ErrLineUnparsable ,
222+ expectedError : io . EOF ,
222223 expectedMessage : reader.Message {
223224 Bytes : 82 ,
224225 },
@@ -228,7 +229,7 @@ func TestDockerJSON(t *testing.T) {
228229 input : [][]byte {[]byte (`2017-09-12T22:32:21.212861448Z stdout 2017-09-12 22:32:21.212 [INFO][88] table.go 710: Invalidating dataplane cache` )},
229230 stream : "all" ,
230231 format : "docker" ,
231- expectedError : reader . ErrLineUnparsable ,
232+ expectedError : io . EOF ,
232233 expectedMessage : reader.Message {
233234 Bytes : 115 ,
234235 },
@@ -300,7 +301,7 @@ func TestDockerJSON(t *testing.T) {
300301 []byte (`{"log":"shutdown...\n","stream` ),
301302 },
302303 stream : "stdout" ,
303- expectedError : reader . ErrLineUnparsable ,
304+ expectedError : io . EOF ,
304305 expectedMessage : reader.Message {
305306 Bytes : 139 ,
306307 },
@@ -324,11 +325,25 @@ func TestDockerJSON(t *testing.T) {
324325 name : "Corrupted log message line" ,
325326 input : [][]byte {[]byte (`36.276 # User requested shutdown...\n","stream":"stdout","time":"2017-11-09T13:27:36.277747246Z"}` )},
326327 stream : "all" ,
327- expectedError : reader . ErrLineUnparsable ,
328+ expectedError : io . EOF ,
328329 expectedMessage : reader.Message {
329330 Bytes : 97 ,
330331 },
331332 },
333+ {
334+ name : "Corrupted log message line is skipped, keep correct bytes count" ,
335+ input : [][]byte {
336+ []byte (`36.276 # User requested shutdown...\n","stream":"stdout","time":"2017-11-09T13:27:36.277747246Z"}` ),
337+ []byte (`{"log":"1:M 09 Nov 13:27:36.276 # User requested","stream":"stdout","time":"2017-11-09T13:27:36.277747246Z"}` ),
338+ },
339+ stream : "all" ,
340+ expectedMessage : reader.Message {
341+ Content : []byte ("1:M 09 Nov 13:27:36.276 # User requested" ),
342+ Fields : common.MapStr {"stream" : "stdout" },
343+ Ts : time .Date (2017 , 11 , 9 , 13 , 27 , 36 , 277747246 , time .UTC ),
344+ Bytes : 205 ,
345+ },
346+ },
332347 }
333348
334349 for _ , test := range tests {
@@ -358,6 +373,12 @@ type mockReader struct {
358373}
359374
360375func (m * mockReader ) Next () (reader.Message , error ) {
376+ if len (m .messages ) < 1 {
377+ return reader.Message {
378+ Content : []byte {},
379+ Bytes : 0 ,
380+ }, io .EOF
381+ }
361382 message := m .messages [0 ]
362383 m .messages = m .messages [1 :]
363384 return reader.Message {
0 commit comments