
Revert "[Data] - Iceberg write datafiles in write() then commit" #59181

Merged

alexeykudinkin merged 1 commit into master from revert-58601-goutam/iceberg_streaming_write on Dec 4, 2025

Conversation

@goutamvenkat-anyscale (Contributor)

Reverts #58601

@goutamvenkat-anyscale goutamvenkat-anyscale requested a review from a team as a code owner December 4, 2025 19:19
goutamvenkat-anyscale added the data (Ray Data-related issues) and go (add ONLY when ready to merge, run all tests) labels on Dec 4, 2025
@gemini-code-assist bot (Contributor) left a comment

Code Review

This pull request reverts a previous change that introduced a distributed write strategy for Iceberg, returning to a simpler, driver-based write implementation. The revert appears to be correct and consistent across the modified files. My review includes two suggestions: one to restore several important tests for UPSERT functionality that were removed, and another to re-introduce a safety check in the datasink logic to prevent misuse of an internal PyIceberg parameter.

@@ -1211,248 +1173,6 @@ def test_upsert_matching_scenarios(
expected = _create_typed_dataframe(expected_data)
assert rows_same(result_df, expected)

gemini-code-assist bot (Contributor), severity: high

This revert removes several valuable tests for UPSERT mode, including test_upsert_schema_evolution, test_upsert_null_in_join_keys, and test_upsert_empty_table. While the implementation now uses PyIceberg's native upsert, these tests are still important for verifying the integration and ensuring that edge cases like schema evolution and NULL handling in join keys work as expected within Ray Data. I recommend re-adding these tests to maintain test coverage and confidence in the UPSERT functionality.

    def test_upsert_schema_evolution(self, clean_table):
        """Test that upsert supports automatic schema evolution (adding new columns)."""
        # Initial data with 3 columns
        initial_data = _create_typed_dataframe(
            {"col_a": [1, 2], "col_b": ["row_1", "row_2"], "col_c": [10, 20]}
        )
        _write_to_iceberg(initial_data)

        # Upsert with new column col_d
        upsert_data = _create_typed_dataframe(
            {
                "col_a": [2, 3],
                "col_b": ["updated_2", "new_3"],
                "col_c": [200, 30],
                "col_d": ["extra_2", "extra_3"],
            }
        )
        _write_to_iceberg(
            upsert_data, mode=SaveMode.UPSERT, upsert_kwargs={"join_cols": ["col_a"]}
        )

        # Verify schema has col_d
        _verify_schema(
            {
                "col_a": pyi_types.IntegerType,
                "col_b": pyi_types.StringType,
                "col_c": pyi_types.IntegerType,
                "col_d": pyi_types.StringType,
            }
        )

        # Verify data
        result_df = _read_from_iceberg(sort_by="col_a")
        expected = _create_typed_dataframe(
            {
                "col_a": [1, 2, 3],
                "col_b": ["row_1", "updated_2", "new_3"],
                "col_c": [10, 200, 30],
                "col_d": [None, "extra_2", "extra_3"],
            }
        )
        assert rows_same(result_df, expected)

    @pytest.mark.parametrize(
        "join_cols,initial_data_dict,upsert_data_dict,expected_data_dict",
        [
            # NULL in initial data - row with NULL key isn't updated
            (
                ["col_a"],
                {
                    "col_a": [1, 2, None],
                    "col_b": ["A", "B", "C"],
                    "col_c": [10, 20, 30],
                },  # Initial data
                {
                    "col_a": [2, 3],
                    "col_b": ["B_updated", "new_3"],
                    "col_c": [200, 300],
                },  # Upsert data
                {
                    "col_a": [1, 2, 3, None],
                    "col_b": ["A", "B_updated", "new_3", "C"],
                    "col_c": [10, 200, 300, 30],
                },  # Expected data
            ),
            # NULL in upsert data - row with NULL key is inserted
            (
                ["col_a"],
                {
                    "col_a": [1, 2, 3],
                    "col_b": ["A", "B", "C"],
                    "col_c": [10, 20, 30],
                },  # Initial data
                {
                    "col_a": [2, None],
                    "col_b": ["B_updated", "null_row"],
                    "col_c": [200, 300],
                },  # Upsert data
                {
                    "col_a": [1, 2, 3, None],
                    "col_b": ["A", "B_updated", "C", "null_row"],
                    "col_c": [10, 200, 30, 300],
                },  # Expected data
            ),
            # Composite key with NULL in initial - row not updated
            (
                ["col_a", "col_b"],
                {
                    "col_a": [1, 2, None],
                    "col_b": ["X", "Y", None],
                    "col_c": [10, 20, 30],
                },  # Initial data
                {
                    "col_a": [2, 3],
                    "col_b": ["Y", "Z"],
                    "col_c": [200, 300],
                },  # Upsert data
                {
                    "col_a": [1, 2, 3, None],
                    "col_b": ["X", "Y", "Z", None],
                    "col_c": [10, 200, 300, 30],
                },  # Expected data
            ),
        ],
    )
    def test_upsert_null_in_join_keys(
        self,
        clean_table,
        join_cols,
        initial_data_dict,
        upsert_data_dict,
        expected_data_dict,
    ):
        """Test upsert behavior with NULL values in join key columns (NULL != NULL in SQL)."""
        initial_data = _create_typed_dataframe(initial_data_dict)
        upsert_data = _create_typed_dataframe(upsert_data_dict)
        expected = _create_typed_dataframe(expected_data_dict)

        _write_to_iceberg(initial_data)
        _write_to_iceberg(
            upsert_data, mode=SaveMode.UPSERT, upsert_kwargs={"join_cols": join_cols}
        )

        result_df = _read_from_iceberg(sort_by="col_a")
        assert rows_same(result_df, expected)

    def test_upsert_empty_table(self, clean_table):
        """Test upserting into a completely empty table (behaves like insert)."""
        upsert_data = _create_typed_dataframe(
            {
                "col_a": [1, 2, 3],
                "col_b": ["new_1", "new_2", "new_3"],
                "col_c": [10, 20, 30],
            }
        )
        _write_to_iceberg(
            upsert_data, mode=SaveMode.UPSERT, upsert_kwargs={"join_cols": ["col_a"]}
        )

        result_df = _read_from_iceberg(sort_by="col_a")
        expected = _create_typed_dataframe(
            {
                "col_a": [1, 2, 3],
                "col_b": ["new_1", "new_2", "new_3"],
                "col_c": [10, 20, 30],
            }
        )
        assert rows_same(result_df, expected)

Comment on lines +200 to +205
    if "overwrite_filter" in self._overwrite_kwargs:
        self._overwrite_kwargs.pop("overwrite_filter")
        logger.warning(
            "Use Ray Data's Expressions for overwrite filter instead of passing "
            "it via PyIceberg's overwrite_filter parameter"
        )
gemini-code-assist bot (Contributor), severity: medium

The original implementation that is being reverted included a check to prevent users from passing delete_filter in overwrite_kwargs, as it's an internal PyIceberg parameter. This revert removes that check. It would be good to add it back to avoid potential conflicts and user confusion, similar to how overwrite_filter is handled.

Suggested change

Before:

    if "overwrite_filter" in self._overwrite_kwargs:
        self._overwrite_kwargs.pop("overwrite_filter")
        logger.warning(
            "Use Ray Data's Expressions for overwrite filter instead of passing "
            "it via PyIceberg's overwrite_filter parameter"
        )

After:

    if "overwrite_filter" in self._overwrite_kwargs:
        self._overwrite_kwargs.pop("overwrite_filter")
        logger.warning(
            "Use Ray Data's Expressions for overwrite filter instead of passing "
            "it via PyIceberg's overwrite_filter parameter"
        )
    if "delete_filter" in self._overwrite_kwargs:
        self._overwrite_kwargs.pop("delete_filter")
        logger.warning(
            "Removed 'delete_filter' from overwrite_kwargs: "
            "it is an internal PyIceberg parameter; use 'overwrite_filter' instead"
        )


    def _complete_upsert(self, combined_table: "pa.Table") -> None:
        """Complete UPSERT mode write using PyIceberg's upsert API."""
        self._table.upsert(df=combined_table, **self._upsert_kwargs)

Bug: UPSERT mode silently ignores snapshot_properties parameter

The _complete_upsert method does not pass self._snapshot_properties to the table.upsert() call, unlike _complete_append and _complete_overwrite which both include snapshot_properties=self._snapshot_properties. This means any custom snapshot properties specified by users for UPSERT mode are silently ignored, even though the snapshot_properties parameter is documented to work for all write modes ("Custom properties to write to snapshot when committing to an iceberg table").
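The reported fix can be sketched as below. The class names are illustrative stand-ins for the real datasink in Ray Data's Iceberg integration, and whether PyIceberg's `Table.upsert` accepts a `snapshot_properties` keyword depends on the installed PyIceberg version:

```python
class FakeIcebergTable:
    """Stand-in for a PyIceberg Table that records the kwargs upsert receives."""

    def __init__(self):
        self.last_upsert_kwargs = None

    def upsert(self, df, **kwargs):
        self.last_upsert_kwargs = kwargs


class IcebergDatasinkSketch:
    """Minimal sketch of the datasink's upsert path with the proposed fix."""

    def __init__(self, table, upsert_kwargs=None, snapshot_properties=None):
        self._table = table
        self._upsert_kwargs = upsert_kwargs or {}
        self._snapshot_properties = snapshot_properties or {}

    def _complete_upsert(self, combined_table):
        # Forward snapshot_properties, mirroring _complete_append and
        # _complete_overwrite, so custom properties are not silently dropped.
        self._table.upsert(
            df=combined_table,
            snapshot_properties=self._snapshot_properties,
            **self._upsert_kwargs,
        )


table = FakeIcebergTable()
sink = IcebergDatasinkSketch(
    table,
    upsert_kwargs={"join_cols": ["col_a"]},
    snapshot_properties={"commit-user": "ray-data"},
)
sink._complete_upsert(combined_table=None)
print(table.last_upsert_kwargs["snapshot_properties"])  # {'commit-user': 'ray-data'}
```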


@alexeykudinkin alexeykudinkin enabled auto-merge (squash) December 4, 2025 19:33
@alexeykudinkin alexeykudinkin merged commit 56bb1fc into master Dec 4, 2025
7 checks passed
@alexeykudinkin alexeykudinkin deleted the revert-58601-goutam/iceberg_streaming_write branch December 4, 2025 22:33
peterxcli pushed a commit to peterxcli/ray that referenced this pull request Feb 25, 2026

Labels

data (Ray Data-related issues)
go (add ONLY when ready to merge, run all tests)

2 participants