test_gpu_write_parquet_simple failure on gpuCI #9391


Description

@jrbourbeau

Over in #9388 we observed that dask/dataframe/io/tests/test_parquet.py::test_gpu_write_parquet_simple failed with

09:49:43 E           MemoryError: Parquet data was larger than the available GPU memory!
09:49:43 E           
09:49:43 E           See the notes on split_row_groups in the read_parquet documentation.
09:49:43 E           
09:49:43 E           Original Error: std::bad_alloc
09:49:43 
09:49:43 /opt/conda/envs/dask/lib/python3.9/site-packages/dask_cudf/io/parquet.py:265: MemoryError

in this gpuCI build

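For anyone trying to reproduce this locally, here is the failing test body in standalone form (a minimal sketch adapted from the test shown in the traceback below; it assumes a CUDA-capable GPU with cudf and dask_cudf installed):

```python
# Minimal reproduction adapted from
# dask/dataframe/io/tests/test_parquet.py::test_gpu_write_parquet_simple
import tempfile

import cudf
import dask_cudf
from dask.dataframe.dispatch import pyarrow_schema_dispatch


# Register a pyarrow schema dispatch for cudf DataFrames, as the test does
@pyarrow_schema_dispatch.register((cudf.DataFrame,))
def get_pyarrow_schema_cudf(obj):
    return obj.to_arrow().schema


df = cudf.DataFrame(
    {
        "a": ["abc", "def"],
        "b": ["a", "z"],
    }
)
ddf = dask_cudf.from_cudf(df, 3)  # npartitions=3, as in the test

with tempfile.TemporaryDirectory() as fn:
    ddf.to_parquet(fn)
    got = dask_cudf.read_parquet(fn)
    # On the failing CI run, computing this read raises the MemoryError below
    print(got.compute())
```
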
Full traceback:
09:49:43 ________________________ test_gpu_write_parquet_simple _________________________
09:49:43 [gw2] linux -- Python 3.9.13 /opt/conda/envs/dask/bin/python
09:49:43 
09:49:43 cls = <class 'dask_cudf.io.parquet.CudfEngine'>
09:49:43 fs = <fsspec.implementations.local.LocalFileSystem object at 0x7ff4bc75f1f0>
09:49:43 pieces = [('/tmp/pytest-of-jenkins/pytest-589/popen-gw2/test_gpu_write_parquet_simple0/part.0.parquet', None, None)]
09:49:43 columns = ['a', 'b', '__null_dask_index__'], index = ['__null_dask_index__']
09:49:43 categories = [], partitions = [], partitioning = None
09:49:43 schema = __null_dask_index__: int64 not null
09:49:43 a: string not null
09:49:43 b: string not null
09:49:43 -- schema metadata --
09:49:43 pandas: '{"index_columns": ["__null_dask_index__"], "column_indexes": [{"' + 553
09:49:43 open_file_options = None
09:49:43 kwargs = {'dataset': {'format': <ParquetFileFormat read_options=<ParquetReadOptions dictionary_columns=set() coerce_int96_timestamp_unit=ns>>, 'partitioning': 'hive'}, 'filters': None, 'read': {'open_file_options': {}}}
09:49:43 read_columns = None, ignored = set(), strings_to_cats = False
09:49:43 read_kwargs = {'open_file_options': {}}, check_file_size = 500000000
09:49:43 paths = ['/tmp/pytest-of-jenkins/pytest-589/popen-gw2/test_gpu_write_parquet_simple0/part.0.parquet']
09:49:43 rgs = [None]
09:49:43 
09:49:43     @classmethod
09:49:43     def read_partition(
09:49:43         cls,
09:49:43         fs,
09:49:43         pieces,
09:49:43         columns,
09:49:43         index,
09:49:43         categories=(),
09:49:43         partitions=(),
09:49:43         partitioning=None,
09:49:43         schema=None,
09:49:43         open_file_options=None,
09:49:43         **kwargs,
09:49:43     ):
09:49:43     
09:49:43         if columns is not None:
09:49:43             columns = [c for c in columns]
09:49:43         if isinstance(index, list):
09:49:43             columns += index
09:49:43     
09:49:43         # Check if we are actually selecting any columns
09:49:43         read_columns = columns
09:49:43         if schema and columns:
09:49:43             ignored = set(schema.names) - set(columns)
09:49:43             if not ignored:
09:49:43                 read_columns = None
09:49:43     
09:49:43         if not isinstance(pieces, list):
09:49:43             pieces = [pieces]
09:49:43     
09:49:43         # Extract supported kwargs from `kwargs`
09:49:43         strings_to_cats = kwargs.get("strings_to_categorical", False)
09:49:43         read_kwargs = kwargs.get("read", {})
09:49:43         read_kwargs.update(open_file_options or {})
09:49:43         check_file_size = read_kwargs.pop("check_file_size", None)
09:49:43     
09:49:43         # Wrap reading logic in a `try` block so that we can
09:49:43         # inform the user that the `read_parquet` partition
09:49:43         # size is too large for the available memory
09:49:43         try:
09:49:43     
09:49:43             # Assume multi-piece read
09:49:43             paths = []
09:49:43             rgs = []
09:49:43             last_partition_keys = None
09:49:43             dfs = []
09:49:43     
09:49:43             for i, piece in enumerate(pieces):
09:49:43     
09:49:43                 (path, row_group, partition_keys) = piece
09:49:43                 row_group = None if row_group == [None] else row_group
09:49:43     
09:49:43                 # File-size check to help "protect" users from change
09:49:43                 # to up-stream `split_row_groups` default. We only
09:49:43                 # check the file size if this partition corresponds
09:49:43                 # to a full file, and `check_file_size` is defined
09:49:43                 if check_file_size and len(pieces) == 1 and row_group is None:
09:49:43                     file_size = fs.size(path)
09:49:43                     if file_size > check_file_size:
09:49:43                         warnings.warn(
09:49:43                             f"A large parquet file ({file_size}B) is being "
09:49:43                             f"used to create a DataFrame partition in "
09:49:43                             f"read_parquet. This may cause out of memory "
09:49:43                             f"exceptions in operations downstream. See the "
09:49:43                             f"notes on split_row_groups in the read_parquet "
09:49:43                             f"documentation. Setting split_row_groups "
09:49:43                             f"explicitly will silence this warning."
09:49:43                         )
09:49:43     
09:49:43                 if i > 0 and partition_keys != last_partition_keys:
09:49:43                     dfs.append(
09:49:43                         cls._read_paths(
09:49:43                             paths,
09:49:43                             fs,
09:49:43                             columns=read_columns,
09:49:43                             row_groups=rgs if rgs else None,
09:49:43                             strings_to_categorical=strings_to_cats,
09:49:43                             partitions=partitions,
09:49:43                             partitioning=partitioning,
09:49:43                             partition_keys=last_partition_keys,
09:49:43                             **read_kwargs,
09:49:43                         )
09:49:43                     )
09:49:43                     paths = rgs = []
09:49:43                     last_partition_keys = None
09:49:43                 paths.append(path)
09:49:43                 rgs.append(
09:49:43                     [row_group]
09:49:43                     if not isinstance(row_group, list)
09:49:43                     and row_group is not None
09:49:43                     else row_group
09:49:43                 )
09:49:43                 last_partition_keys = partition_keys
09:49:43     
09:49:43             dfs.append(
09:49:43 >               cls._read_paths(
09:49:43                     paths,
09:49:43                     fs,
09:49:43                     columns=read_columns,
09:49:43                     row_groups=rgs if rgs else None,
09:49:43                     strings_to_categorical=strings_to_cats,
09:49:43                     partitions=partitions,
09:49:43                     partitioning=partitioning,
09:49:43                     partition_keys=last_partition_keys,
09:49:43                     **read_kwargs,
09:49:43                 )
09:49:43             )
09:49:43 
09:49:43 /opt/conda/envs/dask/lib/python3.9/site-packages/dask_cudf/io/parquet.py:241: 
09:49:43 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
09:49:43 
09:49:43 cls = <class 'dask_cudf.io.parquet.CudfEngine'>
09:49:43 paths = ['/tmp/pytest-of-jenkins/pytest-589/popen-gw2/test_gpu_write_parquet_simple0/part.0.parquet']
09:49:43 fs = <fsspec.implementations.local.LocalFileSystem object at 0x7ff4bc75f1f0>
09:49:43 columns = None, row_groups = None, strings_to_categorical = False
09:49:43 partitions = [], partitioning = None, partition_keys = None
09:49:43 open_file_options = {}, kwargs = {}
09:49:43 stack = <contextlib.ExitStack object at 0x7ff4bc75f880>
09:49:43 paths_or_fobs = ['/tmp/pytest-of-jenkins/pytest-589/popen-gw2/test_gpu_write_parquet_simple0/part.0.parquet']
09:49:43 
09:49:43     @classmethod
09:49:43     def _read_paths(
09:49:43         cls,
09:49:43         paths,
09:49:43         fs,
09:49:43         columns=None,
09:49:43         row_groups=None,
09:49:43         strings_to_categorical=None,
09:49:43         partitions=None,
09:49:43         partitioning=None,
09:49:43         partition_keys=None,
09:49:43         open_file_options=None,
09:49:43         **kwargs,
09:49:43     ):
09:49:43     
09:49:43         # Simplify row_groups if all None
09:49:43         if row_groups == [None for path in paths]:
09:49:43             row_groups = None
09:49:43     
09:49:43         with ExitStack() as stack:
09:49:43     
09:49:43             # Non-local filesystem handling
09:49:43             paths_or_fobs = paths
09:49:43             if not _is_local_filesystem(fs):
09:49:43                 paths_or_fobs = _open_remote_files(
09:49:43                     paths_or_fobs,
09:49:43                     fs,
09:49:43                     context_stack=stack,
09:49:43                     **_default_open_file_options(
09:49:43                         open_file_options, columns, row_groups
09:49:43                     ),
09:49:43                 )
09:49:43     
09:49:43             # Use cudf to read in data
09:49:43 >           df = cudf.read_parquet(
09:49:43                 paths_or_fobs,
09:49:43                 engine="cudf",
09:49:43                 columns=columns,
09:49:43                 row_groups=row_groups if row_groups else None,
09:49:43                 strings_to_categorical=strings_to_categorical,
09:49:43                 **kwargs,
09:49:43             )
09:49:43 
09:49:43 /opt/conda/envs/dask/lib/python3.9/site-packages/dask_cudf/io/parquet.py:92: 
09:49:43 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
09:49:43 
09:49:43 args = (['/tmp/pytest-of-jenkins/pytest-589/popen-gw2/test_gpu_write_parquet_simple0/part.0.parquet'],)
09:49:43 kwds = {'columns': None, 'engine': 'cudf', 'row_groups': None, 'strings_to_categorical': False}
09:49:43 
09:49:43     @wraps(func)
09:49:43     def inner(*args, **kwds):
09:49:43         with self._recreate_cm():
09:49:43 >           return func(*args, **kwds)
09:49:43 
09:49:43 /opt/conda/envs/dask/lib/python3.9/contextlib.py:79: 
09:49:43 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
09:49:43 
09:49:43 filepath_or_buffer = ['/tmp/pytest-of-jenkins/pytest-589/popen-gw2/test_gpu_write_parquet_simple0/part.0.parquet']
09:49:43 engine = 'cudf', columns = None, filters = None, row_groups = None
09:49:43 strings_to_categorical = False, use_pandas_metadata = True
09:49:43 use_python_file_object = True, categorical_partitions = True
09:49:43 open_file_options = {}, args = (), kwargs = {}
09:49:43 fs = <fsspec.implementations.local.LocalFileSystem object at 0x7ff4bc75f1f0>
09:49:43 paths = ['/tmp/pytest-of-jenkins/pytest-589/popen-gw2/test_gpu_write_parquet_simple0/part.0.parquet']
09:49:43 partition_keys = [], partition_categories = {}
09:49:43 filepaths_or_buffers = ['/tmp/pytest-of-jenkins/pytest-589/popen-gw2/test_gpu_write_parquet_simple0/part.0.parquet']
09:49:43 i = 0
09:49:43 source = '/tmp/pytest-of-jenkins/pytest-589/popen-gw2/test_gpu_write_parquet_simple0/part.0.parquet'
09:49:43 tmp_source = '/tmp/pytest-of-jenkins/pytest-589/popen-gw2/test_gpu_write_parquet_simple0/part.0.parquet'
09:49:43 compression = None
09:49:43 
09:49:43     @ioutils.doc_read_parquet()
09:49:43     @_cudf_nvtx_annotate
09:49:43     def read_parquet(
09:49:43         filepath_or_buffer,
09:49:43         engine="cudf",
09:49:43         columns=None,
09:49:43         filters=None,
09:49:43         row_groups=None,
09:49:43         strings_to_categorical=False,
09:49:43         use_pandas_metadata=True,
09:49:43         use_python_file_object=True,
09:49:43         categorical_partitions=True,
09:49:43         open_file_options=None,
09:49:43         *args,
09:49:43         **kwargs,
09:49:43     ):
09:49:43         """{docstring}"""
09:49:43     
09:49:43         # Do not allow the user to set file-opening options
09:49:43         # when `use_python_file_object=False` is specified
09:49:43         if use_python_file_object is False:
09:49:43             if open_file_options:
09:49:43                 raise ValueError(
09:49:43                     "open_file_options is not currently supported when "
09:49:43                     "use_python_file_object is set to False."
09:49:43                 )
09:49:43             open_file_options = {}
09:49:43     
09:49:43         # Multiple sources are passed as a list. If a single source is passed,
09:49:43         # wrap it in a list for unified processing downstream.
09:49:43         if not is_list_like(filepath_or_buffer):
09:49:43             filepath_or_buffer = [filepath_or_buffer]
09:49:43     
09:49:43         # a list of row groups per source should be passed. make the list of
09:49:43         # lists that is expected for multiple sources
09:49:43         if row_groups is not None:
09:49:43             if not is_list_like(row_groups):
09:49:43                 row_groups = [[row_groups]]
09:49:43             elif not is_list_like(row_groups[0]):
09:49:43                 row_groups = [row_groups]
09:49:43     
09:49:43         # Check columns input
09:49:43         if columns is not None:
09:49:43             if not is_list_like(columns):
09:49:43                 raise ValueError("Expected list like for columns")
09:49:43     
09:49:43         # Start by trying construct a filesystem object, so we
09:49:43         # can apply filters on remote file-systems
09:49:43         fs, paths = ioutils._get_filesystem_and_paths(filepath_or_buffer, **kwargs)
09:49:43     
09:49:43         # Use pyarrow dataset to detect/process directory-partitioned
09:49:43         # data and apply filters. Note that we can only support partitioned
09:49:43         # data and filtering if the input is a single directory or list of
09:49:43         # paths.
09:49:43         partition_keys = []
09:49:43         partition_categories = {}
09:49:43         if fs and paths:
09:49:43             (
09:49:43                 paths,
09:49:43                 row_groups,
09:49:43                 partition_keys,
09:49:43                 partition_categories,
09:49:43             ) = _process_dataset(
09:49:43                 paths,
09:49:43                 fs,
09:49:43                 filters=filters,
09:49:43                 row_groups=row_groups,
09:49:43                 categorical_partitions=categorical_partitions,
09:49:43             )
09:49:43         elif filters is not None:
09:49:43             raise ValueError("cudf cannot apply filters to open file objects.")
09:49:43         filepath_or_buffer = paths if paths else filepath_or_buffer
09:49:43     
09:49:43         filepaths_or_buffers = []
09:49:43         if use_python_file_object:
09:49:43             open_file_options = _default_open_file_options(
09:49:43                 open_file_options,
09:49:43                 columns,
09:49:43                 row_groups,
09:49:43                 fs=fs,
09:49:43             )
09:49:43         for i, source in enumerate(filepath_or_buffer):
09:49:43             tmp_source, compression = ioutils.get_reader_filepath_or_buffer(
09:49:43                 path_or_data=source,
09:49:43                 compression=None,
09:49:43                 fs=fs,
09:49:43                 use_python_file_object=use_python_file_object,
09:49:43                 open_file_options=open_file_options,
09:49:43                 **kwargs,
09:49:43             )
09:49:43     
09:49:43             if compression is not None:
09:49:43                 raise ValueError(
09:49:43                     "URL content-encoding decompression is not supported"
09:49:43                 )
09:49:43             if isinstance(tmp_source, list):
09:49:43                 filepath_or_buffer.extend(tmp_source)
09:49:43             else:
09:49:43                 filepaths_or_buffers.append(tmp_source)
09:49:43     
09:49:43         # Warn user if they are not using cudf for IO
09:49:43         # (There is a good chance this was not the intention)
09:49:43         if engine != "cudf":
09:49:43             warnings.warn(
09:49:43                 "Using CPU via PyArrow to read Parquet dataset. "
09:49:43                 "This option is both inefficient and unstable!"
09:49:43             )
09:49:43             if filters is not None:
09:49:43                 warnings.warn(
09:49:43                     "Parquet row-group filtering is only supported with "
09:49:43                     "'engine=cudf'. Use pandas or pyarrow API directly "
09:49:43                     "for full CPU-based filtering functionality."
09:49:43                 )
09:49:43     
09:49:43 >       return _parquet_to_frame(
09:49:43             filepaths_or_buffers,
09:49:43             engine,
09:49:43             *args,
09:49:43             columns=columns,
09:49:43             row_groups=row_groups,
09:49:43             strings_to_categorical=strings_to_categorical,
09:49:43             use_pandas_metadata=use_pandas_metadata,
09:49:43             partition_keys=partition_keys,
09:49:43             partition_categories=partition_categories,
09:49:43             **kwargs,
09:49:43         )
09:49:43 
09:49:43 /opt/conda/envs/dask/lib/python3.9/site-packages/cudf/io/parquet.py:472: 
09:49:43 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
09:49:43 
09:49:43 args = (['/tmp/pytest-of-jenkins/pytest-589/popen-gw2/test_gpu_write_parquet_simple0/part.0.parquet'], 'cudf')
09:49:43 kwds = {'columns': None, 'partition_categories': {}, 'partition_keys': [], 'row_groups': None, ...}
09:49:43 
09:49:43     @wraps(func)
09:49:43     def inner(*args, **kwds):
09:49:43         with self._recreate_cm():
09:49:43 >           return func(*args, **kwds)
09:49:43 
09:49:43 /opt/conda/envs/dask/lib/python3.9/contextlib.py:79: 
09:49:43 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
09:49:43 
09:49:43 paths_or_buffers = ['/tmp/pytest-of-jenkins/pytest-589/popen-gw2/test_gpu_write_parquet_simple0/part.0.parquet']
09:49:43 row_groups = None, partition_keys = [], partition_categories = {}
09:49:43 args = ('cudf',)
09:49:43 kwargs = {'columns': None, 'strings_to_categorical': False, 'use_pandas_metadata': True}
09:49:43 
09:49:43     @_cudf_nvtx_annotate
09:49:43     def _parquet_to_frame(
09:49:43         paths_or_buffers,
09:49:43         *args,
09:49:43         row_groups=None,
09:49:43         partition_keys=None,
09:49:43         partition_categories=None,
09:49:43         **kwargs,
09:49:43     ):
09:49:43     
09:49:43         # If this is not a partitioned read, only need
09:49:43         # one call to `_read_parquet`
09:49:43         if not partition_keys:
09:49:43 >           return _read_parquet(
09:49:43                 paths_or_buffers,
09:49:43                 *args,
09:49:43                 row_groups=row_groups,
09:49:43                 **kwargs,
09:49:43             )
09:49:43 
09:49:43 /opt/conda/envs/dask/lib/python3.9/site-packages/cudf/io/parquet.py:499: 
09:49:43 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
09:49:43 
09:49:43 args = (['/tmp/pytest-of-jenkins/pytest-589/popen-gw2/test_gpu_write_parquet_simple0/part.0.parquet'], 'cudf')
09:49:43 kwds = {'columns': None, 'row_groups': None, 'strings_to_categorical': False, 'use_pandas_metadata': True}
09:49:43 
09:49:43     @wraps(func)
09:49:43     def inner(*args, **kwds):
09:49:43         with self._recreate_cm():
09:49:43 >           return func(*args, **kwds)
09:49:43 
09:49:43 /opt/conda/envs/dask/lib/python3.9/contextlib.py:79: 
09:49:43 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
09:49:43 
09:49:43 filepaths_or_buffers = ['/tmp/pytest-of-jenkins/pytest-589/popen-gw2/test_gpu_write_parquet_simple0/part.0.parquet']
09:49:43 engine = 'cudf', columns = None, row_groups = None
09:49:43 strings_to_categorical = False, use_pandas_metadata = True, args = ()
09:49:43 kwargs = {}
09:49:43 
09:49:43     @_cudf_nvtx_annotate
09:49:43     def _read_parquet(
09:49:43         filepaths_or_buffers,
09:49:43         engine,
09:49:43         columns=None,
09:49:43         row_groups=None,
09:49:43         strings_to_categorical=None,
09:49:43         use_pandas_metadata=None,
09:49:43         *args,
09:49:43         **kwargs,
09:49:43     ):
09:49:43         # Simple helper function to dispatch between
09:49:43         # cudf and pyarrow to read parquet data
09:49:43         if engine == "cudf":
09:49:43 >           return libparquet.read_parquet(
09:49:43                 filepaths_or_buffers,
09:49:43                 columns=columns,
09:49:43                 row_groups=row_groups,
09:49:43                 strings_to_categorical=strings_to_categorical,
09:49:43                 use_pandas_metadata=use_pandas_metadata,
09:49:43             )
09:49:43 
09:49:43 /opt/conda/envs/dask/lib/python3.9/site-packages/cudf/io/parquet.py:574: 
09:49:43 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
09:49:43 
09:49:43 >   ???
09:49:43 
09:49:43 parquet.pyx:127: 
09:49:43 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
09:49:43 
09:49:43 >   ???
09:49:43 E   MemoryError: std::bad_alloc
09:49:43 
09:49:43 parquet.pyx:186: MemoryError
09:49:43 
09:49:43 During handling of the above exception, another exception occurred:
09:49:43 
09:49:43 tmpdir = local('/tmp/pytest-of-jenkins/pytest-589/popen-gw2/test_gpu_write_parquet_simple0')
09:49:43 
09:49:43     @pytest.mark.gpu
09:49:43     def test_gpu_write_parquet_simple(tmpdir):
09:49:43         fn = str(tmpdir)
09:49:43         cudf = pytest.importorskip("cudf")
09:49:43         dask_cudf = pytest.importorskip("dask_cudf")
09:49:43         from dask.dataframe.dispatch import pyarrow_schema_dispatch
09:49:43     
09:49:43         @pyarrow_schema_dispatch.register((cudf.DataFrame,))
09:49:43         def get_pyarrow_schema_cudf(obj):
09:49:43             return obj.to_arrow().schema
09:49:43     
09:49:43         df = cudf.DataFrame(
09:49:43             {
09:49:43                 "a": ["abc", "def"],
09:49:43                 "b": ["a", "z"],
09:49:43             }
09:49:43         )
09:49:43         ddf = dask_cudf.from_cudf(df, 3)
09:49:43         ddf.to_parquet(fn)
09:49:43         got = dask_cudf.read_parquet(fn)
09:49:43 >       assert_eq(df, got)
09:49:43 
09:49:43 dask/dataframe/io/tests/test_parquet.py:4233: 
09:49:43 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
09:49:43 dask/dataframe/utils.py:556: in assert_eq
09:49:43     b = _check_dask(
09:49:43 dask/dataframe/utils.py:464: in _check_dask
09:49:43     result = dsk.compute(scheduler=scheduler)
09:49:43 dask/base.py:315: in compute
09:49:43     (result,) = compute(self, traverse=False, **kwargs)
09:49:43 dask/base.py:600: in compute
09:49:43     results = schedule(dsk, keys, **kwargs)
09:49:43 dask/local.py:557: in get_sync
09:49:43     return get_async(
09:49:43 dask/local.py:500: in get_async
09:49:43     for key, res_info, failed in queue_get(queue).result():
09:49:43 /opt/conda/envs/dask/lib/python3.9/concurrent/futures/_base.py:439: in result
09:49:43     return self.__get_result()
09:49:43 /opt/conda/envs/dask/lib/python3.9/concurrent/futures/_base.py:391: in __get_result
09:49:43     raise self._exception
09:49:43 dask/local.py:542: in submit
09:49:43     fut.set_result(fn(*args, **kwargs))
09:49:43 dask/local.py:238: in batch_execute_tasks
09:49:43     return [execute_task(*a) for a in it]
09:49:43 dask/local.py:238: in <listcomp>
09:49:43     return [execute_task(*a) for a in it]
09:49:43 dask/local.py:229: in execute_task
09:49:43     result = pack_exception(e, dumps)
09:49:43 dask/local.py:224: in execute_task
09:49:43     result = _execute_task(task, data)
09:49:43 dask/core.py:119: in _execute_task
09:49:43     return func(*(_execute_task(a, cache) for a in args))
09:49:43 dask/optimization.py:990: in __call__
09:49:43     return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
09:49:43 dask/core.py:149: in get
09:49:43     result = _execute_task(task, cache)
09:49:43 dask/core.py:119: in _execute_task
09:49:43     return func(*(_execute_task(a, cache) for a in args))
09:49:43 dask/dataframe/io/parquet/core.py:89: in __call__
09:49:43     return read_parquet_part(
09:49:43 dask/dataframe/io/parquet/core.py:587: in read_parquet_part
09:49:43     dfs = [
09:49:43 dask/dataframe/io/parquet/core.py:588: in <listcomp>
09:49:43     func(fs, rg, columns.copy(), index, **toolz.merge(kwargs, kw))
09:49:43 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
09:49:43 
09:49:43 cls = <class 'dask_cudf.io.parquet.CudfEngine'>
09:49:43 fs = <fsspec.implementations.local.LocalFileSystem object at 0x7ff4bc75f1f0>
09:49:43 pieces = [('/tmp/pytest-of-jenkins/pytest-589/popen-gw2/test_gpu_write_parquet_simple0/part.0.parquet', None, None)]
09:49:43 columns = ['a', 'b', '__null_dask_index__'], index = ['__null_dask_index__']
09:49:43 categories = [], partitions = [], partitioning = None
09:49:43 schema = __null_dask_index__: int64 not null
09:49:43 a: string not null
09:49:43 b: string not null
09:49:43 -- schema metadata --
09:49:43 pandas: '{"index_columns": ["__null_dask_index__"], "column_indexes": [{"' + 553
09:49:43 open_file_options = None
09:49:43 kwargs = {'dataset': {'format': <ParquetFileFormat read_options=<ParquetReadOptions dictionary_columns=set() coerce_int96_timestamp_unit=ns>>, 'partitioning': 'hive'}, 'filters': None, 'read': {'open_file_options': {}}}
09:49:43 read_columns = None, ignored = set(), strings_to_cats = False
09:49:43 read_kwargs = {'open_file_options': {}}, check_file_size = 500000000
09:49:43 paths = ['/tmp/pytest-of-jenkins/pytest-589/popen-gw2/test_gpu_write_parquet_simple0/part.0.parquet']
09:49:43 rgs = [None]
09:49:43 
09:49:43     @classmethod
09:49:43     def read_partition(
09:49:43         cls,
09:49:43         fs,
09:49:43         pieces,
09:49:43         columns,
09:49:43         index,
09:49:43         categories=(),
09:49:43         partitions=(),
09:49:43         partitioning=None,
09:49:43         schema=None,
09:49:43         open_file_options=None,
09:49:43         **kwargs,
09:49:43     ):
09:49:43     
09:49:43         if columns is not None:
09:49:43             columns = [c for c in columns]
09:49:43         if isinstance(index, list):
09:49:43             columns += index
09:49:43     
09:49:43         # Check if we are actually selecting any columns
09:49:43         read_columns = columns
09:49:43         if schema and columns:
09:49:43             ignored = set(schema.names) - set(columns)
09:49:43             if not ignored:
09:49:43                 read_columns = None
09:49:43     
09:49:43         if not isinstance(pieces, list):
09:49:43             pieces = [pieces]
09:49:43     
09:49:43         # Extract supported kwargs from `kwargs`
09:49:43         strings_to_cats = kwargs.get("strings_to_categorical", False)
09:49:43         read_kwargs = kwargs.get("read", {})
09:49:43         read_kwargs.update(open_file_options or {})
09:49:43         check_file_size = read_kwargs.pop("check_file_size", None)
09:49:43     
09:49:43         # Wrap reading logic in a `try` block so that we can
09:49:43         # inform the user that the `read_parquet` partition
09:49:43         # size is too large for the available memory
09:49:43         try:
09:49:43     
09:49:43             # Assume multi-piece read
09:49:43             paths = []
09:49:43             rgs = []
09:49:43             last_partition_keys = None
09:49:43             dfs = []
09:49:43     
09:49:43             for i, piece in enumerate(pieces):
09:49:43     
09:49:43                 (path, row_group, partition_keys) = piece
09:49:43                 row_group = None if row_group == [None] else row_group
09:49:43     
09:49:43                 # File-size check to help "protect" users from change
09:49:43                 # to up-stream `split_row_groups` default. We only
09:49:43                 # check the file size if this partition corresponds
09:49:43                 # to a full file, and `check_file_size` is defined
09:49:43                 if check_file_size and len(pieces) == 1 and row_group is None:
09:49:43                     file_size = fs.size(path)
09:49:43                     if file_size > check_file_size:
09:49:43                         warnings.warn(
09:49:43                             f"A large parquet file ({file_size}B) is being "
09:49:43                             f"used to create a DataFrame partition in "
09:49:43                             f"read_parquet. This may cause out of memory "
09:49:43                             f"exceptions in operations downstream. See the "
09:49:43                             f"notes on split_row_groups in the read_parquet "
09:49:43                             f"documentation. Setting split_row_groups "
09:49:43                             f"explicitly will silence this warning."
09:49:43                         )
09:49:43     
09:49:43                 if i > 0 and partition_keys != last_partition_keys:
09:49:43                     dfs.append(
09:49:43                         cls._read_paths(
09:49:43                             paths,
09:49:43                             fs,
09:49:43                             columns=read_columns,
09:49:43                             row_groups=rgs if rgs else None,
09:49:43                             strings_to_categorical=strings_to_cats,
09:49:43                             partitions=partitions,
09:49:43                             partitioning=partitioning,
09:49:43                             partition_keys=last_partition_keys,
09:49:43                             **read_kwargs,
09:49:43                         )
09:49:43                     )
09:49:43                     paths = rgs = []
09:49:43                     last_partition_keys = None
09:49:43                 paths.append(path)
09:49:43                 rgs.append(
09:49:43                     [row_group]
09:49:43                     if not isinstance(row_group, list)
09:49:43                     and row_group is not None
09:49:43                     else row_group
09:49:43                 )
09:49:43                 last_partition_keys = partition_keys
09:49:43     
09:49:43             dfs.append(
09:49:43                 cls._read_paths(
09:49:43                     paths,
09:49:43                     fs,
09:49:43                     columns=read_columns,
09:49:43                     row_groups=rgs if rgs else None,
09:49:43                     strings_to_categorical=strings_to_cats,
09:49:43                     partitions=partitions,
09:49:43                     partitioning=partitioning,
09:49:43                     partition_keys=last_partition_keys,
09:49:43                     **read_kwargs,
09:49:43                 )
09:49:43             )
09:49:43             df = cudf.concat(dfs) if len(dfs) > 1 else dfs[0]
09:49:43     
09:49:43             # Re-set "object" dtypes align with pa schema
09:49:43             set_object_dtypes_from_pa_schema(df, schema)
09:49:43     
09:49:43             if index and (index[0] in df.columns):
09:49:43                 df = df.set_index(index[0])
09:49:43             elif index is False and df.index.names != (None,):
09:49:43                 # If index=False, we shouldn't have a named index
09:49:43                 df.reset_index(inplace=True)
09:49:43     
09:49:43         except MemoryError as err:
09:49:43 >           raise MemoryError(
09:49:43                 "Parquet data was larger than the available GPU memory!\n\n"
09:49:43                 "See the notes on split_row_groups in the read_parquet "
09:49:43                 "documentation.\n\n"
09:49:43                 "Original Error: " + str(err)
09:49:43             )
09:49:43 E           MemoryError: Parquet data was larger than the available GPU memory!
09:49:43 E           
09:49:43 E           See the notes on split_row_groups in the read_parquet documentation.
09:49:43 E           
09:49:43 E           Original Error: std::bad_alloc
09:49:43 
09:49:43 /opt/conda/envs/dask/lib/python3.9/site-packages/dask_cudf/io/parquet.py:265: MemoryError
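
The test writes only a two-row DataFrame, so the underlying std::bad_alloc more likely reflects transient memory pressure on the CI node's GPU (which is shared across pytest-xdist workers like [gw2] above) than Parquet data genuinely exceeding device memory. A quick diagnostic sketch, assuming pynvml is available on the gpuCI worker, to check how much device memory is actually free around the failing test:

```python
# Hypothetical diagnostic (not part of the test suite): report free GPU
# memory on device 0 via NVML. Assumes pynvml is installed on the worker.
import pynvml

pynvml.nvmlInit()
handle = pynvml.nvmlDeviceGetHandleByIndex(0)
info = pynvml.nvmlDeviceGetMemoryInfo(handle)
print(f"GPU0 free: {info.free / 2**30:.2f} GiB of {info.total / 2**30:.2f} GiB")
pynvml.nvmlShutdown()
```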

cc @charlesbluca @rjzamora
