Conversation

@wesm wesm commented Nov 26, 2017

For systems (like Dask) that prefer to handle their own framed buffer transport, this provides a list of memoryview-compatible objects, produced with minimal copying / allocation from the input data structure, which can similarly be reconstructed zero-copy into the original object.

To motivate the use case, consider a dict of ndarrays:

```
data = {i: np.random.randn(1000, 1000) for i in range(50)}
```

Here, we have:

```
>>> %timeit serialized = pa.serialize(data)
52.7 µs ± 1.01 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)
```

This is about 400MB of data. Some systems may not want to double memory use by assembling this into a single large buffer, as the `to_buffer` method does:

```
>>> written = serialized.to_buffer()
>>> written.size
400015456
```
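
(As a quick accounting of the ~400MB figure: each array holds 1000 × 1000 float64 values, or 8,000,000 bytes, so the 50 arrays together account for 400,000,000 bytes; the remaining ~15 KB of `written.size` is presumably serialization metadata.)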

We provide a `to_components` method which returns a dict with a `'data'` field containing a list of `pyarrow.Buffer` objects. This can be converted back to the original Python object using `pyarrow.deserialize_components`:

```
>>> %timeit components = serialized.to_components()
73.8 µs ± 812 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)

>>> list(components.keys())
['num_buffers', 'data', 'num_tensors']

>>> len(components['data'])
101

>>> type(components['data'][0])
pyarrow.lib.Buffer
```

and

```
>>> %timeit recons = pa.deserialize_components(components)
93.6 µs ± 260 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)
```
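
For illustration, here is a minimal end-to-end sketch built only from the calls shown above (assuming `numpy` and `pyarrow` are imported as `np` and `pa`; the equality check at the end is just a sanity test, not part of the API):

```
import numpy as np
import pyarrow as pa

data = {i: np.random.randn(1000, 1000) for i in range(50)}

serialized = pa.serialize(data)
components = serialized.to_components()

# Reconstruct the original dict of ndarrays from the component buffers
recons = pa.deserialize_components(components)

# Sanity check: the round trip preserves keys and array contents
assert set(recons.keys()) == set(data.keys())
np.testing.assert_array_equal(recons[0], data[0])
```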

The reason there are 101 data components (1 + 2 * 50) is that:

  • 1 buffer for the serialized Union stream representing the object
  • 2 buffers for each of the tensors: 1 for the metadata and 1 for the tensor body. The body is kept separate so that it is zero-copy from the input (a quick check of this accounting follows below).
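
As a quick consistency check of that accounting (a sketch reusing the `components` dict from the example above):

```
# 1 buffer for the Union stream, plus 1 metadata buffer and 1 body buffer per tensor
expected = 1 + components['num_buffers'] + 2 * components['num_tensors']
assert expected == len(components['data']) == 101
```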

The next step after this is ARROW-1784, which is to transport a pandas.DataFrame using this mechanism.

cc @pitrou @jcrist @mrocklin

wesm added 5 commits November 23, 2017 13:40
Change-Id: I383cb91f1819c6d51d0320cfb7fdfbb0a29f0ff5
Change-Id: I78923fa5adf24bf3ba92fffb90c4a9d4fdb3da6b
Change-Id: I0a7346c65a39894cb14f7130cbe9ab125845cb84
Change-Id: I2cf430456b12a5c9830c9caa7824fcbcc6e167ef
Change-Id: I9811e853311cc42c0f9899e8c6601e0058e3c623

/// \brief Implementation of MessageReader that reads from InputStream
/// \since 0.5.0
class ARROW_EXPORT InputStreamMessageReader : public MessageReader {
Member Author:

It was never necessary to export this class

wesm added 3 commits November 26, 2017 13:24
Change-Id: Id3167b483bbd19f1899d146c5f926d62690d3402
Change-Id: Ie2c9cce3b53fddc17bc4a42130efe95c6867617c
Change-Id: I87081833d8beac518ad7cb832df624bc39e33185
@mrocklin

At first glance the to_components/deserialize_components structure seems good to me. This is definitely something that we can work with on the Dask side. Is it possible to easily turn each of the elements of to_components()['data'] into a memoryview without significant cost?

@mrocklin

```
>>> list(components.keys())
['num_buffers', 'data', 'num_tensors']
```

I'm curious what the metadata here is supposed to mean and how it relates to data. Just curious though.

wesm commented Nov 26, 2017

Yeah, you just call memoryview on the objects in to_components()['data'] -- the Buffer objects support the buffer/memoryview protocol.

The serialized payload consists of an Arrow data structure describing the whole object, and the ndarrays / buffers are sent as "sidecars". So if num_buffers and num_tensors are both 0, then there will only be one buffer in data. In general there are 1 + num_buffers + 2 * num_tensors buffers.
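
A minimal sketch of that conversion, reusing the `components` dict from the PR description (the byte tally at the end only illustrates that the views cover the full ~400MB payload without copying it):

```
# Each pyarrow.Buffer implements the buffer protocol, so wrapping one in a
# memoryview does not copy the underlying bytes
views = [memoryview(buf) for buf in components['data']]

# Total payload size across all component buffers (~400MB in this example)
total_bytes = sum(v.nbytes for v in views)
```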

wesm commented Nov 26, 2017

I could possibly add an argument to to_components that returns buffers as memoryviews, if that would help. Might be nice to encapsulate this detail somewhat (though Dask must have knowledge of the additional metadata anyhow).

@mrocklin

From a Dask perspective I'm more than happy to handle the memoryview conversion.

// TODO(wesm): Not sure how pedantic we need to be about checking the return
// values of these functions. There are other places where we do not check
// PyDict_SetItem/SetItemString return value, but these failures would be
// quite esoteric
Member:

The main failure mode would be MemoryError when growing the dict to make room for the new key.

PyObject* wrapped_buffer = wrap_buffer(buffer);
RETURN_IF_PYERROR();
if (PyList_Append(buffers, wrapped_buffer) < 0) {
RETURN_IF_PYERROR();
Member:

You probably need Py_DECREF(wrapped_buffer) here as well.

Member Author:

done

static std::unique_ptr<MessageReader> Open(io::InputStream* stream);

/// \brief Create MessageReader that reads from owned InputStream
static std::unique_ptr<MessageReader> Open(
Member:

For the record, is there a rationale or convention for the use of unique_ptr vs shared_ptr here? :-)

Member Author:

I try to use shared_ptr only when there is some reasonable expectation that shared ownership may be frequently needed (of course, one can always transfer the pointer into a shared_ptr if needed). There are some other places in the library where a shared_ptr is returned (or used as an out-variable) that would be better as unique_ptr.

Change-Id: Id21d531666d810c2c7a68d74ff37e85e8ac0a8e2
wesm commented Nov 27, 2017

+1

@wesm wesm closed this in 2e3832f Nov 27, 2017
@wesm wesm deleted the ARROW-1783 branch November 27, 2017 21:15
wesm added a commit that referenced this pull request Dec 1, 2017
…erialized Python object with minimal allocation

Author: Wes McKinney <wes.mckinney@twosigma.com>

Closes #1362 from wesm/ARROW-1783 and squashes the following commits:

4ec5a89 [Wes McKinney] Add missing decref on error
e8c76d4 [Wes McKinney] Acquire GIL in GetSerializedFromComponents
1d2e0e2 [Wes McKinney] Fix function documentation
fffc7bb [Wes McKinney] Typos, add deserialize_components to API
50d2fee [Wes McKinney] Finish componentwise serialization roundtrip
58174dd [Wes McKinney] More progress, stubs for reconstruction
b1e31a3 [Wes McKinney] Draft GetTensorMessage
337e1d2 [Wes McKinney] Draft SerializedPyObject::GetComponents
598ef33 [Wes McKinney] Tweak