Skip to content

The Joy and Agony of Categorical Types #1040

@jcorbin

Description

@jcorbin

Intro

First off cheers on where you've got dask to today, it's been immediately
useful for my applications of sifting log data. In fact it's because dask is
so wildly useful that I keep trying to push through the rough patches and
improve what limitations I think I'm finding.

I'm recording a limitation that I hit while trying to do some group aggregation
work to open up discussion in various ways.

Summary

Using categorical types can drastically speed up a group aggregation
computation (by nearly an order of magnitude) if it worked. Core issue seems
to be a lack of support for combining slightly different categoricals in
pandas.concat. There's potential to workaround it in dask's dataframe core,
if we can sort out a working hack.

I finish out with some remarks in general about the JSON => dataframe gap.

Problem

  • I've got a pile (~10GiB across ~400 files) of gzipped-json data
  • I want to extract dimensions A, B, and C from it; A and B are string labels,
    C is a floating point
  • I want to group by the compound (A, B) dimension, and then take the count and
    sum(C) across those groups

Setup

I've found it fastest to just write my own shard loading function, rather than
deal with the typical pipeline (bag lines, map json, pluck pluck munge,
to_dataframe...):

columns = ['A', 'B', 'C']

import ujson  # about 2x faster than core json in my experience

def load_record(line):
    record = ujson.loads(line)
    return (record['A'], record['B'], record['C'])

@dask.do
def load_shard(path):
    with dask.utils.open(path, compression=dask.utils.infer_compression(path)) as f:
        records = map(load_record, f)

    shard = pd.DataFrame(records, columns=columns)
    shard['A'] = shard['A'].astype('category')  # XXX these are the contentious parts
    shard['B'] = shard['B'].astype('category')  # XXX ...
    shard['C'] = shard['C'].astype('float')

    return shard

paths = !find path/to/data -type f
shards = map(load_shard, paths)
df = dask.dataframe.from_imperative(shards, columns)
G = df.groupby(['A', 'B'])['C']

with dask.diagnostics.Profiler(watch=True):  # uses the WIP #1030
    # if we don't specify get here, then load_shard is GIL contended and the
    # whole thing takes ~Nx longer (N = num_workers)
    count, total = dask.compute(G.count(), G.sum(), get=dask.multiprocessing.get)

Results

Running over 10 shards (just slice the paths to shards array above) for easy
comparison, the above finishes in ~16 seconds on my machine, but yields the
unsatisfying exception in final result combination:

TypeError: categories must match existing categories when appending

Traceback
---------
  File "/.../dask/dask/async.py", line 264, in execute_task
    result = _execute_task(task, data)
  File "/.../dask/dask/async.py", line 245, in _execute_task
    args2 = [_execute_task(a, cache) for a in args]
  File "/.../dask/dask/async.py", line 246, in _execute_task
    return func(*args2)
  File "/.../dask/dask/dataframe/core.py", line 51, in _concat
    return pd.concat(args2)
  File "/.../lib/python2.7/site-packages/pandas/tools/merge.py", line 812, in concat
    copy=copy)
  File "/.../lib/python2.7/site-packages/pandas/tools/merge.py", line 949, in __init__
    self.new_axes = self._get_new_axes()
  File "/.../lib/python2.7/site-packages/pandas/tools/merge.py", line 1028, in _get_new_axes
    new_axes[self.axis] = self._get_concat_axis()
  File "/.../lib/python2.7/site-packages/pandas/tools/merge.py", line 1080, in _get_concat_axis
    concat_axis = _concat_indexes(indexes)
  File "/.../lib/python2.7/site-packages/pandas/tools/merge.py", line 1098, in _concat_indexes
    return indexes[0].append(indexes[1:])
  File "/.../lib/python2.7/site-packages/pandas/core/index.py", line 4943, in append
    arrays.append(label.append(appended))
  File "/.../lib/python2.7/site-packages/pandas/core/index.py", line 3542, in append
    to_concat = [ self._is_dtype_compat(c) for c in to_concat ]
  File "/.../lib/python2.7/site-packages/pandas/core/index.py", line 3193, in _is_dtype_compat
    raise TypeError("categories must match existing categories when appending")

For comparison, if we leave the strings as strings the computation succeeds,
but takes ~2:25 or ~145 seconds, nearly 9x the time of the categorical version.
The data size (as measured by pd.DataFrame.to_pickle(...)ing each of the 10
shards) only differs by a factor of ~2.5x, so there must be some sort of an
amplification effect going on as we have to keep (de)serializing between each
of the work units.

What's truly interesting about this performance difference is not simply that
the shard loading takes more time (due to serialization overhead), but that the
actual group sum/count work takes proportionately much more time; with
categorical types the group aggregation work was negligible, but with strings
it dominates over the extraction work.

Here's the task profile using categoricals:
image

And here's the task profile without categoricals (just using dtype object of strings instead):
image

Attempt to workaround

Based on taking two of my shards and learning how to concat them in isolation:

a, b = df.compute(shards[:2])

# XXX this will throw a `ValueError: incompatible categories in categorical concat`
#    pd.concat([a, b])

# however if we define:

def align_categories(a, *bs):
    for dim in a.dtypes.index[a.dtypes == 'category']:
        a_S = a[dim]
        bs_S = [b[dim] for b in bs]
        a_cats = set(a_S.cat.categories)
        bs_cats = [set(b_S.cat.categories) for b_S in bs_S]
        all_cats = reduce(set.union, [a_cats] + bs_cats)
        a_S.cat.add_categories(all_cats - a_cats, inplace=True)
        for i, b_S in enumerate(bs_S):
            b_cats = bs_cats[i]
            b_S.cat.add_categories(all_cats - b_cats, inplace=True)
            b_S.cat.reorder_categories(a_S.cat.categories, inplace=True)


# now we can:
align_categories(a, b)
df = pd.concat([a, b])

Heartened, we try:

diff --git a/dask/dataframe/core.py b/dask/dataframe/core.py
index 9b5e129..7d9e276 100644
--- a/dask/dataframe/core.py
+++ b/dask/dataframe/core.py
@@ -49,12 +49,27 @@ def _concat(args, **kwargs):
         if not args2:
             return args[0]
         return pd.concat(args2)
+    align_categories(*args2)
     if isinstance(args[0], (pd.Index)):
         args = [arg for arg in args if len(arg)]
         return args[0].append(args[1:])
     return args


+def align_categories(a, *bs):
+    for dim in a.dtypes.index[a.dtypes == 'category']:
+        a_S = a[dim]
+        bs_S = [b[dim] for b in bs]
+        a_cats = set(a_S.cat.categories)
+        bs_cats = [set(b_S.cat.categories) for b_S in bs_S]
+        all_cats = reduce(set.union, [a_cats] + bs_cats)
+        a_S.cat.add_categories(all_cats - a_cats, inplace=True)
+        for i, b_S in enumerate(bs_S):
+            b_cats = bs_cats[i]
+            b_S.cat.add_categories(all_cats - b_cats, inplace=True)
+            b_S.cat.reorder_categories(a_S.cat.categories, inplace=True)
+
+
 def optimize(dsk, keys, **kwargs):
     from .optimize import optimize
     return optimize(dsk, keys, **kwargs)

However that ValueError wasn't actually our problem, we had a slightly
different TypeError due to the indicies of the aggregate shards being derived
from a categorical. Note in my case they're actually MultiIndexes and not
actually a CategoricalIndex.

Here is where I left of, having decided to write up this summary instead.

Sidebar: the joy and agony of JSON loading

Also I'm interested in feedback on the approach I've landed on for loading and
extracting JSON here:

  • use an imperative shard loading function that goes from file path to
    dataframe in one shot
  • allowing me to inline plucking and type casting also at the same time

I don't have numbers on the gains here, since I made the shift away from
equivalents like:

lines = db.from_filenames(all_the_files)
records = lines.map(ujson.loads) # <-- this is ~2x win over json.loads btw
tuples = records.map(some_plucking_function)
df = tuples.to_dataframe()
df.map_partitions(some_type_casting_function)

I tried many variations on this:

  • like using a map_partitions on records and returning direct tuple-lists
  • a pluck/zip network between records and df = ....to_dataframe()

But in the end, just writing an imperative shard loader was at least an order
of magnitude faster.

Metadata

Metadata

Assignees

No one assigned

    Labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions