Remove self from closures in dask.dataframe#1004
Conversation
Previously the dataframe object contained tasks that closed over the dataframe itself. This made certain tasks very costly to serialize.
|
|
||
|
|
||
| def _sum(x): | ||
| return x.sum() |
There was a problem hiding this comment.
Optional nitpick - these could be replaced with methodcaller objects. Results in smaller serialized form, and faster serialization time with cloudpickle (although both are super negligible). Feel free to ignore.
In [6]: import cloudpickle
In [7]: from operator import methodcaller
In [8]: _sum = methodcaller('sum')
In [9]: def _sum2(x):
...: return x.sum()
...:
In [10]: cloudpickle.dumps(_sum)
Out[10]: '\x80\x02coperator\nmethodcaller\nq\x00)\x81q\x01.'
In [11]: cloudpickle.dumps(_sum2)
Out[11]: '\x80\x02ccloudpickle.cloudpickle\n_fill_function\nq\x00(ccloudpickle.cloudpickle\n_make_skel_func\nq\x01ccloudpickle.cloudpickle\n_builtin_type\nq\x02U\x08CodeTypeq\x03\x85q\x04Rq\x05(K\x01K\x01K\x01KCU\n|\x00\x00j\x00\x00\x83\x00\x00Sq\x06N\x85q\x07U\x03sumq\x08\x85q\tU\x01xq\n\x85q\x0bU\x1e<ipython-input-9-2e7c5b0ec5c4>q\x0cU\x05_sum2q\rK\x01U\x02\x00\x01q\x0e))tq\x0fRq\x10]q\x11}q\x12\x87q\x13Rq\x14}q\x15N}q\x16tR.'
In [12]: %timeit cloudpickle.dumps(_sum)
The slowest run took 4.28 times longer than the fastest. This could mean that an intermediate result is being cached
10000 loops, best of 3: 21.5 µs per loop
In [13]: %timeit cloudpickle.dumps(_sum2)
10000 loops, best of 3: 108 µs per loopThere was a problem hiding this comment.
Yeah, unfortunately methodcaller doesn't serialize well.
In [1]: import numpy as np
In [2]: from cloudpickle import dumps, loads
In [3]: from operator import methodcaller
In [4]: f = methodcaller('sum')
In [5]: x = np.ones((2, 2))
In [6]: f(x)
Out[6]: 4.0
In [7]: loads(dumps(f))(x)
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
<ipython-input-7-6f39faa8a209> in <module>()
----> 1 loads(dumps(f))(x)
TypeError: methodcaller needs at least one argument, the method nameGeneral rule of thumb, if its in the standard library, it doesn't serialize well.
There was a problem hiding this comment.
Also, when _sum is part of a module rather than created dynamically it will be much faster to serialize.
|
Hi, just dropping a note here since I was the one who replaced from sys import version
if version < '3':
import copy_reg as copyreg
else:
import copyreg
def _reduce_method_descriptor(m):
return getattr, (m.__objclass__, m.__name__)
# type(set.union) is used as a proxy to <class 'method_descriptor'>
copyreg.pickle(type(set.union), _reduce_method_descriptor)
import pickle
import cloudpickle
def _dumps(x):
return cloudpickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL)
_loads = pickle.loadsWith those definitions, then serialization works for In [10]: from operator import methodcaller
In [11]: f = methodcaller('sum')
In [12]: import numpy as np
In [13]: x = np.ones((2,2))
In [14]: f(x)
Out[14]: 4.0
In [15]: _loads(_dumps(f))(x)
Out[15]: 4.0 |
|
Hi @PedroMDuarte I wish you had brought that up earlier. I repeated this work almost exactly in cloudpickle a little while ago. cloudpipe/cloudpickle@ce99eee If you're interested, that project could use contributors like yourself. There are a lot of people now depending on it and only a few people adding fixes like this from time to time. |
|
@mrocklin That is cool! I am by no means an expert in serialization, but I will look into helping out with |
|
Merging soon if no additional comments. |
Remove self from closures in dask.dataframe
* Make column projections stricter (dask#881) * Simplify again after lowering (dask#884) * Visual EXPLAIN (dask#885) * Fix merge predicate pushdowns with weird predicates (dask#888) * Handle futures that are put into map_partitions (dask#892) * Remove eager divisions from indexing (dask#891) * Add shuffle if objects are not aligned and partitions are unknown in assign (dask#887) Co-authored-by: Hendrik Makait <hendrik@makait.com> * Add support for dd.Aggregation (dask#893) * Fix random_split for series (dask#894) * Update dask version * Use Aggregation from dask/dask (dask#895) * Fix meta calculation error in groupby (dask#897) * Revert "Use Aggregation from dask/dask" (dask#898) * Parquet reader using Pyarrow FileSystem (dask#882) Co-authored-by: Patrick Hoefler <61934744+phofl@users.noreply.github.com> * Fix assign for empty indexer (dask#901) * Add dask.dataframe import at start (dask#903) * Add indicator support to merge (dask#902) * Fix detection of parquet filter pushdown (dask#910) * Speedup init of `ReadParquetPyarrowFS` (dask#909) * Don't rely on sets in are_co_aligned (dask#908) * Implement more efficient GroupBy.mean (dask#906) * Refactor GroupByReduction (dask#920) * Implement array inference in new_collection (dask#922) * Add support for convert string option (dask#912) * P2P shuffle drops partitioning column early (dask#899) * Avoid culling for SetIndexBlockwise with divisions (dask#925) * Re-run versioneer install to fix version number (tag_prefix) (dask#926) * Sort if split_out=1 in value_counts (dask#924) * Wrap fragments (dask#911) * Ensure that columns are copied in projection (dask#927) * Raise in map if pandas < 2.1 (dask#929) * Add _repr_html_ and updated __repr__ for FrameBase (dask#930) * Support token for map_partitions (dask#931) * Fix Copy-on-Write related bug in groupby.transform (dask#933) * Fix to_dask_dataframe test after switching to dask-expr by default (dask#935) * Use multi-column assign in groupby apply (dask#934) * Enable copy on write by default (dask#932) Co-authored-by: Patrick Hoefler <61934744+phofl@users.noreply.github.com> * Avoid fusing from_pandas ops to avoid duplicating data (dask#938) * Adjust automatic split_out parameter (dask#940) * Revert "Add _repr_html_ and updated __repr__ for FrameBase (dask#930)" (dask#941) * Remove repartition from P2P shuffle (dask#942) * [Parquet] Calculate divisions from statistics (dask#917) * Accept user arguments for arrow_to_pandas (dask#936) * Add _repr_html_ and prettier __repr__ w/o graph materialization (dask#943) * Add dask tokenize for fragment wrapper (dask#948) * Warn if annotations are ignored (dask#947) * Require `pyarrow>=7` (dask#949) * Implement string conversion for from_array (dask#950) * Add dtype and columns type check for shuffle (dask#951) * Concat arrow tables before converting to pandas (dask#928) * MINOR: Avoid confusion around shuffle method (dask#956) Co-authored-by: Patrick Hoefler <61934744+phofl@users.noreply.github.com> * Set pa cpu count (dask#954) Co-authored-by: Patrick Hoefler <61934744+phofl@users.noreply.github.com> * Update for pandas nighlies (dask#953) * Fix bug with split_out in groupby aggregate (dask#957) * Fix default observed value (dask#960) * Ensure that we respect shuffle in context manager (dask#958) Co-authored-by: Hendrik Makait <hendrik@makait.com> * Fix 'Empty' prefix to non-empty Series repr (dask#963) * Update README.md (dask#964) * Adjust split_out values to be consistent with other methods (dask#961) * bump version to 1.0 * Raise an error if the optimizer cannot terminate (dask#966) * Fix non-converging optimizer (dask#967) * Fixup filter pushdown through merges with ands and column reuse (dask#969) * Fix unique with shuffle and strings (dask#971) * Implement custom reductions (dask#970) * Fixup set_index with one partition but more divisions by user (dask#972) * Fixup predicate pushdown for query 19 (dask#973) Co-authored-by: Miles <miles59923@gmail.com> * Revert enabling pandas cow (dask#974) * Update changelog for 1.0.2 * Fix set-index preserving divisions for presorted (dask#977) * Fixup reduction with split_every=False (dask#978) * Release for dask 2024.3.1 * Raise better error for repartition on divisions with unknown divisions (dask#980) * Fix concat of series objects with column projection (dask#981) * Fix some reset_index optimization issues (dask#982) * Remove keys() (dask#983) * Ensure wrapping an array when comparing to Series works if columns are empty (dask#984) * Version v1.0.4 * Visual ANALYZE (dask#889) Co-authored-by: fjetter <fjetter@users.noreply.github.com> * Support ``prefix`` argument in ``from_delayed`` (dask#991) * Ensure drop matches column names exactly (dask#992) * Fix SettingWithCopyWarning in _merge.py (dask#990) * Update pyproject.toml (dask#994) * Allow passing of boolean index for column index in loc (dask#995) * Ensure that repr doesn't raise if an operand is a pandas object (dask#996) * Version v1.0.5 * Reduce coverage target a little bit (dask#999) * Nicer read_parquet prefix (dask#998) Co-authored-by: Patrick Hoefler <61934744+phofl@users.noreply.github.com> * Set divisions with divisions already known (dask#997) * Start building and publishing conda nightlies (dask#986) * Fix zero division error when reading index from parquet (dask#1000) * Rename overloaded `to/from_dask_dataframe` API (dask#987) * Register json and orc APIs for "pandas" dispatch (dask#1004) * Fix pyarrow fs reads for list of directories (dask#1006) * Release for dask 2024.4.0 * Fix meta caclulation in drop_duplicates (dask#1007) * Release 1.0.7 * Support named aggregations in groupby.aggregate (dask#1009) * Make release 1.0.9 * Adjust version number in changes * Make setattr work (dask#1011) * Release for dask 2024.4.1 * Fix head for npartitions=-1 and optimizer step (dask#1014) * Deprecate ``to/from_dask_dataframe`` API (dask#1001) * Fix projection for rename if projection isn't renamed (dask#1016) * Fix unique with numeric columns (dask#1017) * Add changes for new version * Fix column projections in merge when suffixes are relevant (dask#1019) * Simplify dtype casting logic for shuffle (dask#1012) * Use implicit knowledge about divisions for efficient grouping (dask#946) Co-authored-by: Patrick Hoefler <61934744+phofl@users.noreply.github.com> Co-authored-by: Hendrik Makait <hendrik@makait.com> * Fix assign after set index incorrect projections (dask#1020) * Fix read_parquet if directory is empty (dask#1023) * Rename uniuqe_partition_mapping property and add docs (dask#1022) * Add docs for usefule optimizer methods (dask#1025) * Fix doc build error (dask#1026) * Fix error in analyze for scalar (dask#1027) * Add nr of columns to explain output for projection (dask#1030) Co-authored-by: Hendrik Makait <hendrik@makait.com> * Fuse more aggressively if parquet files are tiny (dask#1029) * Move IO docstrings over (dask#1033) * Release for dask 2024.4.2 * Add cudf support to ``to_datetime`` and ``_maybe_from_pandas`` (dask#1035) * Fix backend dispatching for `read_csv` (dask#1028) * Fix loc accessing index for element wise op (dask#1037) * Fix loc slicing with Datetime Index (dask#1039) * Fix shuffle after set_index from 1 partition df (dask#1040) * Bugfix release * Fix bug in ``Series`` reductions (dask#1041) * Fix shape returning integer (dask#1043) * Fix xarray integration with scalar columns (dask#1046) * Fix None min/max statistics and missing statistics generally (dask#1045) * Fix drop with set (dask#1047) * Fix delayed in fusing with multipled dependencies (dask#1038) * Add bugfix release * Optimize when from-delayed is called (dask#1048) * Fix default name conversion in `ToFrame` (dask#1044) Co-authored-by: Patrick Hoefler <61934744+phofl@users.noreply.github.com> * Add support for ``DataFrame.melt`` (dask#1049) * Fixup failing test (dask#1052) * Generalize ``get_dummies`` (dask#1053) * reduce pickle size of parquet fragments (dask#1050) * Add a bunch of docs (dask#1051) Co-authored-by: Hendrik Makait <hendrik@makait.com> * Release for dask 2024.5.0 * Fix to_parquet in append mode (dask#1057) * Fix sort_values for unordered categories (dask#1058) * Fix dropna before merge (dask#1062) * Fix non-integer divisions in FusedIO (dask#1063) * Add cache argument to ``lower_once`` (dask#1059) * Use ensure_deterministic kwarg instead of config (dask#1064) * Fix isin with strings (dask#1067) * Fix isin for head computation (dask#1068) * Fix read_csv with positional usecols (dask#1069) * Release for dask 2024.5.1 * Use `is_categorical_dtype` dispatch for `sort_values` (dask#1070) * Fix meta for string accessors (dask#1071) * Fix projection to empty from_pandas (dask#1072) * Release for dask 2024.5.2 * Fix categorize if columns are dropped (dask#1074) * Fix resample divisions propagation (dask#1075) * Release for dask 2024.6.0 * Fix get_group for multiple keys (dask#1080) * Skip distributed tests (dask#1081) * Fix cumulative aggregations for empty partitions (dask#1082) * Move another test to distributed folder (dask#1085) * Release 1.1.4 * Release for dask 2024.6.2 * Add minimal subset of interchange protocol (dask#1087) * Add from_map docstring (dask#1088) * Ensure 1 task group per from_delayed (dask#1084) * Advise against using from_delayed (dask#1089) * Refactor shuffle method to handle invalid columns (dask#1091) * Fix freq behavior on ci (dask#1092) * Add first array draft (dask#1090) * Fix array import stuff (dask#1094) * Add asarray (dask#1095) * Implement arange (dask#1097) * Implement linspace (dask#1098) * Implement zeros and ones (dask#1099) * Remvoe pandas 2 checks (dask#1100) * Add unify-chunks draft to arrays (dask#1101) Co-authored-by: Patrick Hoefler <61934744+phofl@users.noreply.github.com> * Release for dask 2024.7.0 * Skip test if optional xarray cannot be imported (dask#1104) * Fix deepcopying FromPandas class (dask#1105) * Fix from_pandas with chunksize and empty df (dask#1106) * Link fix in readme (dask#1107) * Fix shuffle blowing up the task graph (dask#1108) Co-authored-by: Hendrik Makait <hendrik@makait.com> * Release for dask 2024.7.1 * Fix some things for pandas 3 (dask#1110) * Fixup remaining upstream failures (dask#1111) * Release for dask 2024.8.0 * Drop support for Python 3.9 (dask#1109) Co-authored-by: James Bourbeau <jrbourbeau@gmail.com> * Fix tuples as on argument in merge (dask#1117) * Fix merging when index name in meta missmatches actual name (dask#1119) Co-authored-by: Hendrik Makait <hendrik@makait.com> * Register `read_parquet` and `read_csv` as "dispatchable" (dask#1114) * Fix projection for Index class in read_parquet (dask#1120) * Fix result index of merge (dask#1121) * Introduce `ToBackend` expression (dask#1115) * Avoid calling ``array`` attribute on ``cudf.Series`` (dask#1122) * Make split_out for categorical default smarter (dask#1124) * Release for dask 2024.8.1 * Fix scalar detection of columns coming from sql (dask#1125) * Bump `pyarrow>=14.0.1` minimum versions (dask#1127) Co-authored-by: Patrick Hoefler <61934744+phofl@users.noreply.github.com> * Fix concat axis 1 bug in divisions (dask#1128) * Release for dask 2024.8.2 * Use task-based rechunking as default (dask#1131) * Improve performance of `DelayedsExpr` through caching (dask#1132) * Import from tokenize (dask#1133) * Release for dask 2024.9.0 * Add concatenate flag to .compute() (dask#1138) * Release for dask 2024.9.1 * Fix displaying timestamp scalar (dask#1141) * Fix alignment issue with groupby index accessors (dask#1142) * Improve handling of optional dependencies in `analyze` and `explain` (dask#1146) * Switch from mambaforge to miniforge in CI (dask#1147) * Fix merge_asof for single partition (dask#1145) * Raise exception when calculating divisons (dask#1149) * Fix binary operations with scalar on the left (dask#1150) * Explicitly list setuptools as a build dependency in conda recipe (dask#1151) * Version v1.1.16 * Fix ``Merge`` divisions after filtering partitions (dask#1152) * Fix meta calculation for to_datetime (dask#1153) * Internal cleanup of P2P code (dask#1154) * Migrate P2P shuffle and merge to TaskSpec (dask#1155) * Improve Aggregation docstring explicitly mentionning SeriesGroupBy (dask#1156) * Migrate shuffle and merge to `P2PBarrierTask` (dask#1157) * Migrate Blockwise to use taskspec (dask#1159) * Add support for Python 3.13 (dask#1160) * Release for dask 2024.11.0 * Fix fusion calling things multiple times (dask#1161) * Version 1.1.18 * Version 1.1.19 * Fix orphaned dependencies in Fused expression (dask#1163) * Use Taskspec fuse implementation (dask#1162) Co-authored-by: Patrick Hoefler <61934744+phofl@users.noreply.github.com> * Introduce more caching when walking the expression (dask#1165) * Avoid exponentially growing graph for Assign-Projection combinations (dask#1164) * Remove ``from_dask_dataframe`` (dask#1167) * Deprecated and remove from_legacy_dataframe usage (dask#1168) Co-authored-by: James Bourbeau <jrbourbeau@users.noreply.github.com> * Remove recursion in task spec (dask#1158) * Fix value_counts with split_out != 1 (dask#1170) * Release 2024.12.0 * Use new blockwise unpack collection in array (dask#1173) * Propagate group_keys in DataFrameGroupBy (dask#1174) * Fix assign optimization when overwriting columns (dask#1176) * Remove custom read-csv stuff (dask#1178) * Fixup install paths (dask#1179) * Version 1.1.21 * Remove legacy conversion functions (dask#1177) * Remove duplicated files * Move repository * Clean up docs and imports * Clean up docs and imports --------- Co-authored-by: Hendrik Makait <hendrik@makait.com> Co-authored-by: Florian Jetter <fjetter@users.noreply.github.com> Co-authored-by: Miles <miles59923@gmail.com> Co-authored-by: Joris Van den Bossche <jorisvandenbossche@gmail.com> Co-authored-by: Richard (Rick) Zamora <rzamora217@gmail.com> Co-authored-by: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com> Co-authored-by: James Bourbeau <jrbourbeau@gmail.com> Co-authored-by: alex-rakowski <alexrakowski90@gmail.com> Co-authored-by: Matthew Rocklin <mrocklin@gmail.com> Co-authored-by: Sandro <shfu29r4bu@liamekaens.com> Co-authored-by: Ben <55319792+benrutter@users.noreply.github.com> Co-authored-by: James Bourbeau <jrbourbeau@users.noreply.github.com> Co-authored-by: Guillaume Eynard-Bontemps <g.eynard.bontemps@gmail.com> Co-authored-by: Tom Augspurger <tom.augspurger88@gmail.com>
Previously the dataframe object contained tasks that closed over the
dataframe itself. This made certain tasks very costly to serialize.