[Datasets] Streaming executor fixes #5#32951
Conversation
There was a problem hiding this comment.
Hmm it seems like this cached_metadata should always be homogeneous, i.e. it should always contain a BlockMetadata for each element. Do you know how this heterogeneity is happening?
There was a problem hiding this comment.
We set only the first block when figuring out the schema, so e.g. with this:
import ray
inputs = ["example://iris.csv"] * 100
ds = ray.data.read_csv(inputs, parallelism=10)
print("before:", ds._plan._in_blocks._cached_metadata)
ds.schema()
print("after:", ds._plan._in_blocks._cached_metadata)
ds._plan._in_blocks.split(2)
It's producing:
before: [None, None, None, None, None, None, None, None, None, None]
after: [[BlockMetadata(num_rows=1500, size_bytes=66500, schema=sepal.length: double
sepal.width: double
petal.length: double
petal.width: double
variety: string, input_files=array(['/home/ubuntu/ray/python/ray/data/examples/data/iris.csv',
'/home/ubuntu/ray/python/ray/data/examples/data/iris.csv',
'/home/ubuntu/ray/python/ray/data/examples/data/iris.csv',
'/home/ubuntu/ray/python/ray/data/examples/data/iris.csv',
'/home/ubuntu/ray/python/ray/data/examples/data/iris.csv',
'/home/ubuntu/ray/python/ray/data/examples/data/iris.csv',
'/home/ubuntu/ray/python/ray/data/examples/data/iris.csv',
'/home/ubuntu/ray/python/ray/data/examples/data/iris.csv',
'/home/ubuntu/ray/python/ray/data/examples/data/iris.csv',
'/home/ubuntu/ray/python/ray/data/examples/data/iris.csv'],
dtype='<U55'), exec_stats={'wall_time_s': 0.36291642487049103, 'cpu_time_s': 0.3410033880000001, 'node_id': 'f3e389087180baf4bcde82efe3873d1139be957718e69465786af17d'})], None, None, None, None, None, None, None, None, None]
There was a problem hiding this comment.
Could we pull this out into an array split util and use it for the above splits too?
Otherwise it's not clear this is doing the same thing as array split.
There was a problem hiding this comment.
The purpose of this test is testing ExecutionPlan.execute() so we run it directly. Running ds.take() may invoke the new execution backend when the flag is on.
11c5af8 to
90954c5
Compare
90954c5 to
6b7130b
Compare
|
Tests passing (the failures are relevant to this pr). |
Signed-off-by: Jack He <jackhe2345@gmail.com>
Signed-off-by: Edward Oakes <ed.nmi.oakes@gmail.com>
Signed-off-by: elliottower <elliot@elliottower.com>
Signed-off-by: Jack He <jackhe2345@gmail.com>
Why are these changes needed?
ExecutionPlan.execute()directly (instead ofds.take()) when the purpose is to testExecutionPlan.execute(): if usingds.take()it may run the new execution backend which doesn't invoke the `ExecutionPlan.execute()'Related issue number
#32132
Checks
git commit -s) in this PR.scripts/format.shto lint the changes in this PR.