Skip to content

Commit b5b4460

Browse files
authored
Support creating a DatasetPipeline windowed by bytes (#22577)
1 parent 1cedb1b commit b5b4460

File tree

5 files changed

+158
-2
lines changed

5 files changed

+158
-2
lines changed

doc/source/data/advanced-pipelines.rst

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,16 @@ Tune the throughput vs latency of your pipeline with the ``blocks_per_window`` s
106106

107107
.. image:: images/dataset-pipeline-3.svg
108108

109+
You can also specify the size of each window using ``bytes_per_window``. In this mode, Datasets chooses how many blocks go into each window so that the window stays under the target byte size while still containing at least one block. This mode can be useful to limit the memory usage of a pipeline. As a rule of thumb, the cluster memory should be at least 2-5x the window size to avoid spilling.
110+
111+
.. code-block:: python
112+
113+
# Create a DatasetPipeline with up to 10GB of data per window.
114+
pipe: DatasetPipeline = ray.data \
115+
.read_binary_files("s3://bucket/image-dir") \
116+
.window(bytes_per_window=10e9)
117+
# -> INFO -- Created DatasetPipeline with 73 windows: 9120MiB min, 9431MiB max, 9287MiB mean
118+
109119
.. _dataset-pipeline-per-epoch-shuffle:
110120

111121
Per-Epoch Shuffle Pipeline

python/ray/data/dataset.py

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2575,7 +2575,12 @@ def pipeline(self, *, parallelism: int = 10) -> "DatasetPipeline[T]":
25752575
"Use .window(blocks_per_window=n) instead of " ".pipeline(parallelism=n)"
25762576
)
25772577

2578-
def window(self, *, blocks_per_window: int = 10) -> "DatasetPipeline[T]":
2578+
def window(
2579+
self,
2580+
*,
2581+
blocks_per_window: Optional[int] = None,
2582+
bytes_per_window: Optional[int] = None,
2583+
) -> "DatasetPipeline[T]":
25792584
"""Convert this into a DatasetPipeline by windowing over data blocks.
25802585
25812586
Transformations prior to the call to ``window()`` are evaluated in
@@ -2621,9 +2626,19 @@ def window(self, *, blocks_per_window: int = 10) -> "DatasetPipeline[T]":
26212626
increases the latency to initial output, since it decreases the
26222627
length of the pipeline. Setting this to infinity effectively
26232628
disables pipelining.
2629+
bytes_per_window: Specify the window size in bytes instead of blocks.
2630+
This will be treated as an upper bound for the window size, but each
2631+
window will still include at least one block. This is mutually
2632+
exclusive with ``blocks_per_window``.
26242633
"""
26252634
from ray.data.dataset_pipeline import DatasetPipeline
26262635

2636+
if blocks_per_window is not None and bytes_per_window is not None:
2637+
raise ValueError("Only one windowing scheme can be specified.")
2638+
2639+
if blocks_per_window is None:
2640+
blocks_per_window = 10
2641+
26272642
# If optimizations are enabled, rewrite the read stage into a OneToOneStage
26282643
# to enable fusion with downstream map stages.
26292644
ctx = DatasetContext.get_current()
@@ -2656,7 +2671,37 @@ def gen():
26562671

26572672
class Iterable:
26582673
def __init__(self, blocks, epoch):
    # Split the block list into per-window pieces, either by a target
    # byte size (bytes_per_window) or by a fixed block count
    # (blocks_per_window); both are captured from the enclosing scope.
    if bytes_per_window:
        self._splits = blocks.split_by_bytes(bytes_per_window)
    else:
        self._splits = blocks.split(split_size=blocks_per_window)
    try:
        sizes = [s.size_bytes() for s in self._splits]
        # Bug fix: the original `assert [s > 0 for s in sizes]` asserted
        # on a list, which is truthy for any non-empty `sizes`, so the
        # check could never fail. Validate every window size instead.
        assert all(s > 0 for s in sizes), sizes

        def fmt(size_bytes):
            # Render sizes above 10KiB in MiB for readability; otherwise
            # show the raw byte count.
            if size_bytes > 10 * 1024:
                return "{}MiB".format(round(size_bytes / (1024 * 1024), 2))
            else:
                return "{}b".format(size_bytes)

        logger.info(
            "Created DatasetPipeline with {} windows: "
            "{} min, {} max, {} mean".format(
                len(self._splits),
                fmt(min(sizes)),
                fmt(max(sizes)),
                fmt(int(np.mean(sizes))),
            )
        )
    except Exception as e:
        # Size metadata may be unavailable (e.g. lazy block lists); this
        # logging is best-effort, so never fail pipeline construction.
        logger.info(
            "Created DatasetPipeline with {} windows; "
            "error getting sizes: {}".format(
                len(self._splits),
                e,
            )
        )
    self._epoch = epoch
26612706

26622707
def __iter__(self):

python/ray/data/impl/block_list.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,48 @@ def split(self, split_size: int) -> List["BlockList"]:
6565
output.append(BlockList(b.tolist(), m.tolist()))
6666
return output
6767

68+
def split_by_bytes(self, bytes_per_split: int) -> List["BlockList"]:
    """Split this BlockList into sublists of at most the given byte size.

    Every split holds at least one block, so a single block larger than
    the budget still forms its own (oversized) split.

    Args:
        bytes_per_split: The max number of bytes per split.
    """
    self._check_if_cleared()
    splits = []
    blocks_in_split, meta_in_split = [], []
    bytes_in_split = 0
    for block, meta in zip(self._blocks, self._metadata):
        if meta.size_bytes is None:
            raise RuntimeError(
                "Block has unknown size, cannot use split_by_bytes()"
            )
        block_size = meta.size_bytes
        # Close out the current split once adding this block would
        # overflow the budget, but never emit an empty split.
        if blocks_in_split and bytes_in_split + block_size > bytes_per_split:
            splits.append(BlockList(blocks_in_split, meta_in_split))
            blocks_in_split, meta_in_split = [], []
            bytes_in_split = 0
        blocks_in_split.append(block)
        meta_in_split.append(meta)
        bytes_in_split += block_size
    if blocks_in_split:
        splits.append(BlockList(blocks_in_split, meta_in_split))
    return splits
96+
97+
def size_bytes(self) -> int:
    """Returns the total size in bytes of the blocks, or -1 if not known."""
    # Collect only the sizes that are actually recorded in metadata.
    known_sizes = [
        m.size_bytes for m in self.get_metadata() if m.size_bytes is not None
    ]
    # With no size information at all, the total is reported as unknown.
    if not known_sizes:
        return -1
    return sum(known_sizes)
109+
68110
def divide(self, block_idx: int) -> ("BlockList", "BlockList"):
69111
"""Divide into two BlockLists by the given block index.
70112

python/ray/data/impl/lazy_block_list.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,30 @@ def split(self, split_size: int) -> List["LazyBlockList"]:
6767
output.append(LazyBlockList(c.tolist(), m.tolist(), b.tolist()))
6868
return output
6969

70+
# Note: does not force execution prior to splitting.
def split_by_bytes(self, bytes_per_split: int) -> List["LazyBlockList"]:
    """Split this LazyBlockList into sublists of at most the given byte size.

    Every split holds at least one block, so a single block larger than the
    budget still forms its own (oversized) split.

    Fix: the return annotation previously said ``List["BlockList"]`` even
    though this method builds and returns ``LazyBlockList`` instances (the
    sibling ``split`` correctly annotates ``List["LazyBlockList"]``).

    Args:
        bytes_per_split: The max number of bytes per split.
    """
    self._check_if_cleared()
    output = []
    cur_calls, cur_meta, cur_blocks = [], [], []
    cur_size = 0
    for c, m, b in zip(self._calls, self._metadata, self._block_partitions):
        # Lazy metadata must carry a size; otherwise we cannot window
        # by bytes without forcing execution.
        if m.size_bytes is None:
            raise RuntimeError(
                "Block has unknown size, cannot use split_by_bytes()"
            )
        size = m.size_bytes
        # Close out the current split once adding this block would
        # overflow the budget, but never emit an empty split.
        if cur_blocks and cur_size + size > bytes_per_split:
            output.append(LazyBlockList(cur_calls, cur_meta, cur_blocks))
            cur_calls, cur_meta, cur_blocks = [], [], []
            cur_size = 0
        cur_calls.append(c)
        cur_meta.append(m)
        cur_blocks.append(b)
        cur_size += size
    if cur_blocks:
        output.append(LazyBlockList(cur_calls, cur_meta, cur_blocks))
    return output
93+
7094
# Note: does not force execution prior to division.
7195
def divide(self, part_idx: int) -> ("LazyBlockList", "LazyBlockList"):
7296
left = LazyBlockList(

python/ray/data/tests/test_dataset_pipeline.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,41 @@ def block_on_ones(x: int) -> int:
3838
assert pipe.take(1) == [0]
3939

4040

41+
def test_window_by_bytes(ray_start_regular_shared):
    """Exercise bytes_per_window-based windowing of a Dataset."""

    def hundred_block_ds():
        # 10M rows over 100 blocks -> ~100k rows (~800KB Arrow) per block.
        return ray.data.range_arrow(10000000, parallelism=100)

    # The two windowing schemes are mutually exclusive.
    with pytest.raises(ValueError):
        ray.data.range_arrow(10).window(blocks_per_window=2, bytes_per_window=2)

    # Baseline: windowing by block count.
    pipe = hundred_block_ds().window(blocks_per_window=2)
    assert str(pipe) == "DatasetPipeline(num_windows=50, num_stages=1)"

    # Windowing by bytes groups ~12-13 blocks per 10MiB window.
    pipe = hundred_block_ds().window(bytes_per_window=10 * 1024 * 1024)
    assert str(pipe) == "DatasetPipeline(num_windows=8, num_stages=1)"
    windows = list(pipe.iter_datasets())
    assert len(windows) == 8, windows
    for window in windows[:-1]:
        assert window.num_blocks() in [12, 13]

    # A tiny byte budget still yields at least one block per window.
    pipe = hundred_block_ds().window(bytes_per_window=1)
    assert str(pipe) == "DatasetPipeline(num_windows=100, num_stages=1)"
    for window in pipe.iter_datasets():
        assert window.num_blocks() == 1

    # A huge byte budget collapses everything into a single window.
    pipe = hundred_block_ds().window(bytes_per_window=1e9)
    assert str(pipe) == "DatasetPipeline(num_windows=1, num_stages=1)"
    for window in pipe.iter_datasets():
        assert window.num_blocks() == 100

    # Test creating from non-lazy BlockList.
    pipe = (
        hundred_block_ds()
        .map_batches(lambda x: x)
        .window(bytes_per_window=10 * 1024 * 1024)
    )
    assert str(pipe) == "DatasetPipeline(num_windows=8, num_stages=1)"
75+
4176
def test_epoch(ray_start_regular_shared):
4277
# Test dataset repeat.
4378
pipe = ray.data.range(5).map(lambda x: x * 2).repeat(3).map(lambda x: x * 2)

0 commit comments

Comments
 (0)