[Data][Serialization] - Use Arrow IPC for Arrow Schema Serdes #60195
Signed-off-by: Goutam <goutam@anyscale.com>
Code Review
This pull request introduces a performance optimization for pyarrow.Schema serialization by using Arrow's native IPC format instead of cloudpickle. The implementation is clean and is accompanied by a comprehensive test case that validates the new serialization mechanism.
I have one suggestion to further improve performance by avoiding an unnecessary memory copy during serialization. This involves using pyarrow.Buffer directly instead of converting to a bytes object. I've left detailed comments on the implementation and the corresponding test file.
        return _restore_schema_from_ipc, (schema.serialize().to_pybytes(),)


    def _restore_schema_from_ipc(buf: bytes) -> "pyarrow.Schema":
        """Restore an Arrow Schema serialized to Arrow IPC format."""
        import pyarrow as pa

        return pa.ipc.read_schema(pa.BufferReader(buf))
To further improve performance and avoid an unnecessary memory copy, you can directly use the pyarrow.Buffer returned by schema.serialize() instead of converting it to a Python bytes object. pyarrow.Buffer objects are picklable and can be transferred efficiently.
This involves two small changes:
- In `_arrow_schema_reduce`, remove the `.to_pybytes()` call.
- In `_restore_schema_from_ipc`, update the type hint for `buf` and remove the `pa.BufferReader` wrapper, as `pa.ipc.read_schema` can directly handle a `pyarrow.Buffer`.
    -     return _restore_schema_from_ipc, (schema.serialize().to_pybytes(),)
    - def _restore_schema_from_ipc(buf: bytes) -> "pyarrow.Schema":
    -     """Restore an Arrow Schema serialized to Arrow IPC format."""
    -     import pyarrow as pa
    -     return pa.ipc.read_schema(pa.BufferReader(buf))
    +     return _restore_schema_from_ipc, (schema.serialize(),)
    + def _restore_schema_from_ipc(buf: "pyarrow.Buffer") -> "pyarrow.Schema":
    +     """Restore an Arrow Schema serialized to Arrow IPC format."""
    +     import pyarrow as pa
    +     return pa.ipc.read_schema(buf)
    ============================================================
    Schema: Large Schema (100 fields, mixed types) (100 fields)
    Iterations: 10,000
    Method            Serialize (ms)   Deserialize (ms)   Size (bytes)
    IPC                       236.56             665.39           6856
    cloudpickle              5939.44            1623.03           4710
    ray.cloudpickle           359.02             706.08           6978
    Speedup vs IPC (slower = Nx):
      cloudpickle:      Serialize: 25.11x   Deserialize: 2.44x   Size: 0.69x
      ray.cloudpickle:  Serialize: 1.52x    Deserialize: 1.06x   Size: 1.02x
    ============================================================
Ends up being worse with pa.Buffer
Signed-off-by: Goutam <goutam@anyscale.com>
…oject#60195)

## Description
Ray Core was using `cloudpickle` instead of `Arrow IPC` as the serialization mechanism, and that takes a toll on Ray Data since Ray Data keeps track of the materialized `BlockMetadata` on the driver.

Benchmark Results:
```
============================================================
Schema: Large Schema (100 fields, mixed types) (100 fields)
Iterations: 10,000
============================================================
```

| Method | Serialize (ms) | Deserialize (ms) | Size (bytes) |
|--------|----------------|------------------|--------------|
| Arrow IPC | 238.65 | 667.79 | 6856 |
| cloudpickle | 5985.70 | 1633.24 | 4710 |
| ray.cloudpickle (with Cloudpickle) | 6036.41 | 1623.84 | 4710 |
| ray.cloudpickle (with Arrow IPC) | 308.53 | 706.48 | 6942 |

## Related issues
> Link related issues: "Fixes ray-project#1234", "Closes ray-project#1234", or "Related to ray-project#1234".

## Additional information
> Optional: Add implementation details, API changes, usage examples, screenshots, etc.

---------

Signed-off-by: Goutam <goutam@anyscale.com>
Signed-off-by: Limark Dcunha <limarkdcunha@gmail.com>
Description
Ray Core was serializing `pyarrow.Schema` objects with `cloudpickle` instead of Arrow IPC, which takes a toll on Ray Data because Ray Data keeps track of the materialized `BlockMetadata` on the driver.