Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 0 additions & 10 deletions ci/lint/pydoclint-baseline.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1142,10 +1142,6 @@ python/ray/data/_internal/execution/interfaces/executor.py
DOC201: Method `Executor.execute` does not have a return section in docstring
--------------------
python/ray/data/_internal/execution/interfaces/physical_operator.py
DOC001: Method `__init__` Potential formatting errors in docstring. Error message: No specification for "Args": ""
DOC001: Function/method `__init__`: Potential formatting errors in docstring. Error message: No specification for "Args": "" (Note: DOC001 could trigger other unrelated violations under this function/method too. Please fix the docstring formatting first.)
DOC101: Method `DataOpTask.__init__`: Docstring contains fewer arguments than in function signature.
DOC103: Method `DataOpTask.__init__`: Docstring arguments are different from function arguments. (Or could be other formatting issues: https://jsh9.github.io/pydoclint/violation_codes.html#notes-on-doc103 ). Arguments in the function signature but not in the docstring: [output_ready_callback: Callable[[RefBundle], None], streaming_gen: ObjectRefGenerator, task_done_callback: Callable[[Optional[Exception]], None], task_index: int, task_resource_bundle: Optional[ExecutionResources]].
DOC201: Method `DataOpTask.on_data_ready` does not have a return section in docstring
DOC001: Method `__init__` Potential formatting errors in docstring. Error message: No specification for "Args": ""
DOC001: Function/method `__init__`: Potential formatting errors in docstring. Error message: No specification for "Args": "" (Note: DOC001 could trigger other unrelated violations under this function/method too. Please fix the docstring formatting first.)
Expand Down Expand Up @@ -1174,10 +1170,6 @@ python/ray/data/_internal/execution/operators/hash_shuffle.py
DOC104: Function `_shuffle_block`: Arguments are the same in the docstring and the function signature, but are in a different order.
DOC105: Function `_shuffle_block`: Argument names match, but type hints in these args do not match: block, input_index, key_columns, pool, block_transformer, send_empty_blocks, override_partition_id
--------------------
python/ray/data/_internal/execution/operators/input_data_buffer.py
DOC101: Method `InputDataBuffer.__init__`: Docstring contains fewer arguments than in function signature.
DOC103: Method `InputDataBuffer.__init__`: Docstring arguments are different from function arguments. (Or could be other formatting issues: https://jsh9.github.io/pydoclint/violation_codes.html#notes-on-doc103 ). Arguments in the function signature but not in the docstring: [data_context: DataContext].
--------------------
python/ray/data/_internal/execution/operators/map_operator.py
DOC103: Method `MapOperator.create`: Docstring arguments are different from function arguments. (Or could be other formatting issues: https://jsh9.github.io/pydoclint/violation_codes.html#notes-on-doc103 ). Arguments in the function signature but not in the docstring: [data_context: DataContext, map_transformer: MapTransformer]. Arguments in the docstring but not in the function signature: [init_fn: , transform_fn: ].
DOC201: Method `MapOperator.create` does not have a return section in docstring
Expand Down Expand Up @@ -1396,8 +1388,6 @@ python/ray/data/datasource/file_datasink.py
DOC103: Method `BlockBasedFileDatasink.__init__`: Docstring arguments are different from function arguments. (Or could be other formatting issues: https://jsh9.github.io/pydoclint/violation_codes.html#notes-on-doc103 ). Arguments in the function signature but not in the docstring: [**file_datasink_kwargs: , min_rows_per_file: Optional[int], path: ].
--------------------
python/ray/data/datasource/file_meta_provider.py
DOC101: Method `FileMetadataProvider._get_block_metadata`: Docstring contains fewer arguments than in function signature.
DOC103: Method `FileMetadataProvider._get_block_metadata`: Docstring arguments are different from function arguments. (Or could be other formatting issues: https://jsh9.github.io/pydoclint/violation_codes.html#notes-on-doc103 ). Arguments in the function signature but not in the docstring: [**kwargs: ].
DOC101: Method `BaseFileMetadataProvider.expand_paths`: Docstring contains fewer arguments than in function signature.
DOC103: Method `BaseFileMetadataProvider.expand_paths`: Docstring arguments are different from function arguments. (Or could be other formatting issues: https://jsh9.github.io/pydoclint/violation_codes.html#notes-on-doc103 ). Arguments in the function signature but not in the docstring: [partitioning: Optional[Partitioning]].
DOC101: Function `_expand_directory`: Docstring contains fewer arguments than in function signature.
Expand Down
6 changes: 3 additions & 3 deletions python/ray/data/_internal/arrow_block.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
BlockColumn,
BlockColumnAccessor,
BlockExecStats,
BlockMetadata,
BlockMetadataWithSchema,
BlockType,
U,
)
Expand Down Expand Up @@ -399,7 +399,7 @@ def sort_and_partition(
@staticmethod
def merge_sorted_blocks(
blocks: List[Block], sort_key: "SortKey"
) -> Tuple[Block, BlockMetadata]:
) -> Tuple[Block, BlockMetadataWithSchema]:
stats = BlockExecStats.builder()
blocks = [b for b in blocks if b.num_rows > 0]
if len(blocks) == 0:
Expand All @@ -409,7 +409,7 @@ def merge_sorted_blocks(
blocks = TableBlockAccessor.normalize_block_types(blocks, BlockType.ARROW)
concat_and_sort = get_concat_and_sort_transform(DataContext.get_current())
ret = concat_and_sort(blocks, sort_key, promote_types=True)
return ret, ArrowBlockAccessor(ret).get_metadata(exec_stats=stats.build())
return ret, BlockMetadataWithSchema.from_block(ret, stats=stats.build())

def block_type(self) -> BlockType:
return BlockType.ARROW
Expand Down
16 changes: 13 additions & 3 deletions python/ray/data/_internal/block_list.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from typing import Iterator, List, Tuple
from typing import Iterator, List, Optional, Tuple

from ray.data._internal.memory_tracing import trace_allocation
from ray.data.block import Block, BlockMetadata
from ray.data.block import Block, BlockMetadata, Schema
from ray.types import ObjectRef


Expand All @@ -15,6 +15,7 @@ def __init__(
self,
blocks: List[ObjectRef[Block]],
metadata: List[BlockMetadata],
schema: Optional["Schema"] = None,
*,
owned_by_consumer: bool,
):
Expand All @@ -30,18 +31,27 @@ def __init__(
# This field can be set to indicate the number of estimated output blocks,
# since each read task may produce multiple output blocks after splitting.
self._estimated_num_blocks = None
# The schema of the blocks in this block list. This is optional, and may be None.
self._schema = schema

def __repr__(self):
return f"BlockList(owned_by_consumer={self._owned_by_consumer})"

def get_schema(self) -> Optional["Schema"]:
"""Get the schema for all blocks."""
return self._schema

def get_metadata(self, fetch_if_missing: bool = False) -> List[BlockMetadata]:
"""Get the metadata for all blocks."""
return self._metadata.copy()

def copy(self) -> "BlockList":
"""Perform a shallow copy of this BlockList."""
return BlockList(
self._blocks, self._metadata, owned_by_consumer=self._owned_by_consumer
self._blocks,
self._metadata,
owned_by_consumer=self._owned_by_consumer,
schema=self._schema,
)

def clear(self) -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ def _read_single_partition(stream) -> Block:
metadata = BlockMetadata(
num_rows=None,
size_bytes=None,
schema=None,
input_files=None,
exec_stats=None,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -323,10 +323,10 @@ def _get_read_task(
BlockMetadata(
num_rows=block_rows,
size_bytes=estimated_size_bytes_per_row * block_rows,
schema=sample_block_schema,
input_files=None,
exec_stats=None,
),
schema=sample_block_schema,
)

if parallelism == 1:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,6 @@ def get_read_task(task_index, parallelism):
metadata = BlockMetadata(
num_rows=num_rows,
size_bytes=size_bytes,
schema=None,
input_files=None,
exec_stats=None,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ def get_read_tasks(self, parallelism: int) -> List[ReadTask]:
files = files.tolist()
metadata = BlockMetadata(
num_rows=None,
schema=None,
input_files=files,
size_bytes=None,
exec_stats=None,
Expand Down
2 changes: 1 addition & 1 deletion python/ray/data/_internal/datasource/hudi_datasource.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ def _perform_read(

metadata = BlockMetadata(
num_rows=num_rows,
schema=schema,
input_files=input_files,
size_bytes=size_bytes,
exec_stats=None,
Expand All @@ -77,6 +76,7 @@ def _perform_read(
self._table_uri, paths, reader_options
),
metadata=metadata,
schema=schema,
)
read_tasks.append(read_task)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,6 @@ def get_read_tasks(
meta = BlockMetadata(
num_rows=None,
size_bytes=None,
schema=None,
input_files=None,
exec_stats=None,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,14 +276,14 @@ def get_read_tasks(self, parallelism: int) -> List[ReadTask]:
num_rows=sum(task.file.record_count for task in chunk_tasks)
- position_delete_count,
size_bytes=sum(task.length for task in chunk_tasks),
schema=pya_schema,
input_files=[task.file.file_path for task in chunk_tasks],
exec_stats=None,
)
read_tasks.append(
ReadTask(
read_fn=lambda tasks=chunk_tasks: get_read_task(tasks),
metadata=metadata,
schema=pya_schema,
)
)

Expand Down
3 changes: 1 addition & 2 deletions python/ray/data/_internal/datasource/image_datasource.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,13 +162,12 @@ def _set_encoding_ratio(self, encoding_ratio: int):
def _get_block_metadata(
self,
paths: List[str],
schema: Optional[Union[type, "pyarrow.lib.Schema"]],
*,
rows_per_file: Optional[int],
file_sizes: List[Optional[int]],
) -> BlockMetadata:
metadata = super()._get_block_metadata(
paths, schema, rows_per_file=rows_per_file, file_sizes=file_sizes
paths, rows_per_file=rows_per_file, file_sizes=file_sizes
)
if metadata.size_bytes is not None:
metadata.size_bytes = int(metadata.size_bytes * self._encoding_ratio)
Expand Down
8 changes: 5 additions & 3 deletions python/ray/data/_internal/datasource/lance_datasource.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@

import numpy as np

from ray.data._internal.util import _check_import, call_with_retry
from ray.data._internal.util import (
_check_import,
call_with_retry,
)
from ray.data.block import BlockMetadata
from ray.data.context import DataContext
from ray.data.datasource.datasource import Datasource, ReadTask
Expand Down Expand Up @@ -71,7 +74,6 @@ def get_read_tasks(self, parallelism: int) -> List[ReadTask]:
# TODO(chengsu): Take column projection into consideration for schema.
metadata = BlockMetadata(
num_rows=num_rows,
schema=fragments[0].schema,
input_files=input_files,
size_bytes=None,
exec_stats=None,
Expand All @@ -88,9 +90,9 @@ def get_read_tasks(self, parallelism: int) -> List[ReadTask]:
retry_params,
),
metadata,
schema=fragments[0].schema,
)
read_tasks.append(read_task)

return read_tasks

def estimate_inmemory_data_size(self) -> Optional[int]:
Expand Down
1 change: 0 additions & 1 deletion python/ray/data/_internal/datasource/mongo_datasource.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@ def make_block(
metadata = BlockMetadata(
num_rows=partition["count"],
size_bytes=partition["count"] * self._avg_obj_size,
schema=None,
input_files=None,
exec_stats=None,
)
Expand Down
7 changes: 5 additions & 2 deletions python/ray/data/_internal/datasource/parquet_datasource.py
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,6 @@ def get_read_tasks(self, parallelism: int) -> List[ReadTask]:

meta = self._meta_provider(
paths,
self._inferred_schema,
num_fragments=len(fragments),
prefetched_metadata=metadata,
)
Expand Down Expand Up @@ -418,6 +417,7 @@ def get_read_tasks(self, parallelism: int) -> List[ReadTask]:
partitioning,
),
meta,
schema=self._inferred_schema,
)
)

Expand Down Expand Up @@ -726,7 +726,10 @@ def _add_partition_fields_to_schema(
field_type = pa.from_numpy_dtype(partitioning.field_types[field_name])
else:
field_type = pa.string()
schema = schema.append(pa.field(field_name, field_type))
if field_name not in schema.names:
# Without this check, we would add the same partition field multiple times,
# which silently fails when asking for `pa.field()`.
schema = schema.append(pa.field(field_name, field_type))

return schema

Expand Down
3 changes: 1 addition & 2 deletions python/ray/data/_internal/datasource/range_datasource.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import builtins
import functools
from copy import copy
from typing import Iterable, List, Optional, Tuple

import numpy as np
Expand Down Expand Up @@ -96,7 +95,6 @@ def make_blocks(
meta = BlockMetadata(
num_rows=count,
size_bytes=8 * count * element_size,
schema=copy(self._schema),
input_files=None,
exec_stats=None,
)
Expand All @@ -106,6 +104,7 @@ def make_blocks(
i, count, target_rows_per_block
),
meta,
schema=self._schema,
)
)
i += block_size
Expand Down
3 changes: 1 addition & 2 deletions python/ray/data/_internal/datasource/sql_datasource.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ def fallback_read_fn() -> Iterable[Block]:
"Sharding is not supported. "
"Falling back to reading all data in a single task."
)
metadata = BlockMetadata(None, None, None, None, None)
metadata = BlockMetadata(None, None, None, None)
return [ReadTask(fallback_read_fn, metadata)]

tasks = []
Expand All @@ -158,7 +158,6 @@ def fallback_read_fn() -> Iterable[Block]:
metadata = BlockMetadata(
num_rows=num_rows,
size_bytes=None,
schema=None,
input_files=None,
exec_stats=None,
)
Expand Down
1 change: 0 additions & 1 deletion python/ray/data/_internal/datasource/torch_datasource.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ def get_read_tasks(self, parallelism):
# iterating through IterableDataset, which can cause OOM.
num_rows=None,
size_bytes=None,
schema=None,
input_files=None,
exec_stats=None,
)
Expand Down
8 changes: 6 additions & 2 deletions python/ray/data/_internal/equalize.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from ray.data._internal.execution.interfaces import RefBundle
from ray.data._internal.split import _calculate_blocks_rows, _split_at_indices
from ray.data._internal.util import unify_ref_bundles_schema
from ray.data.block import Block, BlockMetadata, BlockPartition
from ray.types import ObjectRef

Expand Down Expand Up @@ -40,7 +41,8 @@ def _equalize(

# phase 2: based on the num rows needed for each shaved split, split the leftovers
# in the shape that exactly matches the rows needed.
leftover_bundle = RefBundle(leftovers, owns_blocks=owned_by_consumer)
schema = unify_ref_bundles_schema(per_split_bundles)
leftover_bundle = RefBundle(leftovers, owns_blocks=owned_by_consumer, schema=schema)
leftover_splits = _split_leftovers(leftover_bundle, per_split_needed_rows)

# phase 3: merge the shaved_splits and leftoever splits and return.
Expand All @@ -54,7 +56,9 @@ def _equalize(
# Compose the result back to RefBundle
equalized_ref_bundles: List[RefBundle] = []
for split in shaved_splits:
equalized_ref_bundles.append(RefBundle(split, owns_blocks=owned_by_consumer))
equalized_ref_bundles.append(
RefBundle(split, owns_blocks=owned_by_consumer, schema=schema)
)
return equalized_ref_bundles


Expand Down
Loading