Skip to content

Commit a3b447a

Browse files
authored
[libbeat] Fix encoding and file offset issues in the disk queue (#26484)
1 parent 1c9a488 commit a3b447a

7 files changed

Lines changed: 132 additions & 36 deletions

File tree

CHANGELOG.next.asciidoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
236236
- Fix ILM alias creation when write alias exists and initial index does not exist {pull}26143[26143]
237237
- Omit full index template from errors that occur while loading the template. {pull}25743[25743]
238238
- In the script processor, the `decode_xml` and `decode_xml_wineventlog` processors are now available as `DecodeXML` and `DecodeXMLWineventlog` respectively.
239+
- Fix encoding errors when using the disk queue on nested data with multi-byte characters {pull}26484[26484]
239240

240241
*Auditbeat*
241242

libbeat/publisher/queue/diskqueue/core_loop_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,7 @@ func TestHandleWriterLoopResponse(t *testing.T) {
235235

236236
// Write to one segment (no segments should be moved to reading list)
237237
dq.handleWriterLoopResponse(writerLoopResponse{
238-
segments: []writerLoopResponseSegment{
238+
segments: []writerLoopSegmentResponse{
239239
{bytesWritten: 100},
240240
},
241241
})
@@ -250,7 +250,7 @@ func TestHandleWriterLoopResponse(t *testing.T) {
250250

251251
// Write to two segments (the first one should be moved to reading list)
252252
dq.handleWriterLoopResponse(writerLoopResponse{
253-
segments: []writerLoopResponseSegment{
253+
segments: []writerLoopSegmentResponse{
254254
{bytesWritten: 100},
255255
{bytesWritten: 100},
256256
},
@@ -270,7 +270,7 @@ func TestHandleWriterLoopResponse(t *testing.T) {
270270

271271
// Write to three segments (the first two should be moved to reading list)
272272
dq.handleWriterLoopResponse(writerLoopResponse{
273-
segments: []writerLoopResponseSegment{
273+
segments: []writerLoopSegmentResponse{
274274
{bytesWritten: 100},
275275
{bytesWritten: 100},
276276
{bytesWritten: 500},

libbeat/publisher/queue/diskqueue/reader_loop.go

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ package diskqueue
2020
import (
2121
"encoding/binary"
2222
"fmt"
23+
"io"
2324
"os"
2425
)
2526

@@ -100,13 +101,12 @@ func (rl *readerLoop) processRequest(request readerLoopRequest) readerLoopRespon
100101

101102
// Open the file and seek to the starting position.
102103
handle, err := request.segment.getReader(rl.settings)
104+
rl.decoder.useJSON = request.segment.shouldUseJSON()
103105
if err != nil {
104106
return readerLoopResponse{err: err}
105107
}
106108
defer handle.Close()
107-
// getReader positions us at the start of the data region, so we use
108-
// a relative seek to advance to the request position.
109-
_, err = handle.Seek(int64(request.startPosition), os.SEEK_CUR)
109+
_, err = handle.Seek(int64(request.startPosition), io.SeekStart)
110110
if err != nil {
111111
return readerLoopResponse{err: err}
112112
}
@@ -179,58 +179,58 @@ func (rl *readerLoop) nextFrame(
179179
// Ensure we are allowed to read the frame header.
180180
if maxLength < frameHeaderSize {
181181
return nil, fmt.Errorf(
182-
"Can't read next frame: remaining length %d is too low", maxLength)
182+
"can't read next frame: remaining length %d is too low", maxLength)
183183
}
184184
// Wrap the handle to retry non-fatal errors and always return the full
185185
// requested data length if possible.
186186
reader := autoRetryReader{handle}
187187
var frameLength uint32
188188
err := binary.Read(reader, binary.LittleEndian, &frameLength)
189189
if err != nil {
190-
return nil, fmt.Errorf("Couldn't read data frame header: %w", err)
190+
return nil, fmt.Errorf("couldn't read data frame header: %w", err)
191191
}
192192

193193
// If the frame extends past the area we were told to read, return an error.
194194
// This should never happen unless the segment file is corrupted.
195195
if maxLength < uint64(frameLength) {
196196
return nil, fmt.Errorf(
197-
"Can't read next frame: frame size is %d but remaining data is only %d",
197+
"can't read next frame: frame size is %d but remaining data is only %d",
198198
frameLength, maxLength)
199199
}
200200
if frameLength <= frameMetadataSize {
201201
// Valid enqueued data must have positive length
202202
return nil, fmt.Errorf(
203-
"Data frame with no data (length %d)", frameLength)
203+
"data frame with no data (length %d)", frameLength)
204204
}
205205

206206
// Read the actual frame data
207207
dataLength := frameLength - frameMetadataSize
208208
bytes := rl.decoder.Buffer(int(dataLength))
209209
_, err = reader.Read(bytes)
210210
if err != nil {
211-
return nil, fmt.Errorf("Couldn't read data frame content: %w", err)
211+
return nil, fmt.Errorf("couldn't read data frame content: %w", err)
212212
}
213213

214214
// Read the footer (checksum + duplicate length)
215215
var checksum uint32
216216
err = binary.Read(reader, binary.LittleEndian, &checksum)
217217
if err != nil {
218-
return nil, fmt.Errorf("Couldn't read data frame checksum: %w", err)
218+
return nil, fmt.Errorf("couldn't read data frame checksum: %w", err)
219219
}
220220
expected := computeChecksum(bytes)
221221
if checksum != expected {
222222
return nil, fmt.Errorf(
223-
"Data frame checksum mismatch (%x != %x)", checksum, expected)
223+
"data frame checksum mismatch (%x != %x)", checksum, expected)
224224
}
225225

226226
var duplicateLength uint32
227227
err = binary.Read(reader, binary.LittleEndian, &duplicateLength)
228228
if err != nil {
229-
return nil, fmt.Errorf("Couldn't read data frame footer: %w", err)
229+
return nil, fmt.Errorf("couldn't read data frame footer: %w", err)
230230
}
231231
if duplicateLength != frameLength {
232232
return nil, fmt.Errorf(
233-
"Inconsistent data frame length (%d vs %d)",
233+
"inconsistent data frame length (%d vs %d)",
234234
frameLength, duplicateLength)
235235
}
236236

@@ -242,7 +242,7 @@ func (rl *readerLoop) nextFrame(
242242
// TODO: Rather than pass this error back to the read request, which
243243
// discards the rest of the segment, we should just log the error and
244244
// advance to the next frame, which is likely still valid.
245-
return nil, fmt.Errorf("Couldn't decode data frame: %w", err)
245+
return nil, fmt.Errorf("couldn't decode data frame: %w", err)
246246
}
247247

248248
frame := &readFrame{

libbeat/publisher/queue/diskqueue/segments.go

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,10 @@ type queueSegment struct {
9191
// If this segment was loaded from a previous session, schemaVersion
9292
// points to the file schema version that was read from its header.
9393
// This is only used by queueSegment.headerSize(), which is used in
94-
// maybeReadPending to calculate the position of the first data frame.
94+
// maybeReadPending to calculate the position of the first data frame,
95+
// and by queueSegment.shouldUseJSON(), which is used in the reader
96+
// loop to detect old segments that used JSON encoding instead of
97+
// the current CBOR.
9598
schemaVersion *uint32
9699

97100
// The number of bytes occupied by this segment on-disk, as of the most
@@ -198,6 +201,14 @@ func (segment *queueSegment) headerSize() uint64 {
198201
return segmentHeaderSize
199202
}
200203

204+
// The initial release of the disk queue used JSON to encode events
205+
// on disk. Since then, we have switched to CBOR to address issues
206+
// with encoding multi-byte characters, and for lower encoding
207+
// overhead.
208+
func (segment *queueSegment) shouldUseJSON() bool {
209+
return segment.schemaVersion != nil && *segment.schemaVersion == 0
210+
}
211+
201212
// Should only be called from the reader loop. If successful, returns an open
202213
// file handle positioned at the beginning of the segment's data region.
203214
func (segment *queueSegment) getReader(
@@ -207,14 +218,14 @@ func (segment *queueSegment) getReader(
207218
file, err := os.Open(path)
208219
if err != nil {
209220
return nil, fmt.Errorf(
210-
"Couldn't open segment %d: %w", segment.id, err)
221+
"couldn't open segment %d: %w", segment.id, err)
211222
}
212223
// We don't need the header contents here, we just want to advance past the
213224
// header region, so discard the return value.
214225
_, err = readSegmentHeader(file)
215226
if err != nil {
216227
file.Close()
217-
return nil, fmt.Errorf("Couldn't read segment header: %w", err)
228+
return nil, fmt.Errorf("couldn't read segment header: %w", err)
218229
}
219230

220231
return file, nil
@@ -231,7 +242,7 @@ func (segment *queueSegment) getWriter(
231242
}
232243
err = writeSegmentHeader(file, 0)
233244
if err != nil {
234-
return nil, fmt.Errorf("Couldn't write segment header: %w", err)
245+
return nil, fmt.Errorf("couldn't write segment header: %w", err)
235246
}
236247

237248
return file, nil
@@ -306,7 +317,7 @@ func readSegmentHeaderWithFrameCount(path string) (*segmentHeader, error) {
306317
// the current frame to make sure the trailing length matches before
307318
// advancing to the next frame (otherwise we might accept an impossible
308319
// length).
309-
_, err = file.Seek(int64(frameLength-8), os.SEEK_CUR)
320+
_, err = file.Seek(int64(frameLength-8), io.SeekCurrent)
310321
if err != nil {
311322
break
312323
}
@@ -341,7 +352,7 @@ func readSegmentHeader(in io.Reader) (*segmentHeader, error) {
341352
return nil, err
342353
}
343354
if header.version > currentSegmentVersion {
344-
return nil, fmt.Errorf("Unrecognized schema version %d", header.version)
355+
return nil, fmt.Errorf("unrecognized schema version %d", header.version)
345356
}
346357
if header.version >= 1 {
347358
err = binary.Read(in, binary.LittleEndian, &header.frameCount)

libbeat/publisher/queue/diskqueue/serialize.go

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"github.com/elastic/beats/v7/libbeat/common"
2929
"github.com/elastic/beats/v7/libbeat/outputs/codec"
3030
"github.com/elastic/beats/v7/libbeat/publisher"
31+
"github.com/elastic/go-structform/cborl"
3132
"github.com/elastic/go-structform/gotype"
3233
"github.com/elastic/go-structform/json"
3334
)
@@ -40,7 +41,13 @@ type eventEncoder struct {
4041
type eventDecoder struct {
4142
buf []byte
4243

43-
parser *json.Parser
44+
jsonParser *json.Parser
45+
cborlParser *cborl.Parser
46+
47+
// Current serialization all uses CBOR. Set this flag when decoding
48+
// from old (schema 0) segment files generated by the disk queue beta.
49+
useJSON bool
50+
4451
unfolder *gotype.Unfolder
4552
}
4653

@@ -60,7 +67,7 @@ func newEventEncoder() *eventEncoder {
6067
func (e *eventEncoder) reset() {
6168
e.folder = nil
6269

63-
visitor := json.NewVisitor(&e.buf)
70+
visitor := cborl.NewVisitor(&e.buf)
6471
// This can't return an error: NewIterator is deterministic based on its
6572
// input, and doesn't return an error when called with valid options. In
6673
// this case the options are hard-coded to fixed values, so they are
@@ -109,7 +116,8 @@ func (d *eventDecoder) reset() {
109116
unfolder, _ := gotype.NewUnfolder(nil)
110117

111118
d.unfolder = unfolder
112-
d.parser = json.NewParser(unfolder)
119+
d.jsonParser = json.NewParser(unfolder)
120+
d.cborlParser = cborl.NewParser(unfolder)
113121
}
114122

115123
// Buffer prepares the read buffer to hold the next event of n bytes.
@@ -131,7 +139,11 @@ func (d *eventDecoder) Decode() (publisher.Event, error) {
131139
d.unfolder.SetTarget(&to)
132140
defer d.unfolder.Reset()
133141

134-
err = d.parser.Parse(d.buf)
142+
if d.useJSON {
143+
err = d.jsonParser.Parse(d.buf)
144+
} else {
145+
err = d.cborlParser.Parse(d.buf)
146+
}
135147

136148
if err != nil {
137149
d.reset() // reset parser just in case
libbeat/publisher/queue/diskqueue/serialize_test.go

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
// Licensed to Elasticsearch B.V. under one or more contributor
2+
// license agreements. See the NOTICE file distributed with
3+
// this work for additional information regarding copyright
4+
// ownership. Elasticsearch B.V. licenses this file to you under
5+
// the Apache License, Version 2.0 (the "License"); you may
6+
// not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package diskqueue
19+
20+
import (
21+
"testing"
22+
23+
"github.com/stretchr/testify/assert"
24+
25+
"github.com/elastic/beats/v7/libbeat/beat"
26+
"github.com/elastic/beats/v7/libbeat/common"
27+
"github.com/elastic/beats/v7/libbeat/publisher"
28+
)
29+
30+
// A test to make sure serialization works correctly on multi-byte characters.
31+
func TestSerialize(t *testing.T) {
32+
testCases := []struct {
33+
name string
34+
value string
35+
}{
36+
{name: "Ascii only", value: "{\"name\": \"Momotaro\"}"},
37+
{name: "Multi-byte", value: "{\"name\": \"桃太郎\"}"},
38+
}
39+
40+
for _, test := range testCases {
41+
encoder := newEventEncoder()
42+
event := publisher.Event{
43+
Content: beat.Event{
44+
Fields: common.MapStr{
45+
"test_field": test.value,
46+
},
47+
},
48+
}
49+
serialized, err := encoder.encode(&event)
50+
if err != nil {
51+
t.Fatalf("[%v] Couldn't encode event: %v", test.name, err)
52+
}
53+
54+
// Use decoder to decode the serialized bytes.
55+
decoder := newEventDecoder()
56+
buf := decoder.Buffer(len(serialized))
57+
copy(buf, serialized)
58+
decoded, err := decoder.Decode()
59+
if err != nil {
60+
t.Fatalf("[%v] Couldn't decode serialized data: %v", test.name, err)
61+
}
62+
63+
decodedValue, err := decoded.Content.Fields.GetValue("test_field")
64+
if err != nil {
65+
t.Fatalf("[%v] Couldn't get field 'test_field': %v", test.name, err)
66+
}
67+
assert.Equal(t, test.value, decodedValue)
68+
}
69+
}

0 commit comments

Comments (0)