|
7 | 7 | import pandas as pd |
8 | 8 | import pyarrow as pa |
9 | 9 | import pyarrow.parquet as pq |
| 10 | + |
| 11 | +try: |
| 12 | + from pyarrow.parquet import filters_to_expression |
| 13 | +except ImportError: |
| 14 | + from pyarrow.parquet import _filters_to_expression as filters_to_expression |
| 15 | + |
10 | 16 | from packaging.version import parse as parse_version |
11 | 17 |
|
| 18 | +from dask import config |
12 | 19 | from dask.base import tokenize |
13 | 20 | from dask.core import flatten |
14 | 21 | from dask.dataframe.backends import pyarrow_schema_dispatch |
@@ -436,7 +443,9 @@ def read_metadata( |
436 | 443 | ) |
437 | 444 |
|
438 | 445 | # Stage 2: Generate output `meta` |
439 | | - meta = cls._create_dd_meta(dataset_info, use_nullable_dtypes) |
| 446 | + meta = cls._create_dd_meta( |
| 447 | + dataset_info, use_nullable_dtypes=use_nullable_dtypes |
| 448 | + ) |
440 | 449 |
|
441 | 450 | # Stage 3: Generate parts and stats |
442 | 451 | parts, stats, common_kwargs = cls._construct_collection_plan(dataset_info) |
@@ -1091,6 +1100,7 @@ def _collect_dataset_info( |
1091 | 1100 | "metadata_task_size": metadata_task_size, |
1092 | 1101 | "kwargs": { |
1093 | 1102 | "dataset": _dataset_kwargs, |
| 1103 | + "convert_string": config.get("dataframe.convert_string"), |
1094 | 1104 | **kwargs, |
1095 | 1105 | }, |
1096 | 1106 | } |
@@ -1123,11 +1133,13 @@ def _create_dd_meta(cls, dataset_info, use_nullable_dtypes=False): |
1123 | 1133 |
|
1124 | 1134 | # Use _arrow_table_to_pandas to generate meta |
1125 | 1135 | arrow_to_pandas = dataset_info["kwargs"].get("arrow_to_pandas", {}).copy() |
| 1136 | + convert_string = dataset_info["kwargs"].get("convert_string", False) |
1126 | 1137 | meta = cls._arrow_table_to_pandas( |
1127 | 1138 | schema.empty_table(), |
1128 | 1139 | categories, |
1129 | 1140 | arrow_to_pandas=arrow_to_pandas, |
1130 | 1141 | use_nullable_dtypes=use_nullable_dtypes, |
| 1142 | + convert_string=convert_string, |
1131 | 1143 | ) |
1132 | 1144 | index_names = list(meta.index.names) |
1133 | 1145 | column_names = list(meta.columns) |
@@ -1317,7 +1329,7 @@ def _construct_collection_plan(cls, dataset_info): |
1317 | 1329 | # Get/translate filters |
1318 | 1330 | ds_filters = None |
1319 | 1331 | if filters is not None: |
1320 | | - ds_filters = pq._filters_to_expression(filters) |
| 1332 | + ds_filters = filters_to_expression(filters) |
1321 | 1333 |
|
1322 | 1334 | # Define subset of `dataset_info` required by _collect_file_parts |
1323 | 1335 | dataset_info_kwargs = { |
@@ -1666,7 +1678,7 @@ def _read_table( |
1666 | 1678 | use_threads=False, |
1667 | 1679 | schema=schema, |
1668 | 1680 | columns=cols, |
1669 | | - filter=pq._filters_to_expression(filters) if filters else None, |
| 1681 | + filter=filters_to_expression(filters) if filters else None, |
1670 | 1682 | ) |
1671 | 1683 | else: |
1672 | 1684 | arrow_table = _read_table_from_path( |
@@ -1699,40 +1711,76 @@ def _read_table( |
1699 | 1711 | return arrow_table |
1700 | 1712 |
|
1701 | 1713 | @classmethod |
1702 | | - def _arrow_table_to_pandas( |
1703 | | - cls, arrow_table: pa.Table, categories, use_nullable_dtypes=False, **kwargs |
1704 | | - ) -> pd.DataFrame: |
1705 | | - _kwargs = kwargs.get("arrow_to_pandas", {}) |
1706 | | - _kwargs.update({"use_threads": False, "ignore_metadata": False}) |
1707 | | - |
1708 | | - if use_nullable_dtypes: |
1709 | | - # Determine is `pandas` or `pyarrow`-backed dtypes should be used |
1710 | | - if use_nullable_dtypes == "pandas": |
1711 | | - default_types_mapper = PYARROW_NULLABLE_DTYPE_MAPPING.get |
| 1714 | + def _determine_type_mapper( |
| 1715 | + cls, *, use_nullable_dtypes=False, convert_string=False, **kwargs |
| 1716 | + ): |
| 1717 | + user_mapper = kwargs.get("arrow_to_pandas", {}).get("types_mapper") |
| 1718 | + type_mappers = [] |
| 1719 | + |
| 1720 | + def pyarrow_type_mapper(pyarrow_dtype): |
| 1721 | + # Special case pyarrow strings to use more feature complete dtype |
| 1722 | + # See https://github.com/pandas-dev/pandas/issues/50074 |
| 1723 | + if pyarrow_dtype == pa.string(): |
| 1724 | + return pd.StringDtype("pyarrow") |
1712 | 1725 | else: |
1713 | | - # use_nullable_dtypes == "pyarrow" |
| 1726 | + return pd.ArrowDtype(pyarrow_dtype) |
1714 | 1727 |
|
1715 | | - def default_types_mapper(pyarrow_dtype): # type: ignore |
1716 | | - # Special case pyarrow strings to use more feature complete dtype |
1717 | | - # See https://github.com/pandas-dev/pandas/issues/50074 |
1718 | | - if pyarrow_dtype == pa.string(): |
1719 | | - return pd.StringDtype("pyarrow") |
1720 | | - else: |
1721 | | - return pd.ArrowDtype(pyarrow_dtype) |
| 1728 | + # always use the user-defined mapper first, if available |
| 1729 | + if user_mapper is not None: |
| 1730 | + type_mappers.append(user_mapper) |
1722 | 1731 |
|
1723 | | - if "types_mapper" in _kwargs: |
1724 | | - # User-provided entries take priority over default_types_mapper |
1725 | | - types_mapper = _kwargs["types_mapper"] |
| 1732 | + # next in priority is converting strings |
| 1733 | + if convert_string: |
| 1734 | + type_mappers.append({pa.string(): pd.StringDtype("pyarrow")}.get) |
1726 | 1735 |
|
1727 | | - def _types_mapper(pa_type): |
1728 | | - return types_mapper(pa_type) or default_types_mapper(pa_type) |
| 1736 | + # and then nullable types |
| 1737 | + if use_nullable_dtypes == "pandas": |
| 1738 | + type_mappers.append(PYARROW_NULLABLE_DTYPE_MAPPING.get) |
| 1739 | + elif use_nullable_dtypes: # "pyarrow" or True |
| 1740 | + type_mappers.append(pyarrow_type_mapper) |
1729 | 1741 |
|
1730 | | - _kwargs["types_mapper"] = _types_mapper |
| 1742 | + def default_types_mapper(pyarrow_dtype): |
| 1743 | + """Try all type mappers in order, starting from the user type mapper.""" |
| 1744 | + for type_converter in type_mappers: |
| 1745 | + converted_type = type_converter(pyarrow_dtype) |
| 1746 | + if converted_type is not None: |
| 1747 | + return converted_type |
1731 | 1748 |
|
1732 | | - else: |
1733 | | - _kwargs["types_mapper"] = default_types_mapper |
| 1749 | + if len(type_mappers) > 0: |
| 1750 | + return default_types_mapper |
1734 | 1751 |
|
1735 | | - return arrow_table.to_pandas(categories=categories, **_kwargs) |
| 1752 | + @classmethod |
| 1753 | + def _arrow_table_to_pandas( |
| 1754 | + cls, |
| 1755 | + arrow_table: pa.Table, |
| 1756 | + categories, |
| 1757 | + use_nullable_dtypes=False, |
| 1758 | + convert_string=False, |
| 1759 | + **kwargs, |
| 1760 | + ) -> pd.DataFrame: |
| 1761 | + _kwargs = kwargs.get("arrow_to_pandas", {}) |
| 1762 | + _kwargs.update({"use_threads": False, "ignore_metadata": False}) |
| 1763 | + |
| 1764 | + types_mapper = cls._determine_type_mapper( |
| 1765 | + use_nullable_dtypes=use_nullable_dtypes, |
| 1766 | + convert_string=convert_string, |
| 1767 | + **kwargs, |
| 1768 | + ) |
| 1769 | + if types_mapper is not None: |
| 1770 | + _kwargs["types_mapper"] = types_mapper |
| 1771 | + |
| 1772 | + res = arrow_table.to_pandas(categories=categories, **_kwargs) |
| 1773 | + # TODO: remove this when fixed in pyarrow: https://github.com/apache/arrow/issues/34283 |
| 1774 | + if ( |
| 1775 | + convert_string |
| 1776 | + and isinstance(res.index, pd.Index) |
| 1777 | + and not isinstance(res.index, pd.MultiIndex) |
| 1778 | + and pd.api.types.is_string_dtype(res.index.dtype) |
| 1779 | + and res.index.dtype |
| 1780 | + not in (pd.StringDtype("pyarrow"), pd.ArrowDtype(pa.string())) |
| 1781 | + ): |
| 1782 | + res.index = res.index.astype(pd.StringDtype("pyarrow")) |
| 1783 | + return res |
1736 | 1784 |
|
1737 | 1785 | @classmethod |
1738 | 1786 | def collect_file_metadata(cls, path, fs, file_path): |
|
0 commit comments