
[C#] IPC stream writer should write slices of buffers when writing sliced arrays #40517

@adamreeve


Describe the enhancement requested

Currently the C# ArrowStreamWriter always writes all of the data in each buffer, even when the array being written is a slice. This behaviour differs from the Python/C++ implementation, which writes only the relevant slice of a buffer when an array has a nonzero offset or its size in bytes is less than the buffer length. The difference can be observed by comparing the size of the serialized IPC data for a whole RecordBatch with the sizes for slices of the same data.
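As a rough illustration of the behaviour being requested, the byte range a writer needs for a sliced fixed-width array follows directly from the array's offset and length. This is only a sketch: `sliced_byte_range` is a hypothetical helper, not part of any Arrow API, and it ignores variable-width types and validity bitmaps.

```python
from typing import Tuple


def sliced_byte_range(offset: int, length: int, byte_width: int) -> Tuple[int, int]:
    """Return the [start, end) byte range holding `length` fixed-width
    values that begin at element `offset` (hypothetical helper)."""
    start = offset * byte_width
    end = start + length * byte_width
    return start, end


# A 100-row slice at element offset 200 of an int32 column (4 bytes per value).
print(sliced_byte_range(200, 100, 4))  # (800, 1200)
```

Only the bytes in that range would need to be written, rather than the whole underlying buffer.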

Python:

import pyarrow as pa
import numpy as np


num_rows = 400
rows_per_batch = 100

ints = pa.array(np.arange(0, num_rows, 1, dtype=np.int32))
floats = pa.array(np.arange(0, num_rows / 10.0, 0.1, dtype=np.float32))

all_data = pa.RecordBatch.from_arrays([ints, floats], names=["a", "b"])

sink = pa.BufferOutputStream()
with pa.ipc.new_stream(sink, all_data.schema) as writer:
    writer.write_batch(all_data)
buf = sink.getvalue()
print(f"Size of serialized full batch = {buf.size}")

for offset in range(0, num_rows, rows_per_batch):
    slice = all_data.slice(offset, rows_per_batch)
    sink = pa.BufferOutputStream()
    with pa.ipc.new_stream(sink, slice.schema) as writer:
        writer.write_batch(slice)
    buf = sink.getvalue()
    print(f"Size of serialized slice at offset {offset} = {buf.size}")

This outputs:

Size of serialized full batch = 3576
Size of serialized slice at offset 0 = 1176
Size of serialized slice at offset 100 = 1176
Size of serialized slice at offset 200 = 1176
Size of serialized slice at offset 300 = 1176

The size of each slice is roughly 1/4 the size of the full batch after accounting for the fixed overhead of metadata.
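The reported sizes can be checked with a little arithmetic. The sketch below assumes that the only buffers contributing to the body are the two 4-byte-wide data buffers, and that everything else is schema, message metadata and padding.

```python
num_rows, rows_per_batch = 400, 100
bytes_per_value = 4  # int32 and float32 are both 4 bytes wide
num_columns = 2

full_data = num_rows * bytes_per_value * num_columns          # 3200 bytes
slice_data = rows_per_batch * bytes_per_value * num_columns   # 800 bytes

# Sizes reported by the Python script above.
full_overhead = 3576 - full_data
slice_overhead = 1176 - slice_data

print(full_overhead, slice_overhead)  # 376 376
```

Both cases leave the same 376 bytes of overhead, which is consistent with each slice carrying exactly a quarter of the column data.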

Doing the same in C#:

using System;
using System.IO;
using System.Linq;
using Apache.Arrow;
using Apache.Arrow.Ipc;

const int numRows = 400;
const int rowsPerBatch = 100;

var allData = new RecordBatch.Builder()
    .Append("a", false, col => col.Int32(array => array.AppendRange(Enumerable.Range(0, numRows))))
    .Append("b", false, col => col.Float(array => array.AppendRange(Enumerable.Range(0, numRows).Select(i => 0.1f * i))))
    .Build();

{
    using var ms = new MemoryStream();
    using var writer = new ArrowFileWriter(ms, allData.Schema, false, new IpcOptions());
    await writer.WriteStartAsync();
    await writer.WriteRecordBatchAsync(allData);
    await writer.WriteEndAsync();

    Console.WriteLine($"Size of serialized full batch = {ms.Length}");
}

for (var offset = 0; offset < allData.Length; offset += rowsPerBatch)
{
    var arraySlices = allData.Arrays
        .Select(arr => ArrowArrayFactory.Slice(arr, offset, rowsPerBatch))
        .ToArray();
    var slice = new RecordBatch(allData.Schema, arraySlices, arraySlices[0].Length);

    using var ms = new MemoryStream();
    using var writer = new ArrowFileWriter(ms, slice.Schema, false, new IpcOptions());
    await writer.WriteStartAsync();
    await writer.WriteRecordBatchAsync(slice);
    await writer.WriteEndAsync();

    Console.WriteLine($"Size of serialized slice at offset {offset} = {ms.Length}");
}

This outputs:

Size of serialized full batch = 3802
Size of serialized slice at offset 0 = 3802
Size of serialized slice at offset 100 = 3802
Size of serialized slice at offset 200 = 3802
Size of serialized slice at offset 300 = 3802

Writing a slice of the data produces the same file size as writing the full data. We'd like to be able to break IPC data into smaller slices so that it can be sent over a transport that has a message size limit. We're currently working around this by copying the data after slicing.

From a quick look at the C++ implementation, one complication is dealing with null bitmaps, which need to be copied to ensure the start is aligned with a byte boundary.
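To make the bitmap complication concrete: Arrow validity bitmaps are bit-packed with least-significant-bit numbering, so a slice whose offset is not a multiple of 8 starts partway through a byte and cannot be written as a plain byte range. A minimal sketch of the realignment follows; `realign_bitmap` is a hypothetical helper written for illustration, not the C++ or C# implementation.

```python
def realign_bitmap(bitmap: bytes, bit_offset: int, length: int) -> bytes:
    """Copy `length` bits starting at `bit_offset` into a new bitmap
    whose first bit sits at bit 0 of byte 0 (LSB-first bit order)."""
    out = bytearray((length + 7) // 8)
    for i in range(length):
        src = bit_offset + i
        # Read bit `src` from the source, LSB-first within each byte.
        if (bitmap[src >> 3] >> (src & 7)) & 1:
            out[i >> 3] |= 1 << (i & 7)
    return bytes(out)


# Eight validity bits starting at bit 2 of a two-byte bitmap.
source = bytes([0b10110100, 0b00000011])
print(realign_bitmap(source, 2, 8).hex())  # ed
```

A bit-by-bit loop is used here for clarity; a real writer would more likely shift whole bytes or words.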

Component(s)

C#
