1111
1212from dask .base import tokenize
1313from dask .core import flatten
14+ from dask .dataframe ._compat import PANDAS_GT_120
1415from dask .dataframe .backends import pyarrow_schema_dispatch
1516from dask .dataframe .io .parquet .utils import (
1617 Engine ,
3738partitioning_supported = _pa_version >= parse_version ("5.0.0" )
3839del _pa_version
3940
# Mapping from pyarrow types to the corresponding pandas nullable
# (extension) dtypes.  Used as a ``types_mapper`` for
# ``pyarrow.Table.to_pandas`` when ``use_nullable_dtypes=True``.
PYARROW_NULLABLE_DTYPE_MAPPING = {
    pa.int8(): pd.Int8Dtype(),
    pa.int16(): pd.Int16Dtype(),
    pa.int32(): pd.Int32Dtype(),
    pa.int64(): pd.Int64Dtype(),
    pa.uint8(): pd.UInt8Dtype(),
    pa.uint16(): pd.UInt16Dtype(),
    pa.uint32(): pd.UInt32Dtype(),
    pa.uint64(): pd.UInt64Dtype(),
    pa.bool_(): pd.BooleanDtype(),
    pa.string(): pd.StringDtype(),
}

# Nullable float dtypes were only introduced in pandas 1.2.
if PANDAS_GT_120:
    PYARROW_NULLABLE_DTYPE_MAPPING.update(
        {
            pa.float32(): pd.Float32Dtype(),
            pa.float64(): pd.Float64Dtype(),
        }
    )
4058#
4159# Helper Utilities
4260#
@@ -321,6 +339,7 @@ def read_metadata(
321339 paths ,
322340 categories = None ,
323341 index = None ,
342+ use_nullable_dtypes = False ,
324343 gather_statistics = None ,
325344 filters = None ,
326345 split_row_groups = False ,
@@ -350,7 +369,7 @@ def read_metadata(
350369 )
351370
352371 # Stage 2: Generate output `meta`
353- meta = cls ._create_dd_meta (dataset_info )
372+ meta = cls ._create_dd_meta (dataset_info , use_nullable_dtypes )
354373
355374 # Stage 3: Generate parts and stats
356375 parts , stats , common_kwargs = cls ._construct_collection_plan (dataset_info )
@@ -375,6 +394,7 @@ def read_partition(
375394 pieces ,
376395 columns ,
377396 index ,
397+ use_nullable_dtypes = False ,
378398 categories = (),
379399 partitions = (),
380400 filters = None ,
@@ -445,7 +465,9 @@ def read_partition(
445465 arrow_table = pa .concat_tables (tables )
446466
447467 # Convert to pandas
448- df = cls ._arrow_table_to_pandas (arrow_table , categories , ** kwargs )
468+ df = cls ._arrow_table_to_pandas (
469+ arrow_table , categories , use_nullable_dtypes = use_nullable_dtypes , ** kwargs
470+ )
449471
450472 # For pyarrow.dataset api, need to convert partition columns
451473 # to categorigal manually for integer types.
@@ -958,7 +980,7 @@ def _collect_dataset_info(
958980 }
959981
960982 @classmethod
961- def _create_dd_meta (cls , dataset_info ):
983+ def _create_dd_meta (cls , dataset_info , use_nullable_dtypes = False ):
962984 """Use parquet schema and hive-partition information
963985 (stored in dataset_info) to construct DataFrame metadata.
964986 """
@@ -989,6 +1011,7 @@ def _create_dd_meta(cls, dataset_info):
9891011 schema .empty_table (),
9901012 categories ,
9911013 arrow_to_pandas = arrow_to_pandas ,
1014+ use_nullable_dtypes = use_nullable_dtypes ,
9921015 )
9931016 index_names = list (meta .index .names )
9941017 column_names = list (meta .columns )
@@ -1543,11 +1566,26 @@ def _read_table(
15431566
15441567 @classmethod
15451568 def _arrow_table_to_pandas (
1546- cls , arrow_table : pa .Table , categories , ** kwargs
1569+ cls , arrow_table : pa .Table , categories , use_nullable_dtypes = False , ** kwargs
15471570 ) -> pd .DataFrame :
15481571 _kwargs = kwargs .get ("arrow_to_pandas" , {})
15491572 _kwargs .update ({"use_threads" : False , "ignore_metadata" : False })
15501573
1574+ if use_nullable_dtypes :
1575+ if "types_mapper" in _kwargs :
1576+ # User-provided entries take priority over PYARROW_NULLABLE_DTYPE_MAPPING
1577+ types_mapper = _kwargs ["types_mapper" ]
1578+
1579+ def _types_mapper (pa_type ):
1580+ return types_mapper (pa_type ) or PYARROW_NULLABLE_DTYPE_MAPPING .get (
1581+ pa_type
1582+ )
1583+
1584+ _kwargs ["types_mapper" ] = _types_mapper
1585+
1586+ else :
1587+ _kwargs ["types_mapper" ] = PYARROW_NULLABLE_DTYPE_MAPPING .get
1588+
15511589 return arrow_table .to_pandas (categories = categories , ** _kwargs )
15521590
15531591 @classmethod
0 commit comments