
Remove Schema From BlockMetadata #53454

Merged
raulchen merged 20 commits into ray-project:master from iamjustinhsu:jhsu/remove-schema-from-block-metadata
Jun 12, 2025

Conversation

@iamjustinhsu
Contributor

@iamjustinhsu iamjustinhsu commented May 30, 2025

Why are these changes needed?

Currently, each block has a schema. If there are many blocks in a ref bundle, that schema is duplicated across all of them. We should attach the concept of a schema at the dataset/operator level, not the block/bundle level. This PR removes the schema from BlockMetadata and moves it to the PhysicalOperator level. This should decrease block overhead and serde runtime.

I think it would be advantageous to combine the constructs of BlockMetadata and Schema into a third class/named tuple to make them easier to pass around in certain scenarios, but for now I made it a tuple since that is easier to handle.
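For illustration, the combined construct described above might look something like the following sketch. The class and field names here are assumptions for exposition, not necessarily the ones used in the PR:

```python
from typing import List, NamedTuple, Optional

class BlockMetadata(NamedTuple):
    """Minimal stand-in for Ray Data's per-block metadata."""
    num_rows: int
    size_bytes: int

class MetadataAndSchema(NamedTuple):
    """Pairs per-block metadata with a single shared schema."""
    metadata: List[BlockMetadata]   # one entry per block
    schema: Optional[dict]          # stored once per bundle, not once per block

# The schema object is shared across all blocks in the bundle instead of
# being duplicated inside every BlockMetadata.
bundle = MetadataAndSchema(
    metadata=[BlockMetadata(num_rows=100, size_bytes=4096) for _ in range(8)],
    schema={"id": "int64"},
)
total_bytes = sum(m.size_bytes for m in bundle.metadata)
print(total_bytes)  # 32768
```

With this shape, eight blocks share one schema reference rather than serializing eight copies of it.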

Related issue number

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

@iamjustinhsu iamjustinhsu force-pushed the jhsu/remove-schema-from-block-metadata branch 5 times, most recently from a765c6b to 0948584 Compare May 31, 2025 16:28
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
@iamjustinhsu iamjustinhsu force-pushed the jhsu/remove-schema-from-block-metadata branch from 0948584 to 0127f7d Compare May 31, 2025 17:32
@iamjustinhsu iamjustinhsu added the go add ONLY when ready to merge, run all tests label May 31, 2025
@iamjustinhsu iamjustinhsu force-pushed the jhsu/remove-schema-from-block-metadata branch 3 times, most recently from dc8da44 to fbf29fa Compare May 31, 2025 23:07
return type; addin schema=;

Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
@iamjustinhsu iamjustinhsu force-pushed the jhsu/remove-schema-from-block-metadata branch from fbf29fa to 2ff27fe Compare June 1, 2025 02:37
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
@iamjustinhsu iamjustinhsu force-pushed the jhsu/remove-schema-from-block-metadata branch from 4f8a2c5 to e54f195 Compare June 1, 2025 06:07
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
@iamjustinhsu iamjustinhsu force-pushed the jhsu/remove-schema-from-block-metadata branch from 067c136 to 38db8f4 Compare June 3, 2025 22:38
```python
    meta_schema.schema,
)
bytes_read += meta.size_bytes
bytes_read += meta_schema.metadata.size_bytes
```
Contributor

Not really a fan of that double wrapping -- why not just return a tuple and unpack it above?

Contributor Author
@iamjustinhsu iamjustinhsu Jun 4, 2025

Oh, I was trying to be consistent in using MetadataAndSchema. Even though it double-unwraps, the alternative of a tuple makes the code inconsistent with how we handle metadata and schema.

Contributor

@iamjustinhsu I'm not sure I follow your point. Please elaborate.

Contributor Author

If I understand correctly, you want:

```python
metadata, schema = ray.get(...)  # Tuple[BlockMetadata, Schema]
bytes_read += metadata.size_bytes
```

but since we use MetadataAndSchema everywhere, there's no reason we shouldn't be able to do:

```python
meta_schema: MetadataAndSchema = ray.get(...)
bytes_read += meta_schema.metadata.size_bytes
```

```python
break

self._schema = schema
for _ in iter_ref_bundles:
```
Contributor

Schemas shouldn't be in the Operator hierarchy, but we can keep them inside OpState (for now).
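A rough sketch of keeping the schema in per-operator execution state rather than on the operator itself. OpState here is a simplified stand-in for the streaming executor's per-operator state, not the real class:

```python
from typing import Optional

class OpState:
    """Simplified stand-in for the streaming executor's per-operator state."""

    def __init__(self) -> None:
        self._schema: Optional[dict] = None

    def observe_output_schema(self, schema: Optional[dict]) -> None:
        # Remember the first non-empty schema seen in this operator's output
        # bundles; empty blocks may legitimately report no schema.
        if self._schema is None and schema:
            self._schema = schema

    @property
    def schema(self) -> Optional[dict]:
        return self._schema

state = OpState()
state.observe_output_schema(None)             # empty block: ignored
state.observe_output_schema({"id": "int64"})  # first real schema wins
state.observe_output_schema({"id": "int32"})  # later schemas don't overwrite
```

The operator hierarchy stays schema-free; the executor state tracks one schema per operator.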

Contributor Author
@iamjustinhsu iamjustinhsu Jun 4, 2025

To Hao's point, physical operators do contain a lot of state. In fact, the docstring says they are stateful:

Physical operators are stateful and non-serializable; they live on the driver side
of the Dataset only.

I'm a little unclear on why we can't keep schemas at the PhysicalOperator level?

Contributor

Let's consolidate this discussion in the other thread

#53454 (comment)

Contributor
@raulchen raulchen left a comment

The overall change looks good to me.
Most of the comments are about code structure and code style.


```python
# TODO(justin): split this into 2; it's not always the case
# that both schema and metadata are correlated
class GuessMetadataMixin(ABC):
```
Contributor

I think I still prefer just putting these methods in the LogicalOperator, for the following reasons:

  • guess_schema can also return None, so knowing an operator is a GuessMetadataMixin doesn't guarantee knowing the schema.
  • guess_schema/guess_metadata isn't specific to source operators.
  • The code would be cleaner: no need for the isinstance checks.

Of course, the downside is that subclasses need to remember to override the implementation. But with the mixin, you need to remember to make a subclass inherit from GuessMetadataMixin as well, so there's not much difference.

Also a couple of other suggestions:

  1. I slightly prefer infer_ over guess_ for the method names.
  2. (A new issue) It feels weird to have guess_metadata or aggregate_output_metadata, because a BlockMetadata is supposed to describe one single block, not the entire op. I checked the use cases, and we only use schema/input_files/num_rows from the metadata, so we can probably just have separate infer_schema and infer_num_rows.
     • input_files should probably also be removed from the metadata, because it should be an attribute of a source logical operator, not of a block. No need to handle this in this PR, though.
  3. SourceOperatorMixin doesn't need to be a mixin. It can just be a subclass of LogicalOperator with an input_files method.
  4. For output_data, we can introduce another subclass, ExistingDataSourceOperator.
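A minimal sketch of the base-class alternative suggested here, using the preferred infer_ naming. The real Ray Data signatures may differ; this only shows the shape of the idea:

```python
from typing import Optional

class LogicalOperator:
    """Simplified stand-in: every operator can be asked, no isinstance checks."""

    def infer_schema(self) -> Optional[dict]:
        return None  # unknown by default; subclasses override when they can

    def infer_num_rows(self) -> Optional[int]:
        return None  # unknown by default

class Range(LogicalOperator):
    """Toy source operator that knows its row count up front."""

    def __init__(self, n: int) -> None:
        self._n = n

    def infer_num_rows(self) -> Optional[int]:
        return self._n

class MapBatches(LogicalOperator):
    """Toy transform whose output schema/rows can't be known statically."""

# Callers just call the method; None means "unknown", no type checks needed.
ops = [Range(10), MapBatches()]
rows = [op.infer_num_rows() for op in ops]
print(rows)  # [10, None]
```

Callers handle the None case uniformly instead of branching on whether an operator happens to implement a mixin.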

Contributor Author
@iamjustinhsu iamjustinhsu Jun 4, 2025

OK, I removed the GuessMetadataMixin, but still kept SourceOperator (renamed from SourceOperatorMixin) to hold def output_data.

for output_data, we can introduce another subclass ExistingDataSourceOperator.

I just found it weird that we didn't have a standard way of checking whether an operator is a source operator (prior to this, we were checking is_source_op == (isinstance(op, Read) or len(op.input_dependencies) == 0)). ExistingDataSourceOperator looks good, but it doesn't apply to Read, which is a SourceOperator, so I just applied SourceOperator to Read but let it return None.

  1. Done.
  2. Yeah, I'll make a TODO.
  3. Not sure I entirely followed, because AbstractFrom and InputData don't necessarily contain input_files, but I just folded SourceOperator into one class to keep things simple.
  4. Addressed above.

Contributor

To clarify, what I proposed is this hierarchy:

  • LogicalOperator
    • SourceOperator
      • Read (input_files)
      • ExistingDataSourceOperator (output_data)
        • AbstractFrom
        • InputDataBuffer

Only ExistingDataSourceOperator has output_data.
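That proposed hierarchy could be sketched like this (a hypothetical outline for discussion, not the code that was merged):

```python
from typing import List, Optional

class LogicalOperator:
    pass

class SourceOperator(LogicalOperator):
    """Common base for operators with no upstream inputs."""

class Read(SourceOperator):
    """Reads external storage, so it reports input files but has no output_data."""

    def __init__(self, files: List[str]) -> None:
        self._files = files

    def input_files(self) -> List[str]:
        return self._files

class ExistingDataSourceOperator(SourceOperator):
    """Source backed by data that already exists; only this branch has output_data."""

    def __init__(self, data: list) -> None:
        self._data = data

    def output_data(self) -> Optional[list]:
        return self._data

class AbstractFrom(ExistingDataSourceOperator):
    pass

class InputDataBuffer(ExistingDataSourceOperator):
    pass

# One standard check replaces ad-hoc tests like
#   isinstance(op, Read) or len(op.input_dependencies) == 0
op = AbstractFrom([{"id": 1}])
is_source = isinstance(op, SourceOperator)
```

The single `isinstance(op, SourceOperator)` check is the standardization the thread is after.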

Contributor Author

Oh I see. I think that would be good, just not in this PR, as it's already large enough. The main motivation for SourceOperator was that checking for a source operator was non-standard.

Contributor

SG

@iamjustinhsu iamjustinhsu force-pushed the jhsu/remove-schema-from-block-metadata branch 3 times, most recently from a952a2e to 0ca6ef1 Compare June 4, 2025 16:56
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
@iamjustinhsu iamjustinhsu force-pushed the jhsu/remove-schema-from-block-metadata branch from 0ca6ef1 to 824f3ff Compare June 4, 2025 17:49
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
@iamjustinhsu iamjustinhsu force-pushed the jhsu/remove-schema-from-block-metadata branch 3 times, most recently from 51af022 to 6946adc Compare June 4, 2025 19:08
@iamjustinhsu iamjustinhsu removed the request for review from a team June 10, 2025 23:15
@iamjustinhsu iamjustinhsu force-pushed the jhsu/remove-schema-from-block-metadata branch 2 times, most recently from 8a15552 to 0543925 Compare June 10, 2025 23:28
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
@iamjustinhsu iamjustinhsu force-pushed the jhsu/remove-schema-from-block-metadata branch from 0543925 to 39c3fb1 Compare June 10, 2025 23:33
…u/remove-schema-from-block-metadata

Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
@iamjustinhsu iamjustinhsu force-pushed the jhsu/remove-schema-from-block-metadata branch from e0e93b0 to bef52c4 Compare June 11, 2025 00:50
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
@iamjustinhsu iamjustinhsu force-pushed the jhsu/remove-schema-from-block-metadata branch from 5454b3f to c043762 Compare June 11, 2025 02:32
Contributor
@raulchen raulchen left a comment

LGTM. Let's follow up with some TODOs in different PRs.

@raulchen raulchen merged commit a1b9dc6 into ray-project:master Jun 12, 2025
5 checks passed
@iamjustinhsu iamjustinhsu deleted the jhsu/remove-schema-from-block-metadata branch June 12, 2025 21:29
raulchen pushed a commit that referenced this pull request Jun 13, 2025
Follow-up to #53454

Closes #53786

Signed-off-by: Matthew Deng <matt@anyscale.com>
@iamjustinhsu iamjustinhsu mentioned this pull request Jun 17, 2025
8 tasks
elliot-barn pushed a commit that referenced this pull request Jun 18, 2025
elliot-barn pushed a commit that referenced this pull request Jun 18, 2025
bveeramani pushed a commit that referenced this pull request Jun 19, 2025
## Why are these changes needed?

Consider the following code
```python
ds = ray.data.range(10)
ds = ds.repartition()
ds = ds.map_batches(lambda x : x)
it1, it2 = ds.split(2)
# repr(it2) doesn't contain schema sometimes???
```
^ This is flakey

```python
ds = ray.data.range(10)
ds = ds.repartition()
# ds = ds.map_batches(lambda x : x)
it1, it2 = ds.split(2)
# repr(it2) contains schema??? 
```
^ This isn't flakey???

Explanation:
- There are many scenarios where we produce empty blocks (e.g., in the shuffle map task, where we slice blocks and send them to reduce tasks).
- Empty blocks in pyarrow still have a schema.
- When deduping schemas, we take the FIRST schema as the source of truth by default. However, we should take the first NON-EMPTY schema, because block order is non-deterministic. That's what the code did before
#53454, with the check `schema is None` -- but it should really be `not schema`, which covers both `None` and empty schemas.
- The first block of code failed because the user mapped empty blocks to empty blocks; in our code, *with a UDF*, empty blocks have no schema.
- The second block of code secretly succeeded because the user did not use a UDF; instead, we created empty blocks with the original schema.

I ran it a gazillion times; it should not be flakey anymore.
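The difference between `schema is None` and `not schema` can be shown with a self-contained stub. A real pyarrow.Schema with zero fields should be similarly falsy via its field count, but the stub below keeps the example runnable without pyarrow:

```python
class FakeSchema:
    """Stub schema; truthiness follows the field count, like an Arrow schema."""

    def __init__(self, fields):
        self.fields = list(fields)

    def __len__(self) -> int:
        return len(self.fields)

def dedupe_schema(schemas):
    """Return the first NON-EMPTY schema, skipping None and zero-field schemas."""
    for schema in schemas:
        if not schema:  # covers both None and empty schemas
            continue
        return schema
    return None

empty = FakeSchema([])       # an empty block's schema: not None, but falsy
real = FakeSchema(["id"])

# Block order is non-deterministic, so an empty block's schema can come first.
# The old `schema is None` check would wrongly pick it as the source of truth.
picked_by_is_none = next(s for s in [empty, real] if s is not None)
picked_by_not = dedupe_schema([empty, real])
```

Here `picked_by_is_none` is the empty schema (the bug), while `picked_by_not` is the real one (the fix).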

---------

Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
minerharry pushed a commit to minerharry/ray that referenced this pull request Jun 27, 2025
elliot-barn pushed a commit that referenced this pull request Jul 2, 2025
elliot-barn pushed a commit that referenced this pull request Jul 2, 2025
elliot-barn pushed a commit that referenced this pull request Jul 2, 2025
snorkelopstesting3-bot pushed a commit to snorkel-marlin-repos/ray-project_ray_pr_53949_c2fd26f9-40af-409d-a8b3-21ac5455fce7 that referenced this pull request Oct 22, 2025