Revert "[Data] - Iceberg write datafiles in write() then commit" #59181

alexeykudinkin merged 1 commit into master
Conversation
This reverts commit aecff3c.
Code Review
This pull request reverts a previous change that introduced a distributed write strategy for Iceberg, returning to a simpler, driver-based write implementation. The revert appears to be correct and consistent across the modified files. My review includes two suggestions: one to restore several important tests for UPSERT functionality that were removed, and another to re-introduce a safety check in the datasink logic to prevent misuse of an internal PyIceberg parameter.
@@ -1211,248 +1173,6 @@ def test_upsert_matching_scenarios(
        expected = _create_typed_dataframe(expected_data)
        assert rows_same(result_df, expected)
This revert removes several valuable tests for UPSERT mode, including test_upsert_schema_evolution, test_upsert_null_in_join_keys, and test_upsert_empty_table. While the implementation now uses PyIceberg's native upsert, these tests are still important for verifying the integration and ensuring that edge cases like schema evolution and NULL handling in join keys work as expected within Ray Data. I recommend re-adding these tests to maintain test coverage and confidence in the UPSERT functionality.
    def test_upsert_schema_evolution(self, clean_table):
        """Test that upsert supports automatic schema evolution (adding new columns)."""
        # Initial data with 3 columns
        initial_data = _create_typed_dataframe(
            {"col_a": [1, 2], "col_b": ["row_1", "row_2"], "col_c": [10, 20]}
        )
        _write_to_iceberg(initial_data)

        # Upsert with new column col_d
        upsert_data = _create_typed_dataframe(
            {
                "col_a": [2, 3],
                "col_b": ["updated_2", "new_3"],
                "col_c": [200, 30],
                "col_d": ["extra_2", "extra_3"],
            }
        )
        _write_to_iceberg(
            upsert_data, mode=SaveMode.UPSERT, upsert_kwargs={"join_cols": ["col_a"]}
        )

        # Verify schema has col_d
        _verify_schema(
            {
                "col_a": pyi_types.IntegerType,
                "col_b": pyi_types.StringType,
                "col_c": pyi_types.IntegerType,
                "col_d": pyi_types.StringType,
            }
        )

        # Verify data
        result_df = _read_from_iceberg(sort_by="col_a")
        expected = _create_typed_dataframe(
            {
                "col_a": [1, 2, 3],
                "col_b": ["row_1", "updated_2", "new_3"],
                "col_c": [10, 200, 30],
                "col_d": [None, "extra_2", "extra_3"],
            }
        )
        assert rows_same(result_df, expected)
    @pytest.mark.parametrize(
        "join_cols,initial_data_dict,upsert_data_dict,expected_data_dict",
        [
            # NULL in initial data - row with NULL key isn't updated
            (
                ["col_a"],
                {
                    "col_a": [1, 2, None],
                    "col_b": ["A", "B", "C"],
                    "col_c": [10, 20, 30],
                },  # Initial data
                {
                    "col_a": [2, 3],
                    "col_b": ["B_updated", "new_3"],
                    "col_c": [200, 300],
                },  # Upsert data
                {
                    "col_a": [1, 2, 3, None],
                    "col_b": ["A", "B_updated", "new_3", "C"],
                    "col_c": [10, 200, 300, 30],
                },  # Expected data
            ),
            # NULL in upsert data - row with NULL key is inserted
            (
                ["col_a"],
                {
                    "col_a": [1, 2, 3],
                    "col_b": ["A", "B", "C"],
                    "col_c": [10, 20, 30],
                },  # Initial data
                {
                    "col_a": [2, None],
                    "col_b": ["B_updated", "null_row"],
                    "col_c": [200, 300],
                },  # Upsert data
                {
                    "col_a": [1, 2, 3, None],
                    "col_b": ["A", "B_updated", "C", "null_row"],
                    "col_c": [10, 200, 30, 300],
                },  # Expected data
            ),
            # Composite key with NULL in initial - row not updated
            (
                ["col_a", "col_b"],
                {
                    "col_a": [1, 2, None],
                    "col_b": ["X", "Y", None],
                    "col_c": [10, 20, 30],
                },  # Initial data
                {
                    "col_a": [2, 3],
                    "col_b": ["Y", "Z"],
                    "col_c": [200, 300],
                },  # Upsert data
                {
                    "col_a": [1, 2, 3, None],
                    "col_b": ["X", "Y", "Z", None],
                    "col_c": [10, 200, 300, 30],
                },  # Expected data
            ),
        ],
    )
    def test_upsert_null_in_join_keys(
        self,
        clean_table,
        join_cols,
        initial_data_dict,
        upsert_data_dict,
        expected_data_dict,
    ):
        """Test upsert behavior with NULL values in join key columns (NULL != NULL in SQL)."""
        initial_data = _create_typed_dataframe(initial_data_dict)
        upsert_data = _create_typed_dataframe(upsert_data_dict)
        expected = _create_typed_dataframe(expected_data_dict)

        _write_to_iceberg(initial_data)
        _write_to_iceberg(
            upsert_data, mode=SaveMode.UPSERT, upsert_kwargs={"join_cols": join_cols}
        )

        result_df = _read_from_iceberg(sort_by="col_a")
        assert rows_same(result_df, expected)
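The NULL-key cases above hinge on SQL-style matching, where NULL never equals anything, including another NULL. A minimal, self-contained sketch of that upsert matching rule (plain Python for illustration only; not Ray Data or PyIceberg code, and `upsert_rows` is a hypothetical helper):

```python
def upsert_rows(existing, incoming, join_cols):
    """Merge incoming rows into existing rows; None join keys never match."""

    def key(row):
        return tuple(row[c] for c in join_cols)

    result = [dict(r) for r in existing]
    for new_row in incoming:
        k = key(new_row)
        matched = False
        # SQL semantics: a NULL (None) key never equals anything, even NULL,
        # so rows with None keys are never updated -- only inserted.
        if all(v is not None for v in k):
            for i, old_row in enumerate(result):
                if key(old_row) == k:
                    result[i] = dict(new_row)
                    matched = True
                    break
        if not matched:
            result.append(dict(new_row))
    return result


existing = [
    {"col_a": 1, "col_b": "A"},
    {"col_a": 2, "col_b": "B"},
]
incoming = [
    {"col_a": 2, "col_b": "B_updated"},
    {"col_a": None, "col_b": "null_row"},
]
merged = upsert_rows(existing, incoming, ["col_a"])
# col_a == 2 is updated in place; the None-keyed row is inserted as new.
```

This mirrors the second parametrized case: the matching row is replaced, while the NULL-keyed incoming row lands as a fresh insert rather than overwriting anything.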
    def test_upsert_empty_table(self, clean_table):
        """Test upserting into a completely empty table (behaves like insert)."""
        upsert_data = _create_typed_dataframe(
            {
                "col_a": [1, 2, 3],
                "col_b": ["new_1", "new_2", "new_3"],
                "col_c": [10, 20, 30],
            }
        )
        _write_to_iceberg(
            upsert_data, mode=SaveMode.UPSERT, upsert_kwargs={"join_cols": ["col_a"]}
        )

        result_df = _read_from_iceberg(sort_by="col_a")
        expected = _create_typed_dataframe(
            {
                "col_a": [1, 2, 3],
                "col_b": ["new_1", "new_2", "new_3"],
                "col_c": [10, 20, 30],
            }
        )
        assert rows_same(result_df, expected)

if "overwrite_filter" in self._overwrite_kwargs:
    self._overwrite_kwargs.pop("overwrite_filter")
    logger.warning(
        "Use Ray Data's Expressions for overwrite filter instead of passing "
        "it via PyIceberg's overwrite_filter parameter"
    )
The original implementation that is being reverted included a check to prevent users from passing delete_filter in overwrite_kwargs, as it's an internal PyIceberg parameter. This revert removes that check. It would be good to add it back to avoid potential conflicts and user confusion, similar to how overwrite_filter is handled.
Suggested change:

if "overwrite_filter" in self._overwrite_kwargs:
    self._overwrite_kwargs.pop("overwrite_filter")
    logger.warning(
        "Use Ray Data's Expressions for overwrite filter instead of passing "
        "it via PyIceberg's overwrite_filter parameter"
    )
if "delete_filter" in self._overwrite_kwargs:
    self._overwrite_kwargs.pop("delete_filter")
    logger.warning(
        "Removed 'delete_filter' from overwrite_kwargs: "
        "it is an internal PyIceberg parameter; use 'overwrite_filter' instead"
    )
def _complete_upsert(self, combined_table: "pa.Table") -> None:
    """Complete UPSERT mode write using PyIceberg's upsert API."""
    self._table.upsert(df=combined_table, **self._upsert_kwargs)
Bug: UPSERT mode silently ignores snapshot_properties parameter
The _complete_upsert method does not pass self._snapshot_properties to the table.upsert() call, unlike _complete_append and _complete_overwrite which both include snapshot_properties=self._snapshot_properties. This means any custom snapshot properties specified by users for UPSERT mode are silently ignored, even though the snapshot_properties parameter is documented to work for all write modes ("Custom properties to write to snapshot when committing to an iceberg table").
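One possible shape of the fix, sketched with a stand-in table object so it runs on its own. Whether PyIceberg's `Table.upsert` accepts a `snapshot_properties` keyword depends on the installed version, so the forwarded argument is an assumption to verify, and `FakeIcebergTable` / `IcebergDatasinkSketch` are hypothetical names for illustration:

```python
class FakeIcebergTable:
    """Stand-in for a pyiceberg Table that records the kwargs upsert() receives."""

    def __init__(self):
        self.last_upsert_kwargs = None

    def upsert(self, df, **kwargs):
        self.last_upsert_kwargs = kwargs


class IcebergDatasinkSketch:
    """Minimal slice of the datasink, showing only the forwarded properties."""

    def __init__(self, table, upsert_kwargs, snapshot_properties):
        self._table = table
        self._upsert_kwargs = upsert_kwargs
        self._snapshot_properties = snapshot_properties

    def _complete_upsert(self, combined_table):
        # Forward snapshot_properties, mirroring _complete_append and
        # _complete_overwrite (assumes Table.upsert accepts the keyword).
        self._table.upsert(
            df=combined_table,
            snapshot_properties=self._snapshot_properties,
            **self._upsert_kwargs,
        )


table = FakeIcebergTable()
sink = IcebergDatasinkSketch(
    table,
    upsert_kwargs={"join_cols": ["col_a"]},
    snapshot_properties={"commit.user": "ray-data"},
)
sink._complete_upsert(combined_table=None)  # payload irrelevant to the sketch
```

If `Table.upsert` in the pinned PyIceberg version does not take `snapshot_properties`, the alternative is to document the limitation for UPSERT mode rather than silently dropping the user's properties.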
Reverts ray-project#58601

Signed-off-by: peterxcli <peterxcli@gmail.com>