feat: table overwrite functionality #674
Conversation
```go
	metadata:        meta,
	fs:              fs,
	projectedSchema: t.meta.CurrentSchema(),
	boundRowFilter:  boundFilter,
	caseSensitive:   true,
	rowLimit:        -1, // No limit
	concurrency:     1,
}
```

I don't know if this is correct.
I'll give this a look to start reviewing; in the meantime, can you fix the lint issues?

This requires the replacing table to match the schema of the replaced table, right? Asking since I'm interested in something like

Yes.

The spark integration tests for overwrite don't seem to be passing.

I definitely didn't do those correctly. Any guidance on whether they're structured correctly, or whether I'm going about it the right way? I couldn't figure out how to test them locally.
I'll try to find some time next week to take a look at the spark tests and see if I can figure it out. |
Force-pushed from 92c5269 to 2ffd1bb
zeroshade left a comment:

Gave this a closer review; let me know what you think.
zeroshade left a comment:

Overall this looks like a good first pass. There are some possible optimizations I can think of, but we can do those in a second pass.
I have one significant comment, but otherwise everything looks great! Thanks!
```go
// OverwriteTable is a shortcut for NewTransaction().OverwriteTable() and then committing the transaction.
// The batchSize parameter refers to the batch size for reading the input data, not the batch size for writes.
// The concurrency parameter controls the level of parallelism. If concurrency <= 0, defaults to runtime.GOMAXPROCS(0).
func (t Table) OverwriteTable(ctx context.Context, tbl arrow.Table, batchSize int64, filter iceberg.BooleanExpression, caseSensitive bool, concurrency int, snapshotProps iceberg.Properties) (*Table, error) {
```

Update the docstring to explain what the filter actually means (i.e. how it affects the overwrite), and please add the same to the function below.
table/transaction.go (Outdated)

```go
	return nil, nil
}

arrowSchema, err := SchemaToArrowSchema(t.meta.CurrentSchema(), nil, false, false)
```

the first return value from scanner.GetRecords is the arrow schema, so we shouldn't need to do this again here, right?
table/transaction.go (Outdated)

```go
table := array.NewTableFromRecords(arrowSchema, records)
defer table.Release()

rdr := array.NewTableReader(table, table.NumRows())
defer rdr.Release()

var result []iceberg.DataFile
itr := recordsToDataFiles(ctx, t.tbl.Location(), t.meta, recordWritingArgs{
	sc:        rdr.Schema(),
	itr:       array.IterFromReader(rdr),
	fs:        fs.(io.WriteFileIO),
	writeUUID: &commitUUID,
})
```

you received a record iterator from scanner.GetRecords, why create a table from it, and a reader, instead of just passing in the same iterator that you got back from scanner.GetRecords?
i.e. something like:

```go
sc, iter, err := scanner.GetRecords(ctx, []FileScanTask{*scanTask})
if err != nil { /* .... */ }
dfIter := recordsToDataFiles(ctx, t.tbl.Location(), t.meta, recordWritingArgs{
	sc:        sc,
	itr:       iter,
	fs:        fs.(io.WriteFileIO),
	writeUUID: &commitUUID,
})
```

In theory, if there are no records, you should just get an empty list of data files at the end, right?
Oops, big miss by me here 😅. Thanks for pointing it out!
zeroshade left a comment:

Looks great to me; there are some optimizations we can look into as follow-ups, but this is good for a first pass. Thanks!
This makes a few tweaks to the [recently added](#674) `Overwrite` API, favoring options for parameters that have a sensible default and migrating them to `OverwriteOptions`. Technically, it also fixes an issue introduced there where the caseSensitive parameter was simply ignored and hardcoded to `true` in the functions supporting the `Overwrite` functionality, but that wasn't the primary motivation on my end; I just happened to notice it while refactoring.
This adds support for performing delete operations on a table with the addition of `Delete(ctx context.Context, filter iceberg.BooleanExpression, snapshotProps iceberg.Properties, opts ...DeleteOption)` on both transaction and table (as with the other methods, the table version just wraps creating a new transaction, doing the work, and committing it). I did some refactoring to reuse the deletion code from the overwrite implementation, and I also updated the integration tests to make it easier to add validation SQL statements.

Previously, each validation meant adding a test function in `validation.py` that wraps a Spark SQL statement, which I hope people agree is less than ideal. To keep the validation SQL closer to the test logic, I refactored `validation.py` to take a `sql` argument, which lets the integration test run the validation script and pass it the SQL needed to produce the output used for the assertions. The tradeoff is that some validations were running two SQL statements, which now means two Spark SQL round trips instead of one. However, for most of them the validation was redundant (doing a count and _then_ fetching the full rows, which implicitly also shows the number of rows), so only one test ends up running two SQL statements.

Thanks to @dontirun for contributing the overwrite support in #674, which laid the foundation that made this very easy, since `copy-on-write` deletion is essentially a subset of overwrite with a filter.
Resolves #668
There are quite a few things I'm not too sure about in this implementation (like the filtering logic, or whether I've properly tested it).