ProxyStore's serialization mechanism generally uses pickle with a fallback to cloudpickle. There is special handling of a couple of types (str and bytes) which can be serialized more efficiently, but this support is very limited.
Sending dataframe/array-like data is very common in the apps that ProxyStore targets, so it would be good to provide better handling of those types when possible (similar to how Dask handles them separately).
As an example, numpy.save() is measurably faster for numpy arrays than ProxyStore's serialize().
$ python -m timeit -n 10 -s "import io, numpy" "x = numpy.random.rand(10000, 10000); output = io.BytesIO(); numpy.save(output, x)"
10 loops, best of 5: 554 msec per loop
$ python -m timeit -n 10 -s "import numpy; from proxystore.serialize import serialize" "x = numpy.random.rand(10000, 10000); serialize(x)"
10 loops, best of 5: 843 msec per loop
We want to (1) refactor the serialization system to support better integration of alternate serializers for specific types and (2) add alternate serializers for common, large data types (numpy, pandas, polars, pytorch, TF).
Part 1: Serialization Refactor
The current serialize()/deserialize() are just a chain of if statements with some error handling. It would be better to have separate serializer constructs and a registration system for easier extension. This is similar to how Globus Compute does it.
from __future__ import annotations

from collections import OrderedDict
from typing import Any, Protocol


class Serializer(Protocol):
    def serializable(self, obj: Any) -> bool: ...
    def serialize(self, obj: Any) -> bytes: ...
    def deserialize(self, data: bytes) -> Any: ...


class BytesSerializer: ...
class StrSerializer: ...
class PickleSerializer: ...
class CloudPickleSerializer: ...  # Maybe Pickle/CloudPickle should be merged?


SERIALIZERS: dict[bytes, Serializer] = OrderedDict(
    [
        (b'01', BytesSerializer()),
        (b'02', StrSerializer()),
        (b'03', PickleSerializer()),
        (b'04', CloudPickleSerializer()),
    ]
)


def serialize(obj: Any) -> bytes:
    for identifier, serializer in SERIALIZERS.items():
        if serializer.serializable(obj):
            data = serializer.serialize(obj)
            # I wonder what the best way to append is? This probably makes a
            # copy. Maybe we pass a BytesIO with the identifier already
            # written to Serializer.serialize()? I think we start with it as
            # written right now, benchmark, then run again with BytesIO.
            return identifier + b'\n' + data
    # Raise a useful error if no serializer claimed the object.
    raise SerializationError(...)


def deserialize(data: bytes) -> Any:
    ...
    identifier, separator, data = data.partition(b'\n')
    ...
    serializer = SERIALIZERS[identifier]  # Need to handle KeyError
    try:
        return serializer.deserialize(data)
    except Exception as e:
        raise SerializationError(...) from e

Part 2: Third-party Serializer Support
With the above system in place, it should be pretty easy to extend with more Serializer implementations for specific data types. We'll start with numpy (numpy.save), pandas (DataFrame.to_feather or to_pickle), and polars (DataFrame.write_ipc).
The main caveat is that it is not guaranteed that numpy, pandas, or polars will be installed. That is why Serializer.serializable() exists: we can return False if the package is not available, and otherwise check the data type of the object.
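
As a rough illustration (the class name and details below are assumptions, not a settled design), a numpy serializer under the proposed protocol could look something like the following, with the import check doubling as the availability test in serializable():

import io
from typing import Any


class NumpySerializer:
    def serializable(self, obj: Any) -> bool:
        try:
            import numpy
        except ImportError:
            # numpy is not installed so this serializer never applies.
            return False
        return isinstance(obj, numpy.ndarray)

    def serialize(self, obj: Any) -> bytes:
        import numpy

        buffer = io.BytesIO()
        numpy.save(buffer, obj, allow_pickle=False)
        return buffer.getvalue()

    def deserialize(self, data: bytes) -> Any:
        import numpy

        return numpy.load(io.BytesIO(data))

Note that a serializer like this would need to be registered ahead of the generic pickle serializers in SERIALIZERS, since the pickle fallback will claim essentially every object.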
Once we have added support for these data types, we'll need to write a benchmark to measure serialization and deserialization time. We'll want to repeat each serialization of random data many times and across many data sizes so we can plot the comparison between the old ProxyStore serializer and the new one.
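
The sweep might look roughly like the sketch below (the sizes, repeat counts, and the compare() helper are placeholders, not the final benchmark):

import io
import timeit

import numpy
from proxystore.serialize import serialize


def compare(n: int, repeats: int = 10) -> None:
    # Time serializing a random n x n float64 array both ways.
    x = numpy.random.rand(n, n)
    env = {'io': io, 'numpy': numpy, 'serialize': serialize, 'x': x}

    numpy_time = timeit.timeit(
        'buf = io.BytesIO(); numpy.save(buf, x)', globals=env, number=repeats,
    )
    proxystore_time = timeit.timeit('serialize(x)', globals=env, number=repeats)

    print(
        f'n={n}: numpy.save {numpy_time / repeats:.3f}s/op, '
        f'proxystore {proxystore_time / repeats:.3f}s/op'
    )


# Repeat across several sizes so the results can be plotted.
for n in (1000, 2000, 5000, 10000):
    compare(n)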
Afterwards, we can try (1) enabling compression for the serializers above (and rerunning the benchmarks) and then (2) supporting pytorch models (torch.save).
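
One possible shape for the compression step, sketched here with zlib purely as an illustration (the wrapper class and its registration are hypothetical), is a serializer that wraps another serializer and compresses its output:

import zlib
from typing import Any


class CompressedSerializer:
    # Wraps any object implementing the Serializer protocol from Part 1.
    def __init__(self, inner: Any, level: int = 3) -> None:
        self._inner = inner
        self._level = level

    def serializable(self, obj: Any) -> bool:
        return self._inner.serializable(obj)

    def serialize(self, obj: Any) -> bytes:
        # Compress whatever the wrapped serializer produces.
        return zlib.compress(self._inner.serialize(obj), self._level)

    def deserialize(self, data: bytes) -> Any:
        return self._inner.deserialize(zlib.decompress(data))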