-
-
Notifications
You must be signed in to change notification settings - Fork 1.9k
Closed
Labels
Description
Over in #9388 we observed dask/dataframe/io/tests/test_parquet.py::test_gpu_write_parquet_simple failed with
09:49:43 E MemoryError: Parquet data was larger than the available GPU memory!
09:49:43 E
09:49:43 E See the notes on split_row_groups in the read_parquet documentation.
09:49:43 E
09:49:43 E Original Error: std::bad_alloc
09:49:43
09:49:43 /opt/conda/envs/dask/lib/python3.9/site-packages/dask_cudf/io/parquet.py:265: MemoryError

Full traceback:
09:49:43 ________________________ test_gpu_write_parquet_simple _________________________
09:49:43 [gw2] linux -- Python 3.9.13 /opt/conda/envs/dask/bin/python
09:49:43
09:49:43 cls = <class 'dask_cudf.io.parquet.CudfEngine'>
09:49:43 fs = <fsspec.implementations.local.LocalFileSystem object at 0x7ff4bc75f1f0>
09:49:43 pieces = [('/tmp/pytest-of-jenkins/pytest-589/popen-gw2/test_gpu_write_parquet_simple0/part.0.parquet', None, None)]
09:49:43 columns = ['a', 'b', '__null_dask_index__'], index = ['__null_dask_index__']
09:49:43 categories = [], partitions = [], partitioning = None
09:49:43 schema = __null_dask_index__: int64 not null
09:49:43 a: string not null
09:49:43 b: string not null
09:49:43 -- schema metadata --
09:49:43 pandas: '{"index_columns": ["__null_dask_index__"], "column_indexes": [{"' + 553
09:49:43 open_file_options = None
09:49:43 kwargs = {'dataset': {'format': <ParquetFileFormat read_options=<ParquetReadOptions dictionary_columns=set() coerce_int96_timestamp_unit=ns>>, 'partitioning': 'hive'}, 'filters': None, 'read': {'open_file_options': {}}}
09:49:43 read_columns = None, ignored = set(), strings_to_cats = False
09:49:43 read_kwargs = {'open_file_options': {}}, check_file_size = 500000000
09:49:43 paths = ['/tmp/pytest-of-jenkins/pytest-589/popen-gw2/test_gpu_write_parquet_simple0/part.0.parquet']
09:49:43 rgs = [None]
09:49:43
09:49:43 @classmethod
09:49:43 def read_partition(
09:49:43 cls,
09:49:43 fs,
09:49:43 pieces,
09:49:43 columns,
09:49:43 index,
09:49:43 categories=(),
09:49:43 partitions=(),
09:49:43 partitioning=None,
09:49:43 schema=None,
09:49:43 open_file_options=None,
09:49:43 **kwargs,
09:49:43 ):
09:49:43
09:49:43 if columns is not None:
09:49:43 columns = [c for c in columns]
09:49:43 if isinstance(index, list):
09:49:43 columns += index
09:49:43
09:49:43 # Check if we are actually selecting any columns
09:49:43 read_columns = columns
09:49:43 if schema and columns:
09:49:43 ignored = set(schema.names) - set(columns)
09:49:43 if not ignored:
09:49:43 read_columns = None
09:49:43
09:49:43 if not isinstance(pieces, list):
09:49:43 pieces = [pieces]
09:49:43
09:49:43 # Extract supported kwargs from `kwargs`
09:49:43 strings_to_cats = kwargs.get("strings_to_categorical", False)
09:49:43 read_kwargs = kwargs.get("read", {})
09:49:43 read_kwargs.update(open_file_options or {})
09:49:43 check_file_size = read_kwargs.pop("check_file_size", None)
09:49:43
09:49:43 # Wrap reading logic in a `try` block so that we can
09:49:43 # inform the user that the `read_parquet` partition
09:49:43 # size is too large for the available memory
09:49:43 try:
09:49:43
09:49:43 # Assume multi-piece read
09:49:43 paths = []
09:49:43 rgs = []
09:49:43 last_partition_keys = None
09:49:43 dfs = []
09:49:43
09:49:43 for i, piece in enumerate(pieces):
09:49:43
09:49:43 (path, row_group, partition_keys) = piece
09:49:43 row_group = None if row_group == [None] else row_group
09:49:43
09:49:43 # File-size check to help "protect" users from change
09:49:43 # to up-stream `split_row_groups` default. We only
09:49:43 # check the file size if this partition corresponds
09:49:43 # to a full file, and `check_file_size` is defined
09:49:43 if check_file_size and len(pieces) == 1 and row_group is None:
09:49:43 file_size = fs.size(path)
09:49:43 if file_size > check_file_size:
09:49:43 warnings.warn(
09:49:43 f"A large parquet file ({file_size}B) is being "
09:49:43 f"used to create a DataFrame partition in "
09:49:43 f"read_parquet. This may cause out of memory "
09:49:43 f"exceptions in operations downstream. See the "
09:49:43 f"notes on split_row_groups in the read_parquet "
09:49:43 f"documentation. Setting split_row_groups "
09:49:43 f"explicitly will silence this warning."
09:49:43 )
09:49:43
09:49:43 if i > 0 and partition_keys != last_partition_keys:
09:49:43 dfs.append(
09:49:43 cls._read_paths(
09:49:43 paths,
09:49:43 fs,
09:49:43 columns=read_columns,
09:49:43 row_groups=rgs if rgs else None,
09:49:43 strings_to_categorical=strings_to_cats,
09:49:43 partitions=partitions,
09:49:43 partitioning=partitioning,
09:49:43 partition_keys=last_partition_keys,
09:49:43 **read_kwargs,
09:49:43 )
09:49:43 )
09:49:43 paths = rgs = []
09:49:43 last_partition_keys = None
09:49:43 paths.append(path)
09:49:43 rgs.append(
09:49:43 [row_group]
09:49:43 if not isinstance(row_group, list)
09:49:43 and row_group is not None
09:49:43 else row_group
09:49:43 )
09:49:43 last_partition_keys = partition_keys
09:49:43
09:49:43 dfs.append(
09:49:43 > cls._read_paths(
09:49:43 paths,
09:49:43 fs,
09:49:43 columns=read_columns,
09:49:43 row_groups=rgs if rgs else None,
09:49:43 strings_to_categorical=strings_to_cats,
09:49:43 partitions=partitions,
09:49:43 partitioning=partitioning,
09:49:43 partition_keys=last_partition_keys,
09:49:43 **read_kwargs,
09:49:43 )
09:49:43 )
09:49:43
09:49:43 /opt/conda/envs/dask/lib/python3.9/site-packages/dask_cudf/io/parquet.py:241:
09:49:43 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
09:49:43
09:49:43 cls = <class 'dask_cudf.io.parquet.CudfEngine'>
09:49:43 paths = ['/tmp/pytest-of-jenkins/pytest-589/popen-gw2/test_gpu_write_parquet_simple0/part.0.parquet']
09:49:43 fs = <fsspec.implementations.local.LocalFileSystem object at 0x7ff4bc75f1f0>
09:49:43 columns = None, row_groups = None, strings_to_categorical = False
09:49:43 partitions = [], partitioning = None, partition_keys = None
09:49:43 open_file_options = {}, kwargs = {}
09:49:43 stack = <contextlib.ExitStack object at 0x7ff4bc75f880>
09:49:43 paths_or_fobs = ['/tmp/pytest-of-jenkins/pytest-589/popen-gw2/test_gpu_write_parquet_simple0/part.0.parquet']
09:49:43
09:49:43 @classmethod
09:49:43 def _read_paths(
09:49:43 cls,
09:49:43 paths,
09:49:43 fs,
09:49:43 columns=None,
09:49:43 row_groups=None,
09:49:43 strings_to_categorical=None,
09:49:43 partitions=None,
09:49:43 partitioning=None,
09:49:43 partition_keys=None,
09:49:43 open_file_options=None,
09:49:43 **kwargs,
09:49:43 ):
09:49:43
09:49:43 # Simplify row_groups if all None
09:49:43 if row_groups == [None for path in paths]:
09:49:43 row_groups = None
09:49:43
09:49:43 with ExitStack() as stack:
09:49:43
09:49:43 # Non-local filesystem handling
09:49:43 paths_or_fobs = paths
09:49:43 if not _is_local_filesystem(fs):
09:49:43 paths_or_fobs = _open_remote_files(
09:49:43 paths_or_fobs,
09:49:43 fs,
09:49:43 context_stack=stack,
09:49:43 **_default_open_file_options(
09:49:43 open_file_options, columns, row_groups
09:49:43 ),
09:49:43 )
09:49:43
09:49:43 # Use cudf to read in data
09:49:43 > df = cudf.read_parquet(
09:49:43 paths_or_fobs,
09:49:43 engine="cudf",
09:49:43 columns=columns,
09:49:43 row_groups=row_groups if row_groups else None,
09:49:43 strings_to_categorical=strings_to_categorical,
09:49:43 **kwargs,
09:49:43 )
09:49:43
09:49:43 /opt/conda/envs/dask/lib/python3.9/site-packages/dask_cudf/io/parquet.py:92:
09:49:43 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
09:49:43
09:49:43 args = (['/tmp/pytest-of-jenkins/pytest-589/popen-gw2/test_gpu_write_parquet_simple0/part.0.parquet'],)
09:49:43 kwds = {'columns': None, 'engine': 'cudf', 'row_groups': None, 'strings_to_categorical': False}
09:49:43
09:49:43 @wraps(func)
09:49:43 def inner(*args, **kwds):
09:49:43 with self._recreate_cm():
09:49:43 > return func(*args, **kwds)
09:49:43
09:49:43 /opt/conda/envs/dask/lib/python3.9/contextlib.py:79:
09:49:43 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
09:49:43
09:49:43 filepath_or_buffer = ['/tmp/pytest-of-jenkins/pytest-589/popen-gw2/test_gpu_write_parquet_simple0/part.0.parquet']
09:49:43 engine = 'cudf', columns = None, filters = None, row_groups = None
09:49:43 strings_to_categorical = False, use_pandas_metadata = True
09:49:43 use_python_file_object = True, categorical_partitions = True
09:49:43 open_file_options = {}, args = (), kwargs = {}
09:49:43 fs = <fsspec.implementations.local.LocalFileSystem object at 0x7ff4bc75f1f0>
09:49:43 paths = ['/tmp/pytest-of-jenkins/pytest-589/popen-gw2/test_gpu_write_parquet_simple0/part.0.parquet']
09:49:43 partition_keys = [], partition_categories = {}
09:49:43 filepaths_or_buffers = ['/tmp/pytest-of-jenkins/pytest-589/popen-gw2/test_gpu_write_parquet_simple0/part.0.parquet']
09:49:43 i = 0
09:49:43 source = '/tmp/pytest-of-jenkins/pytest-589/popen-gw2/test_gpu_write_parquet_simple0/part.0.parquet'
09:49:43 tmp_source = '/tmp/pytest-of-jenkins/pytest-589/popen-gw2/test_gpu_write_parquet_simple0/part.0.parquet'
09:49:43 compression = None
09:49:43
09:49:43 @ioutils.doc_read_parquet()
09:49:43 @_cudf_nvtx_annotate
09:49:43 def read_parquet(
09:49:43 filepath_or_buffer,
09:49:43 engine="cudf",
09:49:43 columns=None,
09:49:43 filters=None,
09:49:43 row_groups=None,
09:49:43 strings_to_categorical=False,
09:49:43 use_pandas_metadata=True,
09:49:43 use_python_file_object=True,
09:49:43 categorical_partitions=True,
09:49:43 open_file_options=None,
09:49:43 *args,
09:49:43 **kwargs,
09:49:43 ):
09:49:43 """{docstring}"""
09:49:43
09:49:43 # Do not allow the user to set file-opening options
09:49:43 # when `use_python_file_object=False` is specified
09:49:43 if use_python_file_object is False:
09:49:43 if open_file_options:
09:49:43 raise ValueError(
09:49:43 "open_file_options is not currently supported when "
09:49:43 "use_python_file_object is set to False."
09:49:43 )
09:49:43 open_file_options = {}
09:49:43
09:49:43 # Multiple sources are passed as a list. If a single source is passed,
09:49:43 # wrap it in a list for unified processing downstream.
09:49:43 if not is_list_like(filepath_or_buffer):
09:49:43 filepath_or_buffer = [filepath_or_buffer]
09:49:43
09:49:43 # a list of row groups per source should be passed. make the list of
09:49:43 # lists that is expected for multiple sources
09:49:43 if row_groups is not None:
09:49:43 if not is_list_like(row_groups):
09:49:43 row_groups = [[row_groups]]
09:49:43 elif not is_list_like(row_groups[0]):
09:49:43 row_groups = [row_groups]
09:49:43
09:49:43 # Check columns input
09:49:43 if columns is not None:
09:49:43 if not is_list_like(columns):
09:49:43 raise ValueError("Expected list like for columns")
09:49:43
09:49:43 # Start by trying construct a filesystem object, so we
09:49:43 # can apply filters on remote file-systems
09:49:43 fs, paths = ioutils._get_filesystem_and_paths(filepath_or_buffer, **kwargs)
09:49:43
09:49:43 # Use pyarrow dataset to detect/process directory-partitioned
09:49:43 # data and apply filters. Note that we can only support partitioned
09:49:43 # data and filtering if the input is a single directory or list of
09:49:43 # paths.
09:49:43 partition_keys = []
09:49:43 partition_categories = {}
09:49:43 if fs and paths:
09:49:43 (
09:49:43 paths,
09:49:43 row_groups,
09:49:43 partition_keys,
09:49:43 partition_categories,
09:49:43 ) = _process_dataset(
09:49:43 paths,
09:49:43 fs,
09:49:43 filters=filters,
09:49:43 row_groups=row_groups,
09:49:43 categorical_partitions=categorical_partitions,
09:49:43 )
09:49:43 elif filters is not None:
09:49:43 raise ValueError("cudf cannot apply filters to open file objects.")
09:49:43 filepath_or_buffer = paths if paths else filepath_or_buffer
09:49:43
09:49:43 filepaths_or_buffers = []
09:49:43 if use_python_file_object:
09:49:43 open_file_options = _default_open_file_options(
09:49:43 open_file_options,
09:49:43 columns,
09:49:43 row_groups,
09:49:43 fs=fs,
09:49:43 )
09:49:43 for i, source in enumerate(filepath_or_buffer):
09:49:43 tmp_source, compression = ioutils.get_reader_filepath_or_buffer(
09:49:43 path_or_data=source,
09:49:43 compression=None,
09:49:43 fs=fs,
09:49:43 use_python_file_object=use_python_file_object,
09:49:43 open_file_options=open_file_options,
09:49:43 **kwargs,
09:49:43 )
09:49:43
09:49:43 if compression is not None:
09:49:43 raise ValueError(
09:49:43 "URL content-encoding decompression is not supported"
09:49:43 )
09:49:43 if isinstance(tmp_source, list):
09:49:43 filepath_or_buffer.extend(tmp_source)
09:49:43 else:
09:49:43 filepaths_or_buffers.append(tmp_source)
09:49:43
09:49:43 # Warn user if they are not using cudf for IO
09:49:43 # (There is a good chance this was not the intention)
09:49:43 if engine != "cudf":
09:49:43 warnings.warn(
09:49:43 "Using CPU via PyArrow to read Parquet dataset. "
09:49:43 "This option is both inefficient and unstable!"
09:49:43 )
09:49:43 if filters is not None:
09:49:43 warnings.warn(
09:49:43 "Parquet row-group filtering is only supported with "
09:49:43 "'engine=cudf'. Use pandas or pyarrow API directly "
09:49:43 "for full CPU-based filtering functionality."
09:49:43 )
09:49:43
09:49:43 > return _parquet_to_frame(
09:49:43 filepaths_or_buffers,
09:49:43 engine,
09:49:43 *args,
09:49:43 columns=columns,
09:49:43 row_groups=row_groups,
09:49:43 strings_to_categorical=strings_to_categorical,
09:49:43 use_pandas_metadata=use_pandas_metadata,
09:49:43 partition_keys=partition_keys,
09:49:43 partition_categories=partition_categories,
09:49:43 **kwargs,
09:49:43 )
09:49:43
09:49:43 /opt/conda/envs/dask/lib/python3.9/site-packages/cudf/io/parquet.py:472:
09:49:43 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
09:49:43
09:49:43 args = (['/tmp/pytest-of-jenkins/pytest-589/popen-gw2/test_gpu_write_parquet_simple0/part.0.parquet'], 'cudf')
09:49:43 kwds = {'columns': None, 'partition_categories': {}, 'partition_keys': [], 'row_groups': None, ...}
09:49:43
09:49:43 @wraps(func)
09:49:43 def inner(*args, **kwds):
09:49:43 with self._recreate_cm():
09:49:43 > return func(*args, **kwds)
09:49:43
09:49:43 /opt/conda/envs/dask/lib/python3.9/contextlib.py:79:
09:49:43 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
09:49:43
09:49:43 paths_or_buffers = ['/tmp/pytest-of-jenkins/pytest-589/popen-gw2/test_gpu_write_parquet_simple0/part.0.parquet']
09:49:43 row_groups = None, partition_keys = [], partition_categories = {}
09:49:43 args = ('cudf',)
09:49:43 kwargs = {'columns': None, 'strings_to_categorical': False, 'use_pandas_metadata': True}
09:49:43
09:49:43 @_cudf_nvtx_annotate
09:49:43 def _parquet_to_frame(
09:49:43 paths_or_buffers,
09:49:43 *args,
09:49:43 row_groups=None,
09:49:43 partition_keys=None,
09:49:43 partition_categories=None,
09:49:43 **kwargs,
09:49:43 ):
09:49:43
09:49:43 # If this is not a partitioned read, only need
09:49:43 # one call to `_read_parquet`
09:49:43 if not partition_keys:
09:49:43 > return _read_parquet(
09:49:43 paths_or_buffers,
09:49:43 *args,
09:49:43 row_groups=row_groups,
09:49:43 **kwargs,
09:49:43 )
09:49:43
09:49:43 /opt/conda/envs/dask/lib/python3.9/site-packages/cudf/io/parquet.py:499:
09:49:43 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
09:49:43
09:49:43 args = (['/tmp/pytest-of-jenkins/pytest-589/popen-gw2/test_gpu_write_parquet_simple0/part.0.parquet'], 'cudf')
09:49:43 kwds = {'columns': None, 'row_groups': None, 'strings_to_categorical': False, 'use_pandas_metadata': True}
09:49:43
09:49:43 @wraps(func)
09:49:43 def inner(*args, **kwds):
09:49:43 with self._recreate_cm():
09:49:43 > return func(*args, **kwds)
09:49:43
09:49:43 /opt/conda/envs/dask/lib/python3.9/contextlib.py:79:
09:49:43 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
09:49:43
09:49:43 filepaths_or_buffers = ['/tmp/pytest-of-jenkins/pytest-589/popen-gw2/test_gpu_write_parquet_simple0/part.0.parquet']
09:49:43 engine = 'cudf', columns = None, row_groups = None
09:49:43 strings_to_categorical = False, use_pandas_metadata = True, args = ()
09:49:43 kwargs = {}
09:49:43
09:49:43 @_cudf_nvtx_annotate
09:49:43 def _read_parquet(
09:49:43 filepaths_or_buffers,
09:49:43 engine,
09:49:43 columns=None,
09:49:43 row_groups=None,
09:49:43 strings_to_categorical=None,
09:49:43 use_pandas_metadata=None,
09:49:43 *args,
09:49:43 **kwargs,
09:49:43 ):
09:49:43 # Simple helper function to dispatch between
09:49:43 # cudf and pyarrow to read parquet data
09:49:43 if engine == "cudf":
09:49:43 > return libparquet.read_parquet(
09:49:43 filepaths_or_buffers,
09:49:43 columns=columns,
09:49:43 row_groups=row_groups,
09:49:43 strings_to_categorical=strings_to_categorical,
09:49:43 use_pandas_metadata=use_pandas_metadata,
09:49:43 )
09:49:43
09:49:43 /opt/conda/envs/dask/lib/python3.9/site-packages/cudf/io/parquet.py:574:
09:49:43 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
09:49:43
09:49:43 > ???
09:49:43
09:49:43 parquet.pyx:127:
09:49:43 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
09:49:43
09:49:43 > ???
09:49:43 E MemoryError: std::bad_alloc
09:49:43
09:49:43 parquet.pyx:186: MemoryError
09:49:43
09:49:43 During handling of the above exception, another exception occurred:
09:49:43
09:49:43 tmpdir = local('/tmp/pytest-of-jenkins/pytest-589/popen-gw2/test_gpu_write_parquet_simple0')
09:49:43
09:49:43 @pytest.mark.gpu
09:49:43 def test_gpu_write_parquet_simple(tmpdir):
09:49:43 fn = str(tmpdir)
09:49:43 cudf = pytest.importorskip("cudf")
09:49:43 dask_cudf = pytest.importorskip("dask_cudf")
09:49:43 from dask.dataframe.dispatch import pyarrow_schema_dispatch
09:49:43
09:49:43 @pyarrow_schema_dispatch.register((cudf.DataFrame,))
09:49:43 def get_pyarrow_schema_cudf(obj):
09:49:43 return obj.to_arrow().schema
09:49:43
09:49:43 df = cudf.DataFrame(
09:49:43 {
09:49:43 "a": ["abc", "def"],
09:49:43 "b": ["a", "z"],
09:49:43 }
09:49:43 )
09:49:43 ddf = dask_cudf.from_cudf(df, 3)
09:49:43 ddf.to_parquet(fn)
09:49:43 got = dask_cudf.read_parquet(fn)
09:49:43 > assert_eq(df, got)
09:49:43
09:49:43 dask/dataframe/io/tests/test_parquet.py:4233:
09:49:43 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
09:49:43 dask/dataframe/utils.py:556: in assert_eq
09:49:43 b = _check_dask(
09:49:43 dask/dataframe/utils.py:464: in _check_dask
09:49:43 result = dsk.compute(scheduler=scheduler)
09:49:43 dask/base.py:315: in compute
09:49:43 (result,) = compute(self, traverse=False, **kwargs)
09:49:43 dask/base.py:600: in compute
09:49:43 results = schedule(dsk, keys, **kwargs)
09:49:43 dask/local.py:557: in get_sync
09:49:43 return get_async(
09:49:43 dask/local.py:500: in get_async
09:49:43 for key, res_info, failed in queue_get(queue).result():
09:49:43 /opt/conda/envs/dask/lib/python3.9/concurrent/futures/_base.py:439: in result
09:49:43 return self.__get_result()
09:49:43 /opt/conda/envs/dask/lib/python3.9/concurrent/futures/_base.py:391: in __get_result
09:49:43 raise self._exception
09:49:43 dask/local.py:542: in submit
09:49:43 fut.set_result(fn(*args, **kwargs))
09:49:43 dask/local.py:238: in batch_execute_tasks
09:49:43 return [execute_task(*a) for a in it]
09:49:43 dask/local.py:238: in <listcomp>
09:49:43 return [execute_task(*a) for a in it]
09:49:43 dask/local.py:229: in execute_task
09:49:43 result = pack_exception(e, dumps)
09:49:43 dask/local.py:224: in execute_task
09:49:43 result = _execute_task(task, data)
09:49:43 dask/core.py:119: in _execute_task
09:49:43 return func(*(_execute_task(a, cache) for a in args))
09:49:43 dask/optimization.py:990: in __call__
09:49:43 return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
09:49:43 dask/core.py:149: in get
09:49:43 result = _execute_task(task, cache)
09:49:43 dask/core.py:119: in _execute_task
09:49:43 return func(*(_execute_task(a, cache) for a in args))
09:49:43 dask/dataframe/io/parquet/core.py:89: in __call__
09:49:43 return read_parquet_part(
09:49:43 dask/dataframe/io/parquet/core.py:587: in read_parquet_part
09:49:43 dfs = [
09:49:43 dask/dataframe/io/parquet/core.py:588: in <listcomp>
09:49:43 func(fs, rg, columns.copy(), index, **toolz.merge(kwargs, kw))
09:49:43 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
09:49:43
09:49:43 cls = <class 'dask_cudf.io.parquet.CudfEngine'>
09:49:43 fs = <fsspec.implementations.local.LocalFileSystem object at 0x7ff4bc75f1f0>
09:49:43 pieces = [('/tmp/pytest-of-jenkins/pytest-589/popen-gw2/test_gpu_write_parquet_simple0/part.0.parquet', None, None)]
09:49:43 columns = ['a', 'b', '__null_dask_index__'], index = ['__null_dask_index__']
09:49:43 categories = [], partitions = [], partitioning = None
09:49:43 schema = __null_dask_index__: int64 not null
09:49:43 a: string not null
09:49:43 b: string not null
09:49:43 -- schema metadata --
09:49:43 pandas: '{"index_columns": ["__null_dask_index__"], "column_indexes": [{"' + 553
09:49:43 open_file_options = None
09:49:43 kwargs = {'dataset': {'format': <ParquetFileFormat read_options=<ParquetReadOptions dictionary_columns=set() coerce_int96_timestamp_unit=ns>>, 'partitioning': 'hive'}, 'filters': None, 'read': {'open_file_options': {}}}
09:49:43 read_columns = None, ignored = set(), strings_to_cats = False
09:49:43 read_kwargs = {'open_file_options': {}}, check_file_size = 500000000
09:49:43 paths = ['/tmp/pytest-of-jenkins/pytest-589/popen-gw2/test_gpu_write_parquet_simple0/part.0.parquet']
09:49:43 rgs = [None]
09:49:43
09:49:43 @classmethod
09:49:43 def read_partition(
09:49:43 cls,
09:49:43 fs,
09:49:43 pieces,
09:49:43 columns,
09:49:43 index,
09:49:43 categories=(),
09:49:43 partitions=(),
09:49:43 partitioning=None,
09:49:43 schema=None,
09:49:43 open_file_options=None,
09:49:43 **kwargs,
09:49:43 ):
09:49:43
09:49:43 if columns is not None:
09:49:43 columns = [c for c in columns]
09:49:43 if isinstance(index, list):
09:49:43 columns += index
09:49:43
09:49:43 # Check if we are actually selecting any columns
09:49:43 read_columns = columns
09:49:43 if schema and columns:
09:49:43 ignored = set(schema.names) - set(columns)
09:49:43 if not ignored:
09:49:43 read_columns = None
09:49:43
09:49:43 if not isinstance(pieces, list):
09:49:43 pieces = [pieces]
09:49:43
09:49:43 # Extract supported kwargs from `kwargs`
09:49:43 strings_to_cats = kwargs.get("strings_to_categorical", False)
09:49:43 read_kwargs = kwargs.get("read", {})
09:49:43 read_kwargs.update(open_file_options or {})
09:49:43 check_file_size = read_kwargs.pop("check_file_size", None)
09:49:43
09:49:43 # Wrap reading logic in a `try` block so that we can
09:49:43 # inform the user that the `read_parquet` partition
09:49:43 # size is too large for the available memory
09:49:43 try:
09:49:43
09:49:43 # Assume multi-piece read
09:49:43 paths = []
09:49:43 rgs = []
09:49:43 last_partition_keys = None
09:49:43 dfs = []
09:49:43
09:49:43 for i, piece in enumerate(pieces):
09:49:43
09:49:43 (path, row_group, partition_keys) = piece
09:49:43 row_group = None if row_group == [None] else row_group
09:49:43
09:49:43 # File-size check to help "protect" users from change
09:49:43 # to up-stream `split_row_groups` default. We only
09:49:43 # check the file size if this partition corresponds
09:49:43 # to a full file, and `check_file_size` is defined
09:49:43 if check_file_size and len(pieces) == 1 and row_group is None:
09:49:43 file_size = fs.size(path)
09:49:43 if file_size > check_file_size:
09:49:43 warnings.warn(
09:49:43 f"A large parquet file ({file_size}B) is being "
09:49:43 f"used to create a DataFrame partition in "
09:49:43 f"read_parquet. This may cause out of memory "
09:49:43 f"exceptions in operations downstream. See the "
09:49:43 f"notes on split_row_groups in the read_parquet "
09:49:43 f"documentation. Setting split_row_groups "
09:49:43 f"explicitly will silence this warning."
09:49:43 )
09:49:43
09:49:43 if i > 0 and partition_keys != last_partition_keys:
09:49:43 dfs.append(
09:49:43 cls._read_paths(
09:49:43 paths,
09:49:43 fs,
09:49:43 columns=read_columns,
09:49:43 row_groups=rgs if rgs else None,
09:49:43 strings_to_categorical=strings_to_cats,
09:49:43 partitions=partitions,
09:49:43 partitioning=partitioning,
09:49:43 partition_keys=last_partition_keys,
09:49:43 **read_kwargs,
09:49:43 )
09:49:43 )
09:49:43 paths = rgs = []
09:49:43 last_partition_keys = None
09:49:43 paths.append(path)
09:49:43 rgs.append(
09:49:43 [row_group]
09:49:43 if not isinstance(row_group, list)
09:49:43 and row_group is not None
09:49:43 else row_group
09:49:43 )
09:49:43 last_partition_keys = partition_keys
09:49:43
09:49:43 dfs.append(
09:49:43 cls._read_paths(
09:49:43 paths,
09:49:43 fs,
09:49:43 columns=read_columns,
09:49:43 row_groups=rgs if rgs else None,
09:49:43 strings_to_categorical=strings_to_cats,
09:49:43 partitions=partitions,
09:49:43 partitioning=partitioning,
09:49:43 partition_keys=last_partition_keys,
09:49:43 **read_kwargs,
09:49:43 )
09:49:43 )
09:49:43 df = cudf.concat(dfs) if len(dfs) > 1 else dfs[0]
09:49:43
09:49:43 # Re-set "object" dtypes align with pa schema
09:49:43 set_object_dtypes_from_pa_schema(df, schema)
09:49:43
09:49:43 if index and (index[0] in df.columns):
09:49:43 df = df.set_index(index[0])
09:49:43 elif index is False and df.index.names != (None,):
09:49:43 # If index=False, we shouldn't have a named index
09:49:43 df.reset_index(inplace=True)
09:49:43
09:49:43 except MemoryError as err:
09:49:43 > raise MemoryError(
09:49:43 "Parquet data was larger than the available GPU memory!\n\n"
09:49:43 "See the notes on split_row_groups in the read_parquet "
09:49:43 "documentation.\n\n"
09:49:43 "Original Error: " + str(err)
09:49:43 )
09:49:43 E MemoryError: Parquet data was larger than the available GPU memory!
09:49:43 E
09:49:43 E See the notes on split_row_groups in the read_parquet documentation.
09:49:43 E
09:49:43 E Original Error: std::bad_alloc
09:49:43
09:49:43 /opt/conda/envs/dask/lib/python3.9/site-packages/dask_cudf/io/parquet.py:265: MemoryError

Reactions are currently unavailable