Skip to content

[data] fix flakey schema#53901

Merged
bveeramani merged 4 commits intoray-project:masterfrom
iamjustinhsu:jhsu/fix-flakey-schema
Jun 19, 2025
Merged

[data] fix flakey schema#53901
bveeramani merged 4 commits intoray-project:masterfrom
iamjustinhsu:jhsu/fix-flakey-schema

Conversation

@iamjustinhsu
Copy link
Copy Markdown
Contributor

@iamjustinhsu iamjustinhsu commented Jun 17, 2025

Why are these changes needed?

Consider the following code

ds = ray.data.range(10)
ds = ds.repartition()
ds = ds.map_batches(lambda x : x)
it1, it2 = ds.split(2)
# repr(it2) doesn't contain schema sometimes???

^ This is flakey

ds = ray.data.range(10)
ds = ds.repartition()
# ds = ds.map_batches(lambda x : x)
it1, it2 = ds.split(2)
# repr(it2) contains schema??? 

^ This isn't flakey???

Explanation:

  • There are many scenarios where we produce empty blocks (ie, in shuffle task map where we slice the blocks and send them to reduce).
  • Empty blocks in pyarrow still have a schema
  • When deduping a schema, we default take the FIRST schema as the source of truth. However, we should take the first NON-EMPTY schema, because order is non-deterministic. But that's what i was doing before Remove Schema From BlockMetadata #53454. with the checkschema is None. Oh wait, it should be not schema (this covers None and empty schemas)
  • The first block of code failed because the user mapped empty block -> empty block. In our code, with a UDF function, empty blocks have no schema
  • The second block of code secretly succeeded because the user did not use UDF. Instead, we created empty blocks with the original schema.

I ran it a gazillion times, it should not be flakey anymore

Related issue number

Checks

  • I've signed off every commit(by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

@iamjustinhsu iamjustinhsu requested a review from a team as a code owner June 17, 2025 22:54
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
@iamjustinhsu iamjustinhsu force-pushed the jhsu/fix-flakey-schema branch from 2c584cb to 2b2e879 Compare June 17, 2025 22:56
@iamjustinhsu iamjustinhsu added the go add ONLY when ready to merge, run all tests label Jun 17, 2025
@iamjustinhsu iamjustinhsu changed the title fix flakey schema [data] fix flakey schema Jun 18, 2025
# memory footprint of multiple schemas by keeping only one copy.
diverged = False
if old_schema is None:
if not old_schema:
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In our code, with a UDF function, empty blocks have no schema

By no schema, you mean an empty schema, or None (I assume empty)?

Copy link
Copy Markdown
Contributor Author

@iamjustinhsu iamjustinhsu Jun 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ya, ur understanding correct
schema = None (we manually set)
schema empty = BlockAccessor.for_block().schema()
by no schema, i mean schema empty

@@ -756,15 +756,12 @@ def dedupe_schemas_with_validation(
# Note, often times the refbundles correspond to only one schema. We can reduce the
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should map operators yield empty blocks in the first place? If not, is it worthwhile to fix that?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

my intention with this PR (and the schema one) was to keep the existing behavior (which kept empty blocks). With that said, however, I don't think we can skip empty blocks because Operators themselves keep a dictionary of DataOpTasks, which the StreamingExecutor uses to poll ready tasks. If the tasks don't yield anything, than the StreamingExecutor will be waiting on tasks that don't yield, ie, may deadlock.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, we have an assumption that all DataOpTasks produce at least one output? I see...

@@ -756,15 +756,12 @@ def dedupe_schemas_with_validation(
# Note, often times the refbundles correspond to only one schema. We can reduce the
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a test for dedupe_schemas_with_validation with an empty schema, so that we don't regress?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yea sure

Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
@iamjustinhsu iamjustinhsu force-pushed the jhsu/fix-flakey-schema branch from bedaf08 to 2c0dff5 Compare June 18, 2025 19:59
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Comment on lines +44 to +45
# old_schema is valid
assert diverged, (old_schema, incoming_schema)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This assumes old_schema is valid, and that old_schema != new_schema?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wrote another assert for that

"exclusive",
"team:data",
],
deps = [],
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need these?

Suggested change
deps = [],
deps = [
":conftest",
"//:ray_lib",
],```

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i don't think so, these are pure unit tests, doesn't rely on any fixtures or libs, but i'll double check

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

image

if allow_divergent:
assert out_bundle.schema == pa.schema(list(old_schema) + list(incoming_schema))
else:
assert out_bundle.schema == old_schema
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You need to add this for the test to actually run:

if __name__ == "__main__":
    import sys

    sys.exit(pytest.main(["-v", __file__]))

Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
@iamjustinhsu iamjustinhsu force-pushed the jhsu/fix-flakey-schema branch from 700d4a6 to 4fca243 Compare June 18, 2025 23:29
@bveeramani bveeramani merged commit 4685f3e into ray-project:master Jun 19, 2025
5 checks passed
@iamjustinhsu iamjustinhsu deleted the jhsu/fix-flakey-schema branch June 19, 2025 18:02
minerharry pushed a commit to minerharry/ray that referenced this pull request Jun 27, 2025
<!-- Thank you for your contribution! Please review
https://github.com/ray-project/ray/blob/master/CONTRIBUTING.rst before
opening a pull request. -->

<!-- Please add a reviewer to the assignee section when you create a PR.
If you don't have the access to it, we will shortly find a reviewer and
assign them to your PR. -->

## Why are these changes needed?

Consider the following code
```python
ds = ray.data.range(10)
ds = ds.repartition()
ds = ds.map_batches(lambda x : x)
it1, it2 = ds.split(2)
# repr(it2) doesn't contain schema sometimes???
```
^ This is flakey

```python
ds = ray.data.range(10)
ds = ds.repartition()
# ds = ds.map_batches(lambda x : x)
it1, it2 = ds.split(2)
# repr(it2) contains schema??? 
```
^ This isn't flakey???

Explanation:
- There are many scenarios where we produce empty blocks (ie, in shuffle
task map where we slice the blocks and send them to reduce).
- Empty blocks in pyarrow still have a schema
- When deduping a schema, we default take the FIRST schema as the source
of truth. However, we should take the first NON-EMPTY schema, because
order is non-deterministic. But that's what i was doing before
ray-project#53454. with the check`schema is
None`. Oh wait, it should be `not schema` (this covers `None` and empty
schemas)
- The first block of code failed because the user mapped empty block ->
empty block. In our code, *with a UDF function*, empty blocks have no
schema
- The second block of code secretly succeeded because the user did not
use UDF. Instead, we created empty blocks with the original schema.


I ran it a gazillion times, it should not be flakey anymore
<!-- Please give a short summary of the change and the problem this
solves. -->

## Related issue number

<!-- For example: "Closes ray-project#1234" -->

## Checks

- [ ] I've signed off every commit(by using the -s flag, i.e., `git
commit -s`) in this PR.
- [ ] I've run `scripts/format.sh` to lint the changes in this PR.
- [ ] I've included any doc changes needed for
https://docs.ray.io/en/master/.
- [ ] I've added any new APIs to the API Reference. For example, if I
added a
method in Tune, I've added it in `doc/source/tune/api/` under the
           corresponding `.rst` file.
- [ ] I've made sure the tests are passing. Note that there might be a
few flaky tests, see the recent failures at https://flakey-tests.ray.io/
- Testing Strategy
   - [ ] Unit tests
   - [ ] Release tests
   - [ ] This PR is not tested :(

---------

Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
elliot-barn pushed a commit that referenced this pull request Jul 2, 2025
<!-- Thank you for your contribution! Please review
https://github.com/ray-project/ray/blob/master/CONTRIBUTING.rst before
opening a pull request. -->

<!-- Please add a reviewer to the assignee section when you create a PR.
If you don't have the access to it, we will shortly find a reviewer and
assign them to your PR. -->

## Why are these changes needed?

Consider the following code
```python
ds = ray.data.range(10)
ds = ds.repartition()
ds = ds.map_batches(lambda x : x)
it1, it2 = ds.split(2)
# repr(it2) doesn't contain schema sometimes???
```
^ This is flakey

```python
ds = ray.data.range(10)
ds = ds.repartition()
# ds = ds.map_batches(lambda x : x)
it1, it2 = ds.split(2)
# repr(it2) contains schema???
```
^ This isn't flakey???

Explanation:
- There are many scenarios where we produce empty blocks (ie, in shuffle
task map where we slice the blocks and send them to reduce).
- Empty blocks in pyarrow still have a schema
- When deduping a schema, we default take the FIRST schema as the source
of truth. However, we should take the first NON-EMPTY schema, because
order is non-deterministic. But that's what i was doing before
#53454. with the check`schema is
None`. Oh wait, it should be `not schema` (this covers `None` and empty
schemas)
- The first block of code failed because the user mapped empty block ->
empty block. In our code, *with a UDF function*, empty blocks have no
schema
- The second block of code secretly succeeded because the user did not
use UDF. Instead, we created empty blocks with the original schema.

I ran it a gazillion times, it should not be flakey anymore
<!-- Please give a short summary of the change and the problem this
solves. -->

## Related issue number

<!-- For example: "Closes #1234" -->

## Checks

- [ ] I've signed off every commit(by using the -s flag, i.e., `git
commit -s`) in this PR.
- [ ] I've run `scripts/format.sh` to lint the changes in this PR.
- [ ] I've included any doc changes needed for
https://docs.ray.io/en/master/.
- [ ] I've added any new APIs to the API Reference. For example, if I
added a
method in Tune, I've added it in `doc/source/tune/api/` under the
           corresponding `.rst` file.
- [ ] I've made sure the tests are passing. Note that there might be a
few flaky tests, see the recent failures at https://flakey-tests.ray.io/
- Testing Strategy
   - [ ] Unit tests
   - [ ] Release tests
   - [ ] This PR is not tested :(

---------

Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Signed-off-by: elliot-barn <elliot.barnwell@anyscale.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

go add ONLY when ready to merge, run all tests

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants