util/parquet: add support for arrays#101860
Conversation
3e218ba to
5ebf4cd
Compare
miretskiy
left a comment
Reviewed 4 of 6 files at r1.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @jayshrivastava)
pkg/util/parquet/write_functions.go line 97 at r1 (raw file):
// For more info on definition levels and repetition levels, refer to
// https://arrow.apache.org/blog/2022/10/08/arrow-parquet-encoding-part-2/
Awesome.
pkg/util/parquet/write_functions.go line 110 at r1 (raw file):
d tree.Datum, w file.ColumnChunkWriter, a *batchAlloc, wFn writeFn, isArray bool,
) error {
	if isArray {
I feel like this function should have remained as two different functions: one for writing arrays,
and another one for writing regular datums. You can do the switch on isArray at the single call site.
Better yet, can't we change colWriter so that the right function is invoked, regardless of
what kind of column it is?
Basically, when you create the schema, you currently assign colWriter to be equal to elementCol.colWriter:
result.colWriter = elementCol.colWriter
But why do that? Why not do something like:
result.colWriter = func(d tree.Datum, w file.ColumnChunkWriter, a *batchAlloc) error {
	return writeArray(...., elementCol.colWriter)
}
(that is: just wrap the result column writer (which writes the array) with a function that calls "writeArray" using the correct column type writer (elementCol.colWriter)?)
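A self-contained sketch of that wrapping idea. The types here are simplified stand-ins (the real signatures take tree.Datum, file.ColumnChunkWriter, and *batchAlloc), and the names are illustrative, not the actual package code:

```go
package main

import "fmt"

// datum and colWriter are simplified stand-ins for tree.Datum and the
// real column-writer signature.
type datum = any
type colWriter func(d datum) error

// writeArray stands in for the real array-writing logic: it would compute
// repetition/definition levels, then delegate each element to the element
// column's writer.
func writeArray(d datum, elemWriter colWriter) error {
	for _, elem := range d.([]datum) {
		if err := elemWriter(elem); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	// The element column's writer, e.g. for an int column.
	elemWriter := colWriter(func(d datum) error {
		fmt.Println("wrote element:", d)
		return nil
	})
	// Instead of result.colWriter = elementCol.colWriter, wrap it so the
	// array column's writer invokes writeArray with the element writer.
	arrayColWriter := colWriter(func(d datum) error {
		return writeArray(d, elemWriter)
	})
	_ = arrayColWriter([]datum{1, 2, 3})
}
```

The array column then exposes the same colWriter signature as any other column, so the call site stays uniform.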
5ebf4cd to
adf8c1a
Compare
jayshrivastava
left a comment
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @miretskiy)
pkg/util/parquet/write_functions.go line 110 at r1 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
I feel like this function should have remained as two different functions: one for writing arrays, and another one for writing regular datums. You can do the switch on isArray at the single call site. Better yet, can't we change colWriter so that the right function is invoked, regardless of what kind of column it is?
Basically, when you create the schema, you currently assign colWriter to be equal to elementCol.colWriter:
result.colWriter = elementCol.colWriter
But why do that? Why not do something like:
result.colWriter = func(d tree.Datum, w file.ColumnChunkWriter, a *batchAlloc) error {
	return writeArray(...., elementCol.colWriter)
}
(that is: just wrap the result column writer (which writes the array) with a function that calls "writeArray" using the correct column type writer (elementCol.colWriter)?)
I think we run into a loop with your last point. writeArray needs to decide the repLevels and defLevels and call the writeFn. However, result.writeFn below calls writeArray.
result.writeFn = func(d tree.Datum, w file.ColumnChunkWriter, a *batchAlloc) error {
	return writeArray(...., elementCol.writeFn)
}
So then instead, we can keep the write function as is and only change writeDatumToColChunk.
result.writeFn = elementCol.writeFn
func (w *Writer) writeDatumToColChunk(d tree.Datum ...) {
	if d.isArray() {
		writeArray(col.writeFn)
	} else {
		writeScalar(col.writeFn)
	}
}
But I ended up going with this. I thought it would be nicer:
Schema column now has a writeFn and writeInvoker. The invokers are writeScalar and writeArray. They figure out the levels and call writeFn. The writeFn encodes and writes bytes to files.
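A rough, self-contained sketch of that writeFn/writeInvoker split, using simplified stand-in types (tree.Datum, file.ColumnChunkWriter, and *batchAlloc are replaced, and the struct and field names are illustrative rather than the actual schema code):

```go
package main

import "fmt"

// Simplified stand-ins for tree.Datum and the real write signature.
type datum = any
type writeFn func(d datum) error

// invoker figures out repetition/definition levels and then calls the
// column's writeFn; writeScalar and writeArray are the two invokers.
type invoker func(d datum, wFn writeFn) error

func writeScalar(d datum, wFn writeFn) error {
	// A scalar has trivial levels; just encode the value.
	return wFn(d)
}

func writeArray(d datum, wFn writeFn) error {
	// An array computes levels per element, then encodes each one.
	for _, elem := range d.([]datum) {
		if err := wFn(elem); err != nil {
			return err
		}
	}
	return nil
}

// column models a schema column carrying both pieces.
type column struct {
	writeFn      writeFn // encodes and writes bytes
	writeInvoker invoker // computes levels, then calls writeFn
}

func main() {
	intWrite := writeFn(func(d datum) error { fmt.Println("encode", d); return nil })
	scalarCol := column{writeFn: intWrite, writeInvoker: writeScalar}
	arrayCol := column{writeFn: intWrite, writeInvoker: writeArray}
	_ = scalarCol.writeInvoker(42, scalarCol.writeFn)
	_ = arrayCol.writeInvoker([]datum{1, 2}, arrayCol.writeFn)
}
```

The point of the split is that the element-specific encoding (writeFn) is chosen once at schema-build time, while the level logic is chosen by whether the column is scalar or array.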
miretskiy
left a comment
Reviewed 1 of 3 files at r2.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @jayshrivastava)
pkg/util/parquet/write_functions.go line 110 at r1 (raw file):
Previously, jayshrivastava (Jayant) wrote…
I think we run into a loop with your last point. writeArray needs to decide the repLevels and defLevels and call the writeFn. However, result.writeFn below calls writeArray.
result.writeFn = func(d tree.Datum, w file.ColumnChunkWriter, a *batchAlloc) error {
	return writeArray(...., elementCol.writeFn)
}
So then instead, we can keep the write function as is and only change writeDatumToColChunk.
result.writeFn = elementCol.writeFn
func (w *Writer) writeDatumToColChunk(d tree.Datum ...) {
	if d.isArray() {
		writeArray(col.writeFn)
	} else {
		writeScalar(col.writeFn)
	}
}
But I ended up going with this. I thought it would be nicer:
Schema column now has a writeFn and writeInvoker. The invokers are writeScalar and writeArray. They figure out the levels and call writeFn. The writeFn encodes and writes bytes to files.
I like this much better.
I'm not too thrilled about the split with writeInvoker and writeFn in the element... Do you think something like this would make things a bit cleaner (it's okay if not and/or you disagree):
// writer is responsible for writing datum into provided column.
// (basically, this is your write invoker)...
type writer interface {
Write(d tree.Datum, w file.ColumnChunkWriter, a *batchAlloc) error
}
// arrayWriter -- responsible for writing array values.
// Note: it's just a typedef on writeFn -- meaning we can close over the type
// of the array value we are writing.
type arrayWriter writeFn
func (w arrayWriter) Write(d tree.Datum, cw file.ColumnChunkWriter, a *batchAlloc) error {
return writeArray(d, cw, a, writeFn(w))
}
// Similarly, scalarWriter just forwards to your writeScalar function
type scalarWriter writeFn
func (w scalarWriter) Write(d tree.Datum, cw file.ColumnChunkWriter, a *batchAlloc) error {
return writeScalar(d, cw, a, writeFn(w))
}
I think, with the above, you can just have a single "writer" in the schema struct, and just invoke it.
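For illustration, the typedef-on-a-func-type pattern above compiles and runs as sketched below, with simplified stand-ins for tree.Datum, file.ColumnChunkWriter, and *batchAlloc (the level-handling bodies are placeholders, not the real parquet logic):

```go
package main

import "fmt"

// Stand-ins for the real types.
type datum = any
type writeFn func(d datum) error

// writer is responsible for writing a datum into its column.
type writer interface {
	Write(d datum) error
}

// arrayWriter is a named type over writeFn, so the element-type-specific
// writeFn is carried by the value itself and Write can forward to it.
type arrayWriter writeFn

func (w arrayWriter) Write(d datum) error {
	// Placeholder for writeArray: compute levels, then write each element.
	for _, elem := range d.([]datum) {
		if err := writeFn(w)(elem); err != nil {
			return err
		}
	}
	return nil
}

// scalarWriter forwards directly to the underlying writeFn.
type scalarWriter writeFn

func (w scalarWriter) Write(d datum) error {
	return writeFn(w)(d)
}

func main() {
	enc := writeFn(func(d datum) error { fmt.Println("encode", d); return nil })
	// A single writer field in the schema column can hold either kind.
	cols := []writer{scalarWriter(enc), arrayWriter(enc)}
	_ = cols[0].Write(7)
	_ = cols[1].Write([]datum{1, 2, 3})
}
```

Since Go allows methods on any named type, including function types, each writer is just a closure-free wrapper around the element's writeFn, and the schema struct needs only one writer field.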
d3f2ad7 to
3cbca9e
Compare
jayshrivastava
left a comment
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @miretskiy)
pkg/util/parquet/write_functions.go line 110 at r1 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
I like this much better.
I'm not too thrilled about the split with writeInvoker and writeFn in the element... Do you think something like this would make things a bit cleaner (it's okay if not and/or you disagree):
// writer is responsible for writing datum into provided column.
// (basically, this is your write invoker)...
type writer interface {
	Write(d tree.Datum, w file.ColumnChunkWriter, a *batchAlloc) error
}
// arrayWriter -- responsible for writing array values.
// Note: it's just a typedef on writeFn -- meaning we can close over the type
// of the array value we are writing.
type arrayWriter writeFn
func (w arrayWriter) Write(d tree.Datum, cw file.ColumnChunkWriter, a *batchAlloc) error {
	return writeArray(d, cw, a, writeFn(w))
}
// Similarly, scalarWriter just forwards to your writeScalar function
type scalarWriter writeFn
func (w scalarWriter) Write(d tree.Datum, cw file.ColumnChunkWriter, a *batchAlloc) error {
	return writeScalar(d, cw, a, writeFn(w))
}
I think, with the above, you can just have a single "writer" in the schema struct, and just invoke it.
Done.
miretskiy
left a comment
Reviewable status: complete! 0 of 0 LGTMs obtained
This change extends and refactors the util/parquet library to be able to read and write arrays.
Release note: None
Informs: cockroachdb#99028
Epic: https://cockroachlabs.atlassian.net/browse/CRDB-15071
3cbca9e to
42b37b2
Compare
bors r=miretskiy
Build failed:
bors retry
bors ping
pong
bors r=miretskiy
Already running a review
Build succeeded: