Skip to content

Commit 434f872

Browse files
AlenkaFbkietzjorisvandenbossche
authored
GH-40060: [C++][Python] Basic conversion of RecordBatch to Arrow Tensor - add support for different data types (#40359)
### What changes are included in this PR? - Added support for `RecordBatches` with fields of different type in the conversion `RecordBatch` → `Tensor`. - Added detail of the constraints to the `RecordBatch.to_tensor()` docstrings, see #40064 (comment). ### Are these changes tested? Yes. ### Are there any user-facing changes? No. * GitHub Issue: #40060 Lead-authored-by: AlenkaF <frim.alenka@gmail.com> Co-authored-by: Alenka Frim <AlenkaF@users.noreply.github.com> Co-authored-by: Benjamin Kietzman <bengilgit@gmail.com> Co-authored-by: Joris Van den Bossche <jorisvandenbossche@gmail.com> Signed-off-by: Joris Van den Bossche <jorisvandenbossche@gmail.com>
1 parent 32437a5 commit 434f872

4 files changed

Lines changed: 268 additions & 51 deletions

File tree

cpp/src/arrow/record_batch.cc

Lines changed: 70 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,9 @@
3434
#include "arrow/type.h"
3535
#include "arrow/util/iterator.h"
3636
#include "arrow/util/logging.h"
37+
#include "arrow/util/unreachable.h"
3738
#include "arrow/util/vector.h"
39+
#include "arrow/visit_type_inline.h"
3840

3941
namespace arrow {
4042

@@ -248,19 +250,40 @@ Result<std::shared_ptr<StructArray>> RecordBatch::ToStructArray() const {
248250
/*offset=*/0);
249251
}
250252

253+
template <typename Out>
254+
struct ConvertColumnsToTensorVisitor {
255+
Out*& out_values;
256+
const ArrayData& in_data;
257+
258+
template <typename T>
259+
Status Visit(const T&) {
260+
if constexpr (is_numeric(T::type_id)) {
261+
using In = typename T::c_type;
262+
auto in_values = ArraySpan(in_data).GetSpan<In>(1, in_data.length);
263+
264+
if constexpr (std::is_same_v<In, Out>) {
265+
memcpy(out_values, in_values.data(), in_values.size_bytes());
266+
out_values += in_values.size();
267+
} else {
268+
for (In in_value : in_values) {
269+
*out_values++ = static_cast<Out>(in_value);
270+
}
271+
}
272+
return Status::OK();
273+
}
274+
Unreachable();
275+
}
276+
};
277+
251278
template <typename DataType>
252279
inline void ConvertColumnsToTensor(const RecordBatch& batch, uint8_t* out) {
253280
using CType = typename arrow::TypeTraits<DataType>::CType;
254281
auto* out_values = reinterpret_cast<CType*>(out);
255282

256-
// Loop through all of the columns
257-
for (int i = 0; i < batch.num_columns(); ++i) {
258-
const auto* in_values = batch.column(i)->data()->GetValues<CType>(1);
259-
260-
// Copy data of each column
261-
memcpy(out_values, in_values, sizeof(CType) * batch.num_rows());
262-
out_values += batch.num_rows();
263-
} // End loop through columns
283+
for (const auto& column : batch.columns()) {
284+
ConvertColumnsToTensorVisitor<CType> visitor{out_values, *column->data()};
285+
DCHECK_OK(VisitTypeInline(*column->type(), &visitor));
286+
}
264287
}
265288

266289
Result<std::shared_ptr<Tensor>> RecordBatch::ToTensor(MemoryPool* pool) const {
@@ -269,28 +292,54 @@ Result<std::shared_ptr<Tensor>> RecordBatch::ToTensor(MemoryPool* pool) const {
269292
"Conversion to Tensor for RecordBatches without columns/schema is not "
270293
"supported.");
271294
}
272-
const auto& type = column(0)->type();
273-
// Check for supported data types
274-
if (!is_integer(type->id()) && !is_floating(type->id())) {
275-
return Status::TypeError("DataType is not supported: ", type->ToString());
276-
}
277-
// Check for uniform data type
278295
// Check for no validity bitmap of each field
279296
for (int i = 0; i < num_columns(); ++i) {
280297
if (column(i)->null_count() > 0) {
281298
return Status::TypeError("Can only convert a RecordBatch with no nulls.");
282299
}
283-
if (column(i)->type() != type) {
284-
return Status::TypeError("Can only convert a RecordBatch with uniform data type.");
300+
}
301+
302+
// Check for supported data types and merge fields
303+
// to get the resulting uniform data type
304+
if (!is_integer(column(0)->type()->id()) && !is_floating(column(0)->type()->id())) {
305+
return Status::TypeError("DataType is not supported: ",
306+
column(0)->type()->ToString());
307+
}
308+
std::shared_ptr<Field> result_field = schema_->field(0);
309+
std::shared_ptr<DataType> result_type = result_field->type();
310+
311+
if (num_columns() > 1) {
312+
Field::MergeOptions options;
313+
options.promote_integer_to_float = true;
314+
options.promote_integer_sign = true;
315+
options.promote_numeric_width = true;
316+
317+
for (int i = 1; i < num_columns(); ++i) {
318+
if (!is_numeric(column(i)->type()->id())) {
319+
return Status::TypeError("DataType is not supported: ",
320+
column(i)->type()->ToString());
321+
}
322+
323+
// Casting of float16 is not supported, throw an error in this case
324+
if ((column(i)->type()->id() == Type::HALF_FLOAT ||
325+
result_field->type()->id() == Type::HALF_FLOAT) &&
326+
column(i)->type()->id() != result_field->type()->id()) {
327+
return Status::NotImplemented("Casting from or to halffloat is not supported.");
328+
}
329+
330+
ARROW_ASSIGN_OR_RAISE(
331+
result_field, result_field->MergeWith(
332+
schema_->field(i)->WithName(result_field->name()), options));
285333
}
334+
result_type = result_field->type();
286335
}
287336

288337
// Allocate memory
289338
ARROW_ASSIGN_OR_RAISE(
290339
std::shared_ptr<Buffer> result,
291-
AllocateBuffer(type->bit_width() * num_columns() * num_rows(), pool));
340+
AllocateBuffer(result_type->bit_width() * num_columns() * num_rows(), pool));
292341
// Copy data
293-
switch (type->id()) {
342+
switch (result_type->id()) {
294343
case Type::UINT8:
295344
ConvertColumnsToTensor<UInt8Type>(*this, result->mutable_data());
296345
break;
@@ -323,18 +372,18 @@ Result<std::shared_ptr<Tensor>> RecordBatch::ToTensor(MemoryPool* pool) const {
323372
ConvertColumnsToTensor<DoubleType>(*this, result->mutable_data());
324373
break;
325374
default:
326-
return Status::TypeError("DataType is not supported: ", type->ToString());
375+
return Status::TypeError("DataType is not supported: ", result_type->ToString());
327376
}
328377

329378
// Construct Tensor object
330379
const auto& fixed_width_type =
331-
internal::checked_cast<const FixedWidthType&>(*column(0)->type());
380+
internal::checked_cast<const FixedWidthType&>(*result_type);
332381
std::vector<int64_t> shape = {num_rows(), num_columns()};
333382
std::vector<int64_t> strides;
334383
ARROW_RETURN_NOT_OK(
335384
internal::ComputeColumnMajorStrides(fixed_width_type, shape, &strides));
336385
ARROW_ASSIGN_OR_RAISE(auto tensor,
337-
Tensor::Make(type, std::move(result), shape, strides));
386+
Tensor::Make(result_type, std::move(result), shape, strides));
338387

339388
return tensor;
340389
}

cpp/src/arrow/record_batch_test.cc

Lines changed: 115 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -619,37 +619,37 @@ TEST_F(TestRecordBatch, ConcatenateRecordBatches) {
619619
ASSERT_BATCHES_EQUAL(*batch, *null_batch);
620620
}
621621

622-
TEST_F(TestRecordBatch, ToTensorUnsupported) {
622+
TEST_F(TestRecordBatch, ToTensorUnsupportedType) {
623623
const int length = 9;
624624

625-
// Mixed data type
626625
auto f0 = field("f0", int32());
627-
auto f1 = field("f1", int64());
626+
// Unsupported data type
627+
auto f1 = field("f1", utf8());
628628

629629
std::vector<std::shared_ptr<Field>> fields = {f0, f1};
630630
auto schema = ::arrow::schema(fields);
631631

632632
auto a0 = ArrayFromJSON(int32(), "[1, 2, 3, 4, 5, 6, 7, 8, 9]");
633-
auto a1 = ArrayFromJSON(int64(), "[10, 20, 30, 40, 50, 60, 70, 80, 90]");
633+
auto a1 = ArrayFromJSON(utf8(), R"(["a", "b", "c", "a", "b", "c", "a", "b", "c"])");
634634

635635
auto batch = RecordBatch::Make(schema, length, {a0, a1});
636636

637637
ASSERT_RAISES_WITH_MESSAGE(
638-
TypeError, "Type error: Can only convert a RecordBatch with uniform data type.",
638+
TypeError, "Type error: DataType is not supported: " + a1->type()->ToString(),
639639
batch->ToTensor());
640640

641-
// Unsupported data type
642-
auto f2 = field("f2", utf8());
643-
644-
std::vector<std::shared_ptr<Field>> fields_1 = {f2};
645-
auto schema_2 = ::arrow::schema(fields_1);
641+
// Unsupported boolean data type
642+
auto f2 = field("f2", boolean());
646643

647-
auto a2 = ArrayFromJSON(utf8(), R"(["a", "b", "c", "a", "b", "c", "a", "b", "c"])");
648-
auto batch_2 = RecordBatch::Make(schema_2, length, {a2});
644+
std::vector<std::shared_ptr<Field>> fields2 = {f0, f2};
645+
auto schema2 = ::arrow::schema(fields2);
646+
auto a2 = ArrayFromJSON(boolean(),
647+
"[true, false, true, true, false, true, false, true, true]");
648+
auto batch2 = RecordBatch::Make(schema2, length, {a0, a2});
649649

650650
ASSERT_RAISES_WITH_MESSAGE(
651651
TypeError, "Type error: DataType is not supported: " + a2->type()->ToString(),
652-
batch_2->ToTensor());
652+
batch2->ToTensor());
653653
}
654654

655655
TEST_F(TestRecordBatch, ToTensorUnsupportedMissing) {
@@ -740,6 +740,108 @@ TEST_F(TestRecordBatch, ToTensorSupportedNaN) {
740740
CheckTensor<FloatType>(tensor, 18, shape, f_strides);
741741
}
742742

743+
TEST_F(TestRecordBatch, ToTensorSupportedTypesMixed) {
744+
const int length = 9;
745+
746+
auto f0 = field("f0", uint16());
747+
auto f1 = field("f1", int16());
748+
auto f2 = field("f2", float32());
749+
750+
auto a0 = ArrayFromJSON(uint16(), "[1, 2, 3, 4, 5, 6, 7, 8, 9]");
751+
auto a1 = ArrayFromJSON(int16(), "[10, 20, 30, 40, 50, 60, 70, 80, 90]");
752+
auto a2 = ArrayFromJSON(float32(), "[100, 200, 300, NaN, 500, 600, 700, 800, 900]");
753+
754+
// Single column
755+
std::vector<std::shared_ptr<Field>> fields = {f0};
756+
auto schema = ::arrow::schema(fields);
757+
auto batch = RecordBatch::Make(schema, length, {a0});
758+
759+
ASSERT_OK_AND_ASSIGN(auto tensor, batch->ToTensor());
760+
ASSERT_OK(tensor->Validate());
761+
762+
std::vector<int64_t> shape = {9, 1};
763+
const int64_t uint16_size = sizeof(uint16_t);
764+
std::vector<int64_t> f_strides = {uint16_size, uint16_size * shape[0]};
765+
std::shared_ptr<Tensor> tensor_expected =
766+
TensorFromJSON(uint16(), "[1, 2, 3, 4, 5, 6, 7, 8, 9]", shape, f_strides);
767+
768+
EXPECT_TRUE(tensor_expected->Equals(*tensor));
769+
CheckTensor<UInt16Type>(tensor, 9, shape, f_strides);
770+
771+
// uint16 + int16 = int32
772+
std::vector<std::shared_ptr<Field>> fields1 = {f0, f1};
773+
auto schema1 = ::arrow::schema(fields1);
774+
auto batch1 = RecordBatch::Make(schema1, length, {a0, a1});
775+
776+
ASSERT_OK_AND_ASSIGN(auto tensor1, batch1->ToTensor());
777+
ASSERT_OK(tensor1->Validate());
778+
779+
std::vector<int64_t> shape1 = {9, 2};
780+
const int64_t int32_size = sizeof(int32_t);
781+
std::vector<int64_t> f_strides_1 = {int32_size, int32_size * shape1[0]};
782+
std::shared_ptr<Tensor> tensor_expected_1 = TensorFromJSON(
783+
int32(), "[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 20, 30, 40, 50, 60, 70, 80, 90]",
784+
shape1, f_strides_1);
785+
786+
EXPECT_TRUE(tensor_expected_1->Equals(*tensor1));
787+
788+
CheckTensor<Int32Type>(tensor1, 18, shape1, f_strides_1);
789+
790+
ASSERT_EQ(tensor1->type()->bit_width(), tensor_expected_1->type()->bit_width());
791+
792+
ASSERT_EQ(1, tensor_expected_1->Value<Int32Type>({0, 0}));
793+
ASSERT_EQ(2, tensor_expected_1->Value<Int32Type>({1, 0}));
794+
ASSERT_EQ(10, tensor_expected_1->Value<Int32Type>({0, 1}));
795+
796+
// uint16 + int16 + float32 = float64
797+
std::vector<std::shared_ptr<Field>> fields2 = {f0, f1, f2};
798+
auto schema2 = ::arrow::schema(fields2);
799+
auto batch2 = RecordBatch::Make(schema2, length, {a0, a1, a2});
800+
801+
ASSERT_OK_AND_ASSIGN(auto tensor2, batch2->ToTensor());
802+
ASSERT_OK(tensor2->Validate());
803+
804+
std::vector<int64_t> shape2 = {9, 3};
805+
const int64_t f64_size = sizeof(double);
806+
std::vector<int64_t> f_strides_2 = {f64_size, f64_size * shape2[0]};
807+
std::shared_ptr<Tensor> tensor_expected_2 =
808+
TensorFromJSON(float64(),
809+
"[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 20, 30, 40, 50, "
810+
"60, 70, 80, 90, 100, 200, 300, NaN, 500, 600, 700, 800, 900]",
811+
shape2, f_strides_2);
812+
813+
EXPECT_FALSE(tensor_expected_2->Equals(*tensor2));
814+
EXPECT_TRUE(tensor_expected_2->Equals(*tensor2, EqualOptions().nans_equal(true)));
815+
816+
CheckTensor<DoubleType>(tensor2, 27, shape2, f_strides_2);
817+
}
818+
819+
TEST_F(TestRecordBatch, ToTensorUnsupportedMixedFloat16) {
820+
const int length = 9;
821+
822+
auto f0 = field("f0", float16());
823+
auto f1 = field("f1", float64());
824+
825+
auto a0 = ArrayFromJSON(float16(), "[1, 2, 3, 4, 5, 6, 7, 8, 9]");
826+
auto a1 = ArrayFromJSON(float64(), "[10, 20, 30, 40, 50, 60, 70, 80, 90]");
827+
828+
std::vector<std::shared_ptr<Field>> fields = {f0, f1};
829+
auto schema = ::arrow::schema(fields);
830+
auto batch = RecordBatch::Make(schema, length, {a0, a1});
831+
832+
ASSERT_RAISES_WITH_MESSAGE(
833+
NotImplemented, "NotImplemented: Casting from or to halffloat is not supported.",
834+
batch->ToTensor());
835+
836+
std::vector<std::shared_ptr<Field>> fields1 = {f1, f0};
837+
auto schema1 = ::arrow::schema(fields1);
838+
auto batch1 = RecordBatch::Make(schema1, length, {a1, a0});
839+
840+
ASSERT_RAISES_WITH_MESSAGE(
841+
NotImplemented, "NotImplemented: Casting from or to halffloat is not supported.",
842+
batch1->ToTensor());
843+
}
844+
743845
template <typename DataType>
744846
class TestBatchToTensor : public ::testing::Test {};
745847

python/pyarrow/table.pxi

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3392,6 +3392,9 @@ cdef class RecordBatch(_Tabular):
33923392
def to_tensor(self):
33933393
"""
33943394
Convert to a :class:`~pyarrow.Tensor`.
3395+
3396+
RecordBatches that can be converted have fields of type signed or unsigned
3397+
integer or float, including all bit-widths, with no validity bitmask.
33953398
"""
33963399
cdef:
33973400
shared_ptr[CRecordBatch] c_record_batch

0 commit comments

Comments
 (0)