
dask.compute on nested Python objects #1968

@eriknw

I really like using dask.compute to calculate many things in a single pass over the data. I sometimes even build these computations programmatically, e.g., from the results of a previous pass over the data. The flat argument list that compute accepts via *args is somewhat limiting: I want a compute that also computes elements inside nested dicts and lists. I whipped up the following, which builds on dask.compute:

# Extend `dask.compute` to work on nested data structures.
from toolz import concat
from dask import compute

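def _flatten_compute(vals):
    """Encode a nested list/dict structure as a single flat list.

    Each list or dict becomes a ``'list'`` or ``'dict'`` marker followed by
    the number of encoded elements it contains; in a dict, each key comes
    right before its encoded value.  Every other object becomes
    ``['item', obj]``.

    >>> _flatten_compute([1, {'a': 2}])
    ['list', 7, 'item', 1, 'dict', 3, 'a', 'item', 2]
    """
    if isinstance(vals, list):
        # Concatenate the encodings of all elements.
        inner = list(concat(map(_flatten_compute, vals)))
        return ['list', len(inner)] + inner
    elif isinstance(vals, dict):
        inner = []
        for k, v in vals.items():
            inner.append(k)  # the key precedes its encoded value
            inner.extend(_flatten_compute(v))
        return ['dict', len(inner)] + inner
    else:
        # A leaf: a dask collection or any other value compute passes through.
        return ['item', vals]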


def _rebuild_compute(results, start=0):
    """Decode the list or dict whose marker sits at ``results[start]``."""
    if results[start] == 'list':
        rv = []
        # results[start + 1] counts the encoded elements after the header.
        stop = start + 2 + results[start + 1]
        i = start + 2
        while i < stop:
            val = results[i]
            if val == 'item':
                rv.append(results[i + 1])
                i += 2
            else:  # nested list or dict: decode it, then skip past it
                rv.append(_rebuild_compute(results, start=i))
                i += 2 + results[i + 1]
        return rv
    elif results[start] == 'dict':
        rv = {}
        stop = start + 2 + results[start + 1]
        i = start + 2
        while i < stop:
            # Each entry is a key followed by an encoded value.
            key = results[i]
            val = results[i + 1]
            if val == 'item':
                rv[key] = results[i + 2]
                i += 3
            else:  # nested list or dict: decode it, then skip past it
                rv[key] = _rebuild_compute(results, start=i + 1)
                i += 3 + results[i + 2]
        return rv
    else:
        raise ValueError('Does not compute: %s %s' % (start, results))


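def compute_nested(*args, **kwargs):
    """Extend ``dask.compute`` to work on nested data structures.

    This will compute (potentially nested) elements in lists and
    values in dicts.  For example, with ``bag`` a ``dask.bag.Bag``:

    >>> compute_nested({'total': bag.count(), 'truthy': bag.filter(None).count()})
    ({'total': 100, 'truthy': 75},)
    """
    # Encode all arguments as one flat list, compute every element in a
    # single pass, then decode the results back into the original shape.
    flattened_args = _flatten_compute(list(args))
    results = compute(*flattened_args, **kwargs)
    # Return a tuple with one entry per argument, like ``dask.compute``.
    return tuple(_rebuild_compute(results))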
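For comparison, here is a minimal sketch of the same flatten, compute, rebuild idea that keeps a "skeleton" copy of the nested containers with placeholders, instead of encoding the shape as 'list'/'dict'/'item' markers and lengths inside the flat list. The names compute_nested_alt, _split, _join, _Leaf and the compute_fn parameter are hypothetical, introduced only for this illustration; compute_fn defaults to dask.compute and exists so the sketch can be exercised without dask. Unlike the snippet above, dict keys are kept as-is rather than passed to compute.

```python
from typing import Any, Callable, List, Optional


class _Leaf:
    """Hypothetical placeholder recording a leaf's position in the flat list."""
    __slots__ = ("index",)

    def __init__(self, index: int) -> None:
        self.index = index


def _split(obj: Any, leaves: List[Any]) -> Any:
    """Copy nested lists/dicts, moving every other value into ``leaves``."""
    if isinstance(obj, list):
        return [_split(v, leaves) for v in obj]
    if isinstance(obj, dict):
        return {k: _split(v, leaves) for k, v in obj.items()}
    leaves.append(obj)
    return _Leaf(len(leaves) - 1)


def _join(skeleton: Any, values: Any) -> Any:
    """Rebuild the nested structure, filling each placeholder from ``values``."""
    if isinstance(skeleton, list):
        return [_join(v, values) for v in skeleton]
    if isinstance(skeleton, dict):
        return {k: _join(v, values) for k, v in skeleton.items()}
    return values[skeleton.index]


def compute_nested_alt(
    *args: Any,
    compute_fn: Optional[Callable[..., Any]] = None,
    **kwargs: Any,
) -> tuple:
    """Compute leaves of nested lists/dicts in one call (hypothetical sketch)."""
    if compute_fn is None:
        # Default to dask.compute; imported lazily so dask stays optional here.
        from dask import compute as compute_fn
    leaves: List[Any] = []
    skeleton = _split(list(args), leaves)
    # One call computes every leaf together, as in the original snippet.
    values = compute_fn(*leaves, **kwargs)
    return tuple(_join(skeleton, values))
```

With a stand-in compute_fn that doubles each leaf, compute_nested_alt({'total': 3, 'parts': [1, 2]}, compute_fn=lambda *xs: tuple(x * 2 for x in xs)) returns ({'total': 6, 'parts': [2, 4]},). The skeleton avoids the index arithmetic in _rebuild_compute, at the cost of building an intermediate copy of the containers.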

Thoughts? Is it worth the effort for me to make a PR adding this to dask?
