
Commit 6502f0e

GH-43790: [Go][Parquet] Add support for LZ4_RAW compression codec (#43835)
### Rationale for this change

Fixes: #43790

The LZ4 compression codec for Parquet is no longer ambiguous, as it has been superseded by the [LZ4_RAW](https://github.com/apache/parquet-format/blob/master/Compression.md#lz4_raw) spec.

### What changes are included in this PR?

- Add `LZ4Raw` compression codec
- Split the `StreamingCodec` methods out of the core `Codec` interface
- Various conformance/roundtrip tests
- Set of benchmarks for reading/writing an Arrow table to/from Parquet, using each compression codec

### Are these changes tested?

Yes

### Are there any user-facing changes?

- New codec `LZ4Raw` is available
- The `Codec` interface no longer provides the following methods, which are now part of `StreamingCodec`:
  - `NewReader`
  - `NewWriter`
  - `NewWriterLevel`

* GitHub Issue: #43790

Authored-by: Joel Lubinitsky <joellubi@gmail.com>
Signed-off-by: Joel Lubinitsky <joellubi@gmail.com>
1 parent b836662 · commit 6502f0e

6 files changed: 380 additions & 12 deletions
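
Before the per-file diffs, a minimal sketch of the headline user-facing change: enabling the new codec when writing a Parquet file. This is adapted from the roundtrip test below; the module import path is an assumption and should be matched to your Arrow Go major version.

```go
package main

import (
	"github.com/apache/arrow/go/v18/parquet"          // assumed module path;
	"github.com/apache/arrow/go/v18/parquet/compress" // adjust to your Arrow Go version
)

func main() {
	// Selecting LZ4_RAW is a one-line change to the writer properties,
	// which can then be passed to file.NewParquetWriter (via
	// file.WithWriterProps) as usual.
	props := parquet.NewWriterProperties(parquet.WithCompression(compress.Codecs.Lz4Raw))
	_ = props
}
```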


go/parquet/compress/compress.go

Lines changed: 14 additions & 8 deletions
@@ -49,8 +49,9 @@ var Codecs = struct {
 	Brotli Compression
 	// LZ4 unsupported in this library due to problematic issues between the Hadoop LZ4 spec vs regular lz4
 	// see: http://mail-archives.apache.org/mod_mbox/arrow-dev/202007.mbox/%3CCAAri41v24xuA8MGHLDvgSnE+7AAgOhiEukemW_oPNHMvfMmrWw@mail.gmail.com%3E
-	Lz4  Compression
-	Zstd Compression
+	Lz4    Compression
+	Zstd   Compression
+	Lz4Raw Compression
 }{
 	Uncompressed: Compression(parquet.CompressionCodec_UNCOMPRESSED),
 	Snappy:       Compression(parquet.CompressionCodec_SNAPPY),
@@ -59,17 +60,12 @@ var Codecs = struct {
 	Brotli:       Compression(parquet.CompressionCodec_BROTLI),
 	Lz4:          Compression(parquet.CompressionCodec_LZ4),
 	Zstd:         Compression(parquet.CompressionCodec_ZSTD),
+	Lz4Raw:       Compression(parquet.CompressionCodec_LZ4_RAW),
 }

 // Codec is an interface which is implemented for each compression type in order to make the interactions easy to
 // implement. Most consumers won't be calling GetCodec directly.
 type Codec interface {
-	// NewReader provides a reader that wraps a stream with compressed data to stream the uncompressed data
-	NewReader(io.Reader) io.ReadCloser
-	// NewWriter provides a wrapper around a write stream to compress data before writing it.
-	NewWriter(io.Writer) io.WriteCloser
-	// NewWriterLevel is like NewWriter but allows specifying the compression level
-	NewWriterLevel(io.Writer, int) (io.WriteCloser, error)
 	// Encode encodes a block of data given by src and returns the compressed block. dst should be either nil
 	// or sized large enough to fit the compressed block (use CompressBound to allocate). dst and src should not
 	// overlap since some of the compression types don't allow it.
@@ -90,6 +86,16 @@ type Codec interface {
 	Decode(dst, src []byte) []byte
 }

+// StreamingCodec is an interface that may be implemented for compression codecs that expose a streaming API.
+type StreamingCodec interface {
+	// NewReader provides a reader that wraps a stream with compressed data to stream the uncompressed data
+	NewReader(io.Reader) io.ReadCloser
+	// NewWriter provides a wrapper around a write stream to compress data before writing it.
+	NewWriter(io.Writer) io.WriteCloser
+	// NewWriterLevel is like NewWriter but allows specifying the compression level
+	NewWriterLevel(io.Writer, int) (io.WriteCloser, error)
+}
+
 var codecs = map[Compression]Codec{}

 // RegisterCodec adds or overrides a codec implementation for a given compression algorithm.
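
Since the streaming methods moved off the core interface, existing callers of `codec.NewReader`/`codec.NewWriter` now reach them through a type assertion, exactly as the updated test below does. A minimal migration sketch (assumed import path; note that the new `Lz4Raw` codec is block-oriented and does not implement `StreamingCodec`):

```go
package main

import (
	"bytes"
	"fmt"

	"github.com/apache/arrow/go/v18/parquet/compress" // assumed module path
)

func main() {
	codec, err := compress.GetCodec(compress.Codecs.Gzip)
	if err != nil {
		panic(err)
	}

	// The assertion doubles as a runtime check for streaming support.
	sc, ok := codec.(compress.StreamingCodec)
	if !ok {
		panic("codec exposes no streaming API; use Encode/Decode instead")
	}

	var buf bytes.Buffer
	wr := sc.NewWriter(&buf)
	wr.Write([]byte("some data to compress"))
	wr.Close()
	fmt.Printf("wrote %d compressed bytes\n", buf.Len())
}
```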

go/parquet/compress/compress_test.go

Lines changed: 5 additions & 3 deletions
@@ -66,8 +66,8 @@ func TestCompressDataOneShot(t *testing.T) {
 		{compress.Codecs.Gzip},
 		{compress.Codecs.Brotli},
 		{compress.Codecs.Zstd},
+		{compress.Codecs.Lz4Raw},
 		// {compress.Codecs.Lzo},
-		// {compress.Codecs.Lz4},
 	}

 	for _, tt := range tests {
@@ -107,9 +107,11 @@ func TestCompressReaderWriter(t *testing.T) {
 			var buf bytes.Buffer
 			codec, err := compress.GetCodec(tt.c)
 			assert.NoError(t, err)
+			streamingCodec, ok := codec.(compress.StreamingCodec)
+			assert.True(t, ok)
 			data := makeRandomData(RandomDataSize)

-			wr := codec.NewWriter(&buf)
+			wr := streamingCodec.NewWriter(&buf)

 			const chunkSize = 1111
 			input := data
@@ -129,7 +131,7 @@ func TestCompressReaderWriter(t *testing.T) {
 			}
 			wr.Close()

-			rdr := codec.NewReader(&buf)
+			rdr := streamingCodec.NewReader(&buf)
 			out, err := io.ReadAll(rdr)
 			assert.NoError(t, err)
 			assert.Exactly(t, data, out)

go/parquet/compress/lz4_raw.go

Lines changed: 66 additions & 0 deletions
@@ -0,0 +1,66 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package compress
+
+import (
+	"sync"
+
+	"github.com/pierrec/lz4/v4"
+)
+
+// lz4.Compressor is not goroutine-safe, so we use a pool to amortize the cost
+// of allocating a new one for each call to Encode().
+var compressorPool = sync.Pool{New: func() interface{} { return new(lz4.Compressor) }}
+
+func compressBlock(src, dst []byte) (int, error) {
+	c := compressorPool.Get().(*lz4.Compressor)
+	defer compressorPool.Put(c)
+	return c.CompressBlock(src, dst)
+}
+
+type lz4RawCodec struct{}
+
+func (c lz4RawCodec) Encode(dst, src []byte) []byte {
+	n, err := compressBlock(src, dst[:cap(dst)])
+	if err != nil {
+		panic(err)
+	}
+
+	return dst[:n]
+}
+
+func (c lz4RawCodec) EncodeLevel(dst, src []byte, _ int) []byte {
+	// the lz4 block implementation does not allow level to be set
+	return c.Encode(dst, src)
+}
+
+func (lz4RawCodec) Decode(dst, src []byte) []byte {
+	n, err := lz4.UncompressBlock(src, dst)
+	if err != nil {
+		panic(err)
+	}
+
+	return dst[:n]
+}
+
+func (c lz4RawCodec) CompressBound(len int64) int64 {
+	return int64(lz4.CompressBlockBound(int(len)))
+}
+
+func init() {
+	RegisterCodec(Codecs.Lz4Raw, lz4RawCodec{})
+}
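
For completeness, a sketch of a one-shot roundtrip through the new codec via the public `Codec` interface, following its documented contract that `dst` be pre-sized using `CompressBound` (assumed import path, as above):

```go
package main

import (
	"bytes"
	"fmt"

	"github.com/apache/arrow/go/v18/parquet/compress" // assumed module path
)

func main() {
	codec, err := compress.GetCodec(compress.Codecs.Lz4Raw)
	if err != nil {
		panic(err)
	}

	src := bytes.Repeat([]byte("hello parquet"), 100)

	// Per the Codec docs, dst should be sized via CompressBound.
	dst := make([]byte, codec.CompressBound(int64(len(src))))
	compressed := codec.Encode(dst, src)

	// Raw LZ4 blocks do not record the uncompressed size, so the caller
	// must know it up front; Parquet stores it in the page header.
	out := codec.Decode(make([]byte, len(src)), compressed)

	fmt.Println(bytes.Equal(src, out)) // true
}
```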

go/parquet/file/file_reader_test.go

Lines changed: 127 additions & 0 deletions
@@ -644,3 +644,130 @@ func TestDeltaBinaryPackedMultipleBatches(t *testing.T) {

 	require.Equalf(t, size, totalRows, "Expected %d rows, but got %d rows", size, totalRows)
 }
+
+// Test read file lz4_raw_compressed.parquet
+// Contents documented at https://github.com/apache/parquet-testing/commit/ddd898958803cb89b7156c6350584d1cda0fe8de
+func TestLZ4RawFileRead(t *testing.T) {
+	dir := os.Getenv("PARQUET_TEST_DATA")
+	if dir == "" {
+		t.Skip("no path supplied with PARQUET_TEST_DATA")
+	}
+	require.DirExists(t, dir)
+
+	props := parquet.NewReaderProperties(memory.DefaultAllocator)
+	fileReader, err := file.OpenParquetFile(path.Join(dir, "lz4_raw_compressed.parquet"),
+		false, file.WithReadProps(props))
+	require.NoError(t, err)
+	defer fileReader.Close()
+
+	nRows := 4
+	nCols := 3
+	require.Equal(t, 1, fileReader.NumRowGroups())
+	rgr := fileReader.RowGroup(0)
+	require.EqualValues(t, nRows, rgr.NumRows())
+	require.EqualValues(t, nCols, rgr.NumColumns())
+
+	rdr, err := rgr.Column(0)
+	require.NoError(t, err)
+
+	rowsInt64, ok := rdr.(*file.Int64ColumnChunkReader)
+	require.True(t, ok)
+
+	valsInt64 := make([]int64, nRows)
+	total, read, err := rowsInt64.ReadBatch(int64(nRows), valsInt64, nil, nil)
+	require.NoError(t, err)
+	require.Equal(t, int64(nRows), total)
+	require.Equal(t, nRows, read)
+
+	expectedValsInt64 := []int64{
+		1593604800,
+		1593604800,
+		1593604801,
+		1593604801,
+	}
+	require.Equal(t, expectedValsInt64, valsInt64)
+
+	rdr, err = rgr.Column(1)
+	require.NoError(t, err)
+
+	rowsByteArray, ok := rdr.(*file.ByteArrayColumnChunkReader)
+	require.True(t, ok)
+
+	valsByteArray := make([]parquet.ByteArray, nRows)
+	total, read, err = rowsByteArray.ReadBatch(int64(nRows), valsByteArray, nil, nil)
+	require.NoError(t, err)
+	require.Equal(t, int64(nRows), total)
+	require.Equal(t, nRows, read)
+
+	expectedValsByteArray := []parquet.ByteArray{
+		[]byte("abc"),
+		[]byte("def"),
+		[]byte("abc"),
+		[]byte("def"),
+	}
+	require.Equal(t, expectedValsByteArray, valsByteArray)
+
+	rdr, err = rgr.Column(2)
+	require.NoError(t, err)
+
+	rowsFloat64, ok := rdr.(*file.Float64ColumnChunkReader)
+	require.True(t, ok)
+
+	valsFloat64 := make([]float64, nRows)
+	total, read, err = rowsFloat64.ReadBatch(int64(nRows), valsFloat64, nil, nil)
+	require.NoError(t, err)
+	require.Equal(t, int64(nRows), total)
+	require.Equal(t, nRows, read)
+
+	expectedValsFloat64 := []float64{
+		42.0,
+		7.7,
+		42.125,
+		7.7,
+	}
+	require.Equal(t, expectedValsFloat64, valsFloat64)
+}
+
+// Test read file lz4_raw_compressed_larger.parquet
+// Contents documented at https://github.com/apache/parquet-testing/commit/ddd898958803cb89b7156c6350584d1cda0fe8de
+func TestLZ4RawLargerFileRead(t *testing.T) {
+	dir := os.Getenv("PARQUET_TEST_DATA")
+	if dir == "" {
+		t.Skip("no path supplied with PARQUET_TEST_DATA")
+	}
+	require.DirExists(t, dir)
+
+	props := parquet.NewReaderProperties(memory.DefaultAllocator)
+	fileReader, err := file.OpenParquetFile(path.Join(dir, "lz4_raw_compressed_larger.parquet"),
+		false, file.WithReadProps(props))
+	require.NoError(t, err)
+	defer fileReader.Close()
+
+	nRows := 10000
+	nCols := 1
+	require.Equal(t, 1, fileReader.NumRowGroups())
+	rgr := fileReader.RowGroup(0)
+	require.EqualValues(t, nRows, rgr.NumRows())
+	require.EqualValues(t, nCols, rgr.NumColumns())
+
+	rdr, err := rgr.Column(0)
+	require.NoError(t, err)
+
+	rows, ok := rdr.(*file.ByteArrayColumnChunkReader)
+	require.True(t, ok)
+
+	vals := make([]parquet.ByteArray, nRows)
+	total, read, err := rows.ReadBatch(int64(nRows), vals, nil, nil)
+	require.NoError(t, err)
+	require.Equal(t, int64(nRows), total)
+	require.Equal(t, nRows, read)
+
+	expectedValsHead := []parquet.ByteArray{
+		[]byte("c7ce6bef-d5b0-4863-b199-8ea8c7fb117b"),
+		[]byte("e8fb9197-cb9f-4118-b67f-fbfa65f61843"),
+		[]byte("885136e1-0aa1-4fdb-8847-63d87b07c205"),
+		[]byte("ce7b2019-8ebe-4906-a74d-0afa2409e5df"),
+		[]byte("a9ee2527-821b-4b71-a926-03f73c3fc8b7"),
+	}
+	require.Equal(t, expectedValsHead, vals[:len(expectedValsHead)])
+}

go/parquet/file/file_writer_test.go

Lines changed: 57 additions & 1 deletion
@@ -260,7 +260,7 @@ func (t *SerializeTestSuite) TestSmallFile() {
 		compress.Codecs.Brotli,
 		compress.Codecs.Gzip,
 		compress.Codecs.Zstd,
-		// compress.Codecs.Lz4,
+		compress.Codecs.Lz4Raw,
 		// compress.Codecs.Lzo,
 	}
 	for _, c := range codecs {
@@ -540,3 +540,59 @@ func TestBatchedByteStreamSplitFileRoundtrip(t *testing.T) {

 	require.NoError(t, rdr.Close())
 }
+
+func TestLZ4RawFileRoundtrip(t *testing.T) {
+	input := []int64{
+		-1, 0, 1, 2, 3, 4, 5, 123456789, -123456789,
+	}
+
+	size := len(input)
+
+	field, err := schema.NewPrimitiveNodeLogical("int64", parquet.Repetitions.Required, nil, parquet.Types.Int64, 0, 1)
+	require.NoError(t, err)
+
+	schema, err := schema.NewGroupNode("test", parquet.Repetitions.Required, schema.FieldList{field}, 0)
+	require.NoError(t, err)
+
+	sink := encoding.NewBufferWriter(0, memory.DefaultAllocator)
+	writer := file.NewParquetWriter(sink, schema, file.WithWriterProps(parquet.NewWriterProperties(parquet.WithCompression(compress.Codecs.Lz4Raw))))
+
+	rgw := writer.AppendRowGroup()
+	cw, err := rgw.NextColumn()
+	require.NoError(t, err)
+
+	i64ColumnWriter, ok := cw.(*file.Int64ColumnChunkWriter)
+	require.True(t, ok)
+
+	nVals, err := i64ColumnWriter.WriteBatch(input, nil, nil)
+	require.NoError(t, err)
+	require.EqualValues(t, size, nVals)
+
+	require.NoError(t, cw.Close())
+	require.NoError(t, rgw.Close())
+	require.NoError(t, writer.Close())
+
+	rdr, err := file.NewParquetReader(bytes.NewReader(sink.Bytes()))
+	require.NoError(t, err)
+
+	require.Equal(t, 1, rdr.NumRowGroups())
+	require.EqualValues(t, size, rdr.NumRows())
+
+	rgr := rdr.RowGroup(0)
+	cr, err := rgr.Column(0)
+	require.NoError(t, err)
+
+	i64ColumnReader, ok := cr.(*file.Int64ColumnChunkReader)
+	require.True(t, ok)
+
+	output := make([]int64, size)
+
+	total, valuesRead, err := i64ColumnReader.ReadBatch(int64(size), output, nil, nil)
+	require.NoError(t, err)
+	require.EqualValues(t, size, total)
+	require.EqualValues(t, size, valuesRead)
+
+	require.Equal(t, input, output)
+
+	require.NoError(t, rdr.Close())
+}
