Skip to content

Commit 53c1e0b

Browse files
committed
Merge branch 'master' into fly_0826_reuse
2 parents a1f7046 + d4d4a12 commit 53c1e0b

File tree

106 files changed

+2225
-444
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

106 files changed

+2225
-444
lines changed

cpp/CMakeLists.txt

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -669,12 +669,12 @@ if(ARROW_STATIC_LINK_LIBS)
669669
add_dependencies(arrow_dependencies ${ARROW_STATIC_LINK_LIBS})
670670
endif()
671671

672-
set(ARROW_SHARED_PRIVATE_LINK_LIBS ${ARROW_STATIC_LINK_LIBS} ${BOOST_SYSTEM_LIBRARY}
673-
${BOOST_FILESYSTEM_LIBRARY})
672+
set(ARROW_SHARED_PRIVATE_LINK_LIBS ${ARROW_STATIC_LINK_LIBS} ${BOOST_FILESYSTEM_LIBRARY}
673+
${BOOST_SYSTEM_LIBRARY})
674674

675-
list(APPEND ARROW_STATIC_LINK_LIBS ${BOOST_SYSTEM_LIBRARY} ${BOOST_FILESYSTEM_LIBRARY})
675+
list(APPEND ARROW_STATIC_LINK_LIBS ${BOOST_FILESYSTEM_LIBRARY} ${BOOST_SYSTEM_LIBRARY})
676676

677-
list(APPEND ARROW_STATIC_INSTALL_INTERFACE_LIBS boost_system boost_filesystem boost_regex)
677+
list(APPEND ARROW_STATIC_INSTALL_INTERFACE_LIBS boost_filesystem boost_system boost_regex)
678678

679679
if(NOT MSVC)
680680
list(APPEND ARROW_LINK_LIBS ${CMAKE_DL_LIBS})
@@ -701,8 +701,8 @@ set(ARROW_TEST_SHARED_LINK_LIBS
701701
arrow_shared
702702
${ARROW_LINK_LIBS}
703703
${double-conversion_LIBRARIES}
704-
${BOOST_SYSTEM_LIBRARY}
705704
${BOOST_FILESYSTEM_LIBRARY}
705+
${BOOST_SYSTEM_LIBRARY}
706706
${ARROW_TEST_LINK_TOOLCHAIN})
707707

708708
if(NOT MSVC)

cpp/src/arrow/compare.cc

Lines changed: 99 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1008,38 +1008,39 @@ bool ArrayRangeEquals(const Array& left, const Array& right, int64_t left_start_
10081008
return are_equal;
10091009
}
10101010

1011-
bool StridedTensorContentEquals(int dim_index, int64_t left_offset, int64_t right_offset,
1012-
int elem_size, const Tensor& left, const Tensor& right) {
1011+
namespace {
1012+
1013+
bool StridedIntegerTensorContentEquals(const int dim_index, int64_t left_offset,
1014+
int64_t right_offset, int elem_size,
1015+
const Tensor& left, const Tensor& right) {
1016+
const auto n = left.shape()[dim_index];
1017+
const auto left_stride = left.strides()[dim_index];
1018+
const auto right_stride = right.strides()[dim_index];
10131019
if (dim_index == left.ndim() - 1) {
1014-
for (int64_t i = 0; i < left.shape()[dim_index]; ++i) {
1015-
if (memcmp(left.raw_data() + left_offset + i * left.strides()[dim_index],
1016-
right.raw_data() + right_offset + i * right.strides()[dim_index],
1017-
elem_size) != 0) {
1020+
for (int64_t i = 0; i < n; ++i) {
1021+
if (memcmp(left.raw_data() + left_offset + i * left_stride,
1022+
right.raw_data() + right_offset + i * right_stride, elem_size) != 0) {
10181023
return false;
10191024
}
10201025
}
10211026
return true;
10221027
}
1023-
for (int64_t i = 0; i < left.shape()[dim_index]; ++i) {
1024-
if (!StridedTensorContentEquals(dim_index + 1, left_offset, right_offset, elem_size,
1025-
left, right)) {
1028+
for (int64_t i = 0; i < n; ++i) {
1029+
if (!StridedIntegerTensorContentEquals(dim_index + 1, left_offset, right_offset,
1030+
elem_size, left, right)) {
10261031
return false;
10271032
}
1028-
left_offset += left.strides()[dim_index];
1029-
right_offset += right.strides()[dim_index];
1033+
left_offset += left_stride;
1034+
right_offset += right_stride;
10301035
}
10311036
return true;
10321037
}
10331038

1034-
bool TensorEquals(const Tensor& left, const Tensor& right) {
1039+
bool IntegerTensorEquals(const Tensor& left, const Tensor& right) {
10351040
bool are_equal;
10361041
// The arrays are the same object
10371042
if (&left == &right) {
10381043
are_equal = true;
1039-
} else if (left.type_id() != right.type_id()) {
1040-
are_equal = false;
1041-
} else if (left.size() == 0) {
1042-
are_equal = true;
10431044
} else {
10441045
const bool left_row_major_p = left.is_row_major();
10451046
const bool left_column_major_p = left.is_column_major();
@@ -1048,14 +1049,9 @@ bool TensorEquals(const Tensor& left, const Tensor& right) {
10481049

10491050
if (!(left_row_major_p && right_row_major_p) &&
10501051
!(left_column_major_p && right_column_major_p)) {
1051-
const auto& shape = left.shape();
1052-
if (shape != right.shape()) {
1053-
are_equal = false;
1054-
} else {
1055-
const auto& type = checked_cast<const FixedWidthType&>(*left.type());
1056-
are_equal =
1057-
StridedTensorContentEquals(0, 0, 0, type.bit_width() / 8, left, right);
1058-
}
1052+
const auto& type = checked_cast<const FixedWidthType&>(*left.type());
1053+
are_equal =
1054+
StridedIntegerTensorContentEquals(0, 0, 0, type.bit_width() / 8, left, right);
10591055
} else {
10601056
const auto& size_meta = checked_cast<const FixedWidthType&>(*left.type());
10611057
const int byte_width = size_meta.bit_width() / CHAR_BIT;
@@ -1071,6 +1067,85 @@ bool TensorEquals(const Tensor& left, const Tensor& right) {
10711067
return are_equal;
10721068
}
10731069

1070+
template <typename DataType>
1071+
bool StridedFloatTensorContentEquals(const int dim_index, int64_t left_offset,
1072+
int64_t right_offset, const Tensor& left,
1073+
const Tensor& right, const EqualOptions& opts) {
1074+
using c_type = typename DataType::c_type;
1075+
const auto n = left.shape()[dim_index];
1076+
const auto left_stride = left.strides()[dim_index];
1077+
const auto right_stride = right.strides()[dim_index];
1078+
if (dim_index == left.ndim() - 1) {
1079+
auto left_data = left.raw_data();
1080+
auto right_data = right.raw_data();
1081+
if (opts.nans_equal()) {
1082+
for (int64_t i = 0; i < n; ++i) {
1083+
c_type left_value =
1084+
*reinterpret_cast<const c_type*>(left_data + left_offset + i * left_stride);
1085+
c_type right_value = *reinterpret_cast<const c_type*>(right_data + right_offset +
1086+
i * right_stride);
1087+
if (!(left_value == right_value ||
1088+
(std::isnan(left_value) && std::isnan(right_value)))) {
1089+
return false;
1090+
}
1091+
}
1092+
} else {
1093+
for (int64_t i = 0; i < n; ++i) {
1094+
c_type left_value =
1095+
*reinterpret_cast<const c_type*>(left_data + left_offset + i * left_stride);
1096+
c_type right_value = *reinterpret_cast<const c_type*>(right_data + right_offset +
1097+
i * right_stride);
1098+
if (left_value != right_value) {
1099+
return false;
1100+
}
1101+
}
1102+
}
1103+
return true;
1104+
}
1105+
for (int64_t i = 0; i < n; ++i) {
1106+
if (!StridedFloatTensorContentEquals<DataType>(dim_index + 1, left_offset,
1107+
right_offset, left, right, opts)) {
1108+
return false;
1109+
}
1110+
left_offset += left_stride;
1111+
right_offset += right_stride;
1112+
}
1113+
return true;
1114+
}
1115+
1116+
template <typename DataType>
1117+
bool FloatTensorEquals(const Tensor& left, const Tensor& right,
1118+
const EqualOptions& opts) {
1119+
static_assert(std::is_floating_point<typename DataType::c_type>::value,
1120+
"DataType must be a floating point type");
1121+
return StridedFloatTensorContentEquals<DataType>(0, 0, 0, left, right, opts);
1122+
}
1123+
1124+
} // namespace
1125+
1126+
bool TensorEquals(const Tensor& left, const Tensor& right, const EqualOptions& opts) {
1127+
if (left.type_id() != right.type_id()) {
1128+
return false;
1129+
} else if (left.size() == 0 && right.size() == 0) {
1130+
return true;
1131+
} else if (left.shape() != right.shape()) {
1132+
return false;
1133+
}
1134+
1135+
switch (left.type_id()) {
1136+
// TODO: Support half-float tensors
1137+
// case Type::HALF_FLOAT:
1138+
case Type::FLOAT:
1139+
return FloatTensorEquals<FloatType>(left, right, opts);
1140+
1141+
case Type::DOUBLE:
1142+
return FloatTensorEquals<DoubleType>(left, right, opts);
1143+
1144+
default:
1145+
return IntegerTensorEquals(left, right);
1146+
}
1147+
}
1148+
10741149
namespace {
10751150

10761151
template <typename LeftSparseIndexType, typename RightSparseIndexType>

cpp/src/arrow/compare.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,8 @@ class EqualOptions {
8282
bool ARROW_EXPORT ArrayEquals(const Array& left, const Array& right,
8383
const EqualOptions& = EqualOptions::Defaults());
8484

85-
bool ARROW_EXPORT TensorEquals(const Tensor& left, const Tensor& right);
85+
bool ARROW_EXPORT TensorEquals(const Tensor& left, const Tensor& right,
86+
const EqualOptions& = EqualOptions::Defaults());
8687

8788
/// EXPERIMENTAL: Returns true if the given sparse tensors are exactly equal
8889
bool ARROW_EXPORT SparseTensorEquals(const SparseTensor& left, const SparseTensor& right);

cpp/src/arrow/dataset/dataset.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ Status SimpleDataFragment::Scan(std::shared_ptr<ScanContext> scan_context,
3434
std::unique_ptr<ScanTaskIterator>* out) {
3535
// Make an explicit copy of record_batches_ to ensure Scan can be called
3636
// multiple times.
37-
auto it = MakeIterator(record_batches_);
37+
auto it = MakeVectorIterator(record_batches_);
3838

3939
// RecordBatch -> ScanTask
4040
auto fn = [](std::shared_ptr<RecordBatch> batch) -> std::unique_ptr<ScanTask> {

cpp/src/arrow/dataset/dataset.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ class ARROW_DS_EXPORT SimpleDataSource : public DataSource {
9494

9595
std::unique_ptr<DataFragmentIterator> GetFragments(
9696
std::shared_ptr<ScanOptions> options) override {
97-
return MakeIterator(fragments_);
97+
return MakeVectorIterator(fragments_);
9898
}
9999

100100
std::string type() const override { return "simple_data_source"; }

cpp/src/arrow/dataset/file_base.cc

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,13 @@
1717

1818
#include "arrow/dataset/file_base.h"
1919

20+
#include <algorithm>
21+
#include <vector>
22+
2023
#include "arrow/filesystem/filesystem.h"
2124
#include "arrow/io/interfaces.h"
2225
#include "arrow/io/memory.h"
26+
#include "arrow/util/stl.h"
2327

2428
namespace arrow {
2529
namespace dataset {
@@ -41,5 +45,68 @@ Status FileBasedDataFragment::Scan(std::shared_ptr<ScanContext> scan_context,
4145
return format_->ScanFile(source_, scan_options_, scan_context, out);
4246
}
4347

48+
FileSystemBasedDataSource::FileSystemBasedDataSource(
49+
fs::FileSystem* filesystem, const fs::Selector& selector,
50+
std::shared_ptr<FileFormat> format, std::shared_ptr<ScanOptions> scan_options,
51+
std::vector<fs::FileStats> stats)
52+
: filesystem_(filesystem),
53+
selector_(std::move(selector)),
54+
format_(std::move(format)),
55+
scan_options_(std::move(scan_options)),
56+
stats_(std::move(stats)) {}
57+
58+
Status FileSystemBasedDataSource::Make(fs::FileSystem* filesystem,
59+
const fs::Selector& selector,
60+
std::shared_ptr<FileFormat> format,
61+
std::shared_ptr<ScanOptions> scan_options,
62+
std::unique_ptr<FileSystemBasedDataSource>* out) {
63+
std::vector<fs::FileStats> stats;
64+
RETURN_NOT_OK(filesystem->GetTargetStats(selector, &stats));
65+
66+
auto new_end =
67+
std::remove_if(stats.begin(), stats.end(), [&](const fs::FileStats& stats) {
68+
return stats.type() != fs::FileType::File ||
69+
!format->IsKnownExtension(stats.extension());
70+
});
71+
stats.resize(new_end - stats.begin());
72+
73+
out->reset(new FileSystemBasedDataSource(filesystem, selector, std::move(format),
74+
std::move(scan_options), std::move(stats)));
75+
return Status::OK();
76+
}
77+
78+
std::unique_ptr<DataFragmentIterator> FileSystemBasedDataSource::GetFragments(
79+
std::shared_ptr<ScanOptions> options) {
80+
struct Impl : DataFragmentIterator {
81+
Impl(fs::FileSystem* filesystem, std::shared_ptr<FileFormat> format,
82+
std::shared_ptr<ScanOptions> scan_options, std::vector<fs::FileStats> stats)
83+
: filesystem_(filesystem),
84+
format_(std::move(format)),
85+
scan_options_(std::move(scan_options)),
86+
stats_(std::move(stats)) {}
87+
88+
Status Next(std::shared_ptr<DataFragment>* out) {
89+
if (i_ == stats_.size()) {
90+
*out = nullptr;
91+
return Status::OK();
92+
}
93+
FileSource src(stats_[i_++].path(), filesystem_);
94+
95+
std::unique_ptr<DataFragment> fragment;
96+
RETURN_NOT_OK(format_->MakeFragment(src, scan_options_, &fragment));
97+
*out = std::move(fragment);
98+
return Status::OK();
99+
}
100+
101+
size_t i_ = 0;
102+
fs::FileSystem* filesystem_;
103+
std::shared_ptr<FileFormat> format_;
104+
std::shared_ptr<ScanOptions> scan_options_;
105+
std::vector<fs::FileStats> stats_;
106+
};
107+
108+
return internal::make_unique<Impl>(filesystem_, format_, options, stats_);
109+
}
110+
44111
} // namespace dataset
45112
} // namespace arrow

cpp/src/arrow/dataset/file_base.h

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,15 @@
2020
#include <memory>
2121
#include <string>
2222
#include <utility>
23+
#include <vector>
2324

2425
#include "arrow/buffer.h"
2526
#include "arrow/dataset/dataset.h"
2627
#include "arrow/dataset/scanner.h"
2728
#include "arrow/dataset/type_fwd.h"
2829
#include "arrow/dataset/visibility.h"
2930
#include "arrow/dataset/writer.h"
31+
#include "arrow/filesystem/filesystem.h"
3032
#include "arrow/io/file.h"
3133
#include "arrow/util/compression.h"
3234

@@ -130,6 +132,11 @@ class ARROW_DS_EXPORT FileFormat {
130132
std::shared_ptr<ScanOptions> scan_options,
131133
std::shared_ptr<ScanContext> scan_context,
132134
std::unique_ptr<ScanTaskIterator>* out) const = 0;
135+
136+
/// \brief Open a fragment
137+
virtual Status MakeFragment(const FileSource& location,
138+
std::shared_ptr<ScanOptions> opts,
139+
std::unique_ptr<DataFragment>* out) = 0;
133140
};
134141

135142
/// \brief A DataFragment that is stored in a file with a known format
@@ -147,11 +154,43 @@ class ARROW_DS_EXPORT FileBasedDataFragment : public DataFragment {
147154
const FileSource& source() const { return source_; }
148155
std::shared_ptr<FileFormat> format() const { return format_; }
149156

157+
std::shared_ptr<ScanOptions> scan_options() const override { return scan_options_; }
158+
150159
protected:
151160
FileSource source_;
152161
std::shared_ptr<FileFormat> format_;
153162
std::shared_ptr<ScanOptions> scan_options_;
154163
};
155164

165+
/// \brief A DataSource which takes files of one format from a directory
166+
///
167+
/// The directory is crawled upon construction (Make) and not updated afterward.
168+
/// GetFragments() will not include files added after this DataDource is constructed and
169+
/// will error if files are deleted/moved.
170+
class ARROW_DS_EXPORT FileSystemBasedDataSource : public DataSource {
171+
public:
172+
static Status Make(fs::FileSystem* filesystem, const fs::Selector& selector,
173+
std::shared_ptr<FileFormat> format,
174+
std::shared_ptr<ScanOptions> scan_options,
175+
std::unique_ptr<FileSystemBasedDataSource>* out);
176+
177+
std::string type() const override { return "directory"; }
178+
179+
std::unique_ptr<DataFragmentIterator> GetFragments(
180+
std::shared_ptr<ScanOptions> options) override;
181+
182+
protected:
183+
FileSystemBasedDataSource(fs::FileSystem* filesystem, const fs::Selector& selector,
184+
std::shared_ptr<FileFormat> format,
185+
std::shared_ptr<ScanOptions> scan_options,
186+
std::vector<fs::FileStats> stats);
187+
188+
fs::FileSystem* filesystem_ = NULLPTR;
189+
fs::Selector selector_;
190+
std::shared_ptr<FileFormat> format_;
191+
std::shared_ptr<ScanOptions> scan_options_;
192+
std::vector<fs::FileStats> stats_;
193+
};
194+
156195
} // namespace dataset
157196
} // namespace arrow

cpp/src/arrow/dataset/file_parquet.cc

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
#include "arrow/table.h"
2424
#include "arrow/util/iterator.h"
2525
#include "arrow/util/range.h"
26+
#include "arrow/util/stl.h"
2627
#include "parquet/arrow/reader.h"
2728
#include "parquet/file_reader.h"
2829

@@ -174,5 +175,13 @@ Status ParquetFileFormat::ScanFile(const FileSource& source,
174175
out);
175176
}
176177

178+
Status ParquetFileFormat::MakeFragment(const FileSource& source,
179+
std::shared_ptr<ScanOptions> opts,
180+
std::unique_ptr<DataFragment>* out) {
181+
// TODO(bkietz) check location.path() against IsKnownExtension etc
182+
*out = internal::make_unique<ParquetFragment>(source, opts);
183+
return Status::OK();
184+
}
185+
177186
} // namespace dataset
178187
} // namespace arrow

0 commit comments

Comments
 (0)