[data] fix flaky schema #53901
Conversation
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Force-pushed from 2c584cb to 2b2e879
```diff
 # memory footprint of multiple schemas by keeping only one copy.
 diverged = False
-if old_schema is None:
+if not old_schema:
```
> In our code, with a UDF function, empty blocks have no schema

By "no schema," do you mean an empty schema, or None? (I assume empty.)
Yes, your understanding is correct:
- schema = None: we set this manually.
- empty schema: what `BlockAccessor.for_block().schema()` returns for an empty block.

By "no schema" I mean an empty schema.
```diff
@@ -756,15 +756,12 @@ def dedupe_schemas_with_validation(
 # Note, often times the refbundles correspond to only one schema. We can reduce the
```
Should map operators yield empty blocks in the first place? If not, is it worthwhile to fix that?
My intention with this PR (and the schema one) was to keep the existing behavior, which kept empty blocks. That said, I don't think we can skip empty blocks: Operators keep a dictionary of DataOpTasks, which the StreamingExecutor uses to poll ready tasks. If a task doesn't yield anything, then the StreamingExecutor will keep waiting on it, i.e., it may deadlock.
Oh, we have an assumption that all DataOpTasks produce at least one output? I see...
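The assumption can be sketched with a toy poll loop; this is a hypothetical simplification, not Ray's actual StreamingExecutor or DataOpTask code. The executor blocks on each task's output, so a task that never yields (e.g. one that silently dropped its empty block) would stall the loop forever:

```python
# Toy model: the executor polls one output queue per task. A real executor
# would block with no timeout, so a task that never yields deadlocks it;
# here we use a timeout just to make the stall observable.
from queue import Queue, Empty

def poll_ready(task_outputs, timeout=0.05):
    ready = {}
    for name, q in task_outputs.items():
        try:
            ready[name] = q.get(timeout=timeout)
        except Empty:
            ready[name] = "<stalled>"  # in a real executor: waits forever
    return ready

tasks = {"task_a": Queue(), "task_b": Queue()}
tasks["task_a"].put("block")   # yields a (possibly empty) block
                               # task_b yields nothing at all
print(poll_ready(tasks))       # task_b stalls the poll
```

Yielding even empty blocks keeps every queue non-empty, so the poll always makes progress.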
```diff
@@ -756,15 +756,12 @@ def dedupe_schemas_with_validation(
 # Note, often times the refbundles correspond to only one schema. We can reduce the
```
Can you add a test for `dedupe_schemas_with_validation` with an empty schema, so that we don't regress?
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Force-pushed from bedaf08 to 2c0dff5
```python
# old_schema is valid
assert diverged, (old_schema, incoming_schema)
```
This assumes old_schema is valid, and that old_schema != new_schema?
I added another assert for that.
```python
        "exclusive",
        "team:data",
    ],
    deps = [],
```
Do we need these?
```diff
-deps = [],
+deps = [
+    ":conftest",
+    "//:ray_lib",
+],
```
I don't think so; these are pure unit tests that don't rely on any fixtures or libraries, but I'll double-check.
```python
if allow_divergent:
    assert out_bundle.schema == pa.schema(list(old_schema) + list(incoming_schema))
else:
    assert out_bundle.schema == old_schema
```
You need to add this for the test to actually run:

```python
if __name__ == "__main__":
    import sys

    sys.exit(pytest.main(["-v", __file__]))
```

Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Force-pushed from 700d4a6 to 4fca243
## Why are these changes needed?

Consider the following code:

```python
ds = ray.data.range(10)
ds = ds.repartition()
ds = ds.map_batches(lambda x: x)
it1, it2 = ds.split(2)
# repr(it2) doesn't contain schema sometimes???
```

^ This is flaky.

```python
ds = ray.data.range(10)
ds = ds.repartition()
# ds = ds.map_batches(lambda x: x)
it1, it2 = ds.split(2)
# repr(it2) contains schema???
```

^ This isn't flaky???

Explanation:
- There are many scenarios where we produce empty blocks (e.g., in the shuffle task map, where we slice the blocks and send them to reduce).
- Empty blocks in pyarrow still have a schema.
- When deduping schemas, we take the FIRST schema as the source of truth by default. However, we should take the first NON-EMPTY schema, because order is non-deterministic. That was the behavior before ray-project#53454, via the check `schema is None`; the correct check is `not schema`, which covers both `None` and empty schemas.
- The first block of code failed because the user mapped empty block -> empty block. In our code, *with a UDF function*, empty blocks have no schema.
- The second block of code silently succeeded because the user did not use a UDF; instead, we created empty blocks with the original schema.

I ran it many times; it should no longer be flaky.

## Related issue number

## Checks

- [ ] I've signed off every commit (by using the -s flag, i.e., `git commit -s`) in this PR.
- [ ] I've run `scripts/format.sh` to lint the changes in this PR.
- [ ] I've included any doc changes needed for https://docs.ray.io/en/master/.
- [ ] I've added any new APIs to the API Reference. For example, if I added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file.
- [ ] I've made sure the tests are passing. Note that there might be a few flaky tests; see the recent failures at https://flakey-tests.ray.io/
- Testing Strategy
  - [ ] Unit tests
  - [ ] Release tests
  - [ ] This PR is not tested :(

---------

Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Signed-off-by: elliot-barn <elliot.barnwell@anyscale.com>
