Remove Schema From BlockMetadata #53454
Conversation
```diff
     meta_schema.schema,
 )
-bytes_read += meta.size_bytes
+bytes_read += meta_schema.metadata.size_bytes
```
Not really a fan of that double wrapping -- why not just return a tuple and unpack it above?
Oh, I was trying to be consistent by using MetadataAndSchema. Even though it double-unwraps, the tuple alternative makes the code inconsistent with how we handle metadata and schema elsewhere.
@iamjustinhsu not sure I follow your point. Please elaborate.
If I understand correctly, you want:

```python
metadata, schema = ray.get(...)  # Tuple[BlockMetadata, Schema]
bytes_read += metadata.size_bytes
```

But since we use MetadataAndSchema everywhere, there's no reason we shouldn't be able to do:

```python
meta_schema: MetadataAndSchema = ray.get(...)
bytes_read += meta_schema.metadata.size_bytes
```
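For what it's worth, a NamedTuple-based MetadataAndSchema supports both styles at once. A minimal sketch (the field names here are assumptions for illustration, not the actual ray.data definitions):

```python
from typing import NamedTuple, Optional

# Illustrative stand-ins for ray.data's BlockMetadata and Schema.
class BlockMetadata(NamedTuple):
    num_rows: Optional[int]
    size_bytes: Optional[int]

class MetadataAndSchema(NamedTuple):
    metadata: BlockMetadata
    schema: Optional[object]

meta_schema = MetadataAndSchema(
    metadata=BlockMetadata(num_rows=10, size_bytes=1024),
    schema=None,
)

# Attribute access, the style kept in the PR:
bytes_read = meta_schema.metadata.size_bytes

# A NamedTuple still supports the tuple unpacking the reviewer suggested:
metadata, schema = meta_schema
assert metadata.size_bytes == bytes_read
```

So the two call-site styles are not mutually exclusive; the named-field access just keeps the code consistent wherever the pair is passed around.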
python/ray/data/_internal/plan.py

```python
break

self._schema = schema
for _ in iter_ref_bundles:
```
Schemas shouldn't be in the Operator hierarchy, but we can keep them inside OpState (for now).
To Hao's point, physical operators do contain a lot of state. In fact, the docstring says they are stateful:

> Physical operators are stateful and non-serializable; they live on the driver side of the Dataset only.

I'm a little unclear on why we can't keep schemas at the PhysicalOperator level?
Let's consolidate this discussion in the other thread
raulchen left a comment:
The overall change looks good to me. Most of the comments are about code structure and style.
python/ray/data/_internal/logical/interfaces/logical_operator.py

```python
# TODO(justin): split this into 2; it's not always the case
# that both schema and metadata are correlated
class GuessMetadataMixin(ABC):
```
I think I still prefer just putting these methods in the LogicalOperator, for the following reasons:

- `guess_schema` can also return None, so knowing an operator is a GuessMetadataMixin doesn't ensure knowing the schema.
- `guess_schema`/`guess_metadata` isn't specific to source operators.
- The code would be cleaner: no need for the isinstance checks.

Of course, the downside is that subclasses need to remember to override the implementation. But with the mixin, you need to remember to make a subclass inherit from GuessMetadataMixin as well. So not much difference.

Also a couple of other suggestions:

- I slightly prefer `infer_` over `guess_` for the method names.
- (Now a new issue) I feel it's weird to have this `guess_metadata` or `aggregate_output_metadata`, because a BlockMetadata is supposed to be for one single block, not the entire op. I checked the use cases, and we only use `schema`/`input_files`/`num_rows` from the metadata. So we can probably just have separate `infer_schema` and `infer_num_rows`.
- `input_files` should probably also be removed from the metadata, because it should be an attribute of a source logical operator instead of a block. But no need to handle this in this PR.
- `SourceOperatorMixin` doesn't need to be a mixin. It can just be a subclass of LogicalOperator with an `input_files` method.
- For `output_data`, we can introduce another subclass, `ExistingDataSourceOperator`.
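The "methods on LogicalOperator with a None default" alternative can be sketched roughly like this (class and method names follow the discussion above; the real ray.data classes differ):

```python
from typing import Optional

# Sketch: infer_* methods live on LogicalOperator with a None default,
# instead of requiring a separate GuessMetadataMixin.
class LogicalOperator:
    def infer_schema(self) -> Optional[object]:
        # Default: schema unknown; subclasses that can infer it override this.
        return None

    def infer_num_rows(self) -> Optional[int]:
        # Default: row count unknown.
        return None

# A hypothetical source operator that knows its row count up front.
class Range(LogicalOperator):
    def __init__(self, n: int):
        self._n = n

    def infer_num_rows(self) -> Optional[int]:
        return self._n

# No isinstance checks needed at the call site; callers just handle None.
op = Range(10)
num_rows = op.infer_num_rows()  # 10
schema = op.infer_schema()      # None here; the caller handles the unknown case
```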
OK, I removed GuessMetadataMixin, but still kept SourceOperator (renamed from SourceOperatorMixin) to hold `def output_data`.

> for output_data, we can introduce another subclass ExistingDataSourceOperator.

I just found it weird that we didn't have a standard way of checking whether an operator is a source operator (prior to this, we were checking `is_source_op == (isinstance(op, Read) or len(op.input_dependencies) == 0)`). ExistingDataSourceOperator looks good, but it doesn't apply to Read, which is a SourceOperator, so I just applied SourceOperator to Read and let it return None.

- Done.
- Yeah, I'll make a TODO.
- Not sure I entirely followed, because AbstractFrom and InputData don't necessarily contain `input_files`, but I just folded SourceOperator into one class to keep things simple.
- Addressed above.
To clarify, what I proposed is this hierarchy:

- LogicalOperator
  - SourceOperator
    - Read (`input_files`)
    - ExistingDataSourceOperator (`output_data`)
      - AbstractFrom
      - InputDataBuffer

Only ExistingDataSourceOperator has output_data.
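As a rough sketch, the proposed hierarchy would look like this (bodies and signatures are assumptions for illustration, not the actual ray.data implementation):

```python
from typing import List, Optional

class LogicalOperator:
    pass

class SourceOperator(LogicalOperator):
    """An operator with no input dependencies."""

class Read(SourceOperator):
    def input_files(self) -> Optional[List[str]]:
        return None  # a real Read would report the files it scans

class ExistingDataSourceOperator(SourceOperator):
    def output_data(self) -> list:
        return []  # only this branch exposes already-materialized data

class AbstractFrom(ExistingDataSourceOperator):
    pass

class InputDataBuffer(ExistingDataSourceOperator):
    pass

# Checking for a source operator becomes a single isinstance check, instead
# of `isinstance(op, Read) or len(op.input_dependencies) == 0`:
assert isinstance(Read(), SourceOperator)
assert isinstance(InputDataBuffer(), SourceOperator)
# Read does not inherit output_data, matching "only
# ExistingDataSourceOperator has output_data":
assert not hasattr(Read(), "output_data")
```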
Oh I see. I think that would be good, just not in this PR, as it's already large enough. The main motivation for SourceOperator was that checking for a source operator was non-standard.
raulchen left a comment:
LGTM. Let's follow up with some TODOs in different PRs.
Currently, each block has a schema. If there are many blocks in a ref bundle, that schema is duplicated everywhere. We should attach the concept of a schema at the dataset/operator level, not the block/bundle level. This PR removes the schema from BlockMetadata and moves it to the PhysicalOperator level, which should decrease block overhead and serde runtime.

I think it would be advantageous to combine BlockMetadata + Schema into a third class/named tuple to make it easier to pass around in certain scenarios, but for now I made it a tuple since that's easier to handle.
## Why are these changes needed?

Consider the following code:

```python
ds = ray.data.range(10)
ds = ds.repartition()
ds = ds.map_batches(lambda x: x)
it1, it2 = ds.split(2)
# repr(it2) doesn't contain schema sometimes???
```

^ This is flaky.

```python
ds = ray.data.range(10)
ds = ds.repartition()
# ds = ds.map_batches(lambda x: x)
it1, it2 = ds.split(2)
# repr(it2) contains schema???
```

^ This isn't flaky???

Explanation:

- There are many scenarios where we produce empty blocks (e.g., in a shuffle task map where we slice the blocks and send them to reduce).
- Empty blocks in pyarrow still have a schema.
- When deduping a schema, we take the FIRST schema as the source of truth by default. However, we should take the first NON-EMPTY schema, because order is non-deterministic. That's what I was doing before #53454, with the check `schema is None`. Oh wait, it should be `not schema` (this covers `None` and empty schemas).
- The first block of code failed because the user mapped empty block -> empty block. In our code, *with a UDF function*, empty blocks have no schema.
- The second block of code secretly succeeded because the user did not use a UDF. Instead, we created empty blocks with the original schema.

I ran it a gazillion times; it should not be flaky anymore.
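The dedup fix described above can be sketched as follows, using plain lists of field names as a stand-in for real pyarrow schemas (the actual ray.data code operates on pyarrow.Schema objects, and `dedupe_schema` is a simplified hypothetical helper, not the real implementation):

```python
# Prefer the first NON-EMPTY schema: block order is non-deterministic,
# and empty blocks can still carry an (empty) schema.
def dedupe_schema(schemas):
    for schema in schemas:
        # `not schema` covers both None and an empty schema,
        # unlike the earlier `schema is None` check.
        if not schema:
            continue
        return schema
    return None

empty_schema = []                 # schema of an empty block
real_schema = ["id", "value"]     # schema of a non-empty block

# The old "first schema wins" behavior could pick empty_schema if an empty
# block happened to arrive first; skipping falsy schemas avoids that:
assert dedupe_schema([empty_schema, real_schema]) == ["id", "value"]
assert dedupe_schema([None, empty_schema]) is None
```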