Skip to content

Commit f7286a9

Browse files
authored
GH-38884: [C++] DatasetWriter release rows_in_flight_throttle when allocate writing failed (#38885)
### Rationale for this change When file-queue is fall or write failed, the `DatasetWriterImpl::DoWriteRecordBatch` might failed, however, the resources are not released. ### What changes are included in this PR? When file-queue is full or cannot open file, release the `row` resources. ### Are these changes tested? yes ### Are there any user-facing changes? no * Closes: #38884 Authored-by: mwish <maplewish117@gmail.com> Signed-off-by: Sutou Kouhei <kou@clear-code.com>
1 parent fe83387 commit f7286a9

2 files changed

Lines changed: 42 additions & 4 deletions

File tree

cpp/src/arrow/dataset/dataset_writer.cc

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ class Throttle {
8787

8888
private:
8989
Future<> backpressure_ = Future<>::MakeFinished();
90-
uint64_t max_value_;
90+
const uint64_t max_value_;
9191
uint64_t in_waiting_ = 0;
9292
uint64_t current_value_ = 0;
9393
std::mutex mutex_;
@@ -621,11 +621,21 @@ class DatasetWriter::DatasetWriterImpl {
621621
backpressure = writer_state_.open_files_throttle.Acquire(1);
622622
if (!backpressure.is_finished()) {
623623
EVENT_ON_CURRENT_SPAN("DatasetWriter::Backpressure::TooManyOpenFiles");
624+
writer_state_.rows_in_flight_throttle.Release(next_chunk->num_rows());
624625
RETURN_NOT_OK(TryCloseLargestFile());
625626
break;
626627
}
627628
}
628-
RETURN_NOT_OK(dir_queue->StartWrite(next_chunk));
629+
auto s = dir_queue->StartWrite(next_chunk);
630+
if (!s.ok()) {
631+
// If `StartWrite` succeeded, it will Release the
632+
// `rows_in_flight_throttle` when the write task is finished.
633+
//
634+
// `open_files_throttle` will be handed by `DatasetWriterDirectoryQueue`
635+
// so we don't need to release it here.
636+
writer_state_.rows_in_flight_throttle.Release(next_chunk->num_rows());
637+
return s;
638+
}
629639
batch = std::move(remainder);
630640
if (batch) {
631641
RETURN_NOT_OK(dir_queue->FinishCurrentFile());
@@ -647,6 +657,7 @@ class DatasetWriter::DatasetWriterImpl {
647657
DatasetWriterState writer_state_;
648658
std::function<void()> pause_callback_;
649659
std::function<void()> resume_callback_;
660+
// Map from directory + prefix to the queue for that directory
650661
std::unordered_map<std::string, std::shared_ptr<DatasetWriterDirectoryQueue>>
651662
directory_queues_;
652663
std::mutex mutex_;

cpp/src/arrow/dataset/dataset_writer_test.cc

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -189,15 +189,18 @@ class DatasetWriterTestFixture : public testing::Test {
189189
}
190190
}
191191

192-
void AssertCreatedData(const std::vector<ExpectedFile>& expected_files) {
192+
void AssertCreatedData(const std::vector<ExpectedFile>& expected_files,
193+
bool check_num_record_batches = true) {
193194
counter_ = 0;
194195
for (const auto& expected_file : expected_files) {
195196
std::optional<MockFileInfo> written_file = FindFile(expected_file.filename);
196197
AssertFileCreated(written_file, expected_file.filename);
197198
int num_batches = 0;
198199
AssertBatchesEqual(*MakeBatch(expected_file.start, expected_file.num_rows),
199200
*ReadAsBatch(written_file->data, &num_batches));
200-
ASSERT_EQ(expected_file.num_record_batches, num_batches);
201+
if (check_num_record_batches) {
202+
ASSERT_EQ(expected_file.num_record_batches, num_batches);
203+
}
201204
}
202205
}
203206

@@ -277,6 +280,30 @@ TEST_F(DatasetWriterTestFixture, MaxRowsOneWrite) {
277280
{"testdir/chunk-3.arrow", 30, 5}});
278281
}
279282

283+
TEST_F(DatasetWriterTestFixture, MaxRowsOneWriteBackpresure) {
284+
// GH-38884: This test is to make sure that the writer can handle
285+
// throttle resources in `WriteRecordBatch`.
286+
287+
constexpr auto kFileSizeLimit = static_cast<uint64_t>(10);
288+
write_options_.max_rows_per_file = kFileSizeLimit;
289+
write_options_.max_rows_per_group = kFileSizeLimit;
290+
write_options_.max_open_files = 2;
291+
write_options_.min_rows_per_group = kFileSizeLimit - 1;
292+
auto dataset_writer = MakeDatasetWriter(/*max_rows=*/kFileSizeLimit);
293+
for (int i = 0; i < 20; ++i) {
294+
dataset_writer->WriteRecordBatch(MakeBatch(kFileSizeLimit * 5), "");
295+
}
296+
EndWriterChecked(dataset_writer.get());
297+
std::vector<ExpectedFile> expected_files;
298+
for (int i = 0; i < 100; ++i) {
299+
expected_files.emplace_back("testdir/chunk-" + std::to_string(i) + ".arrow",
300+
kFileSizeLimit * i, kFileSizeLimit);
301+
}
302+
// Not checking the number of record batches because file may contain the
303+
// zero-length record batch.
304+
AssertCreatedData(expected_files, /*check_num_record_batches=*/false);
305+
}
306+
280307
TEST_F(DatasetWriterTestFixture, MaxRowsOneWriteWithFunctor) {
281308
// Left padding with up to four zeros
282309
write_options_.max_rows_per_group = 10;

0 commit comments

Comments
 (0)