GH-38884: [C++] DatasetWriter release rows_in_flight_throttle when allocate writing failed#38885

Merged
kou merged 8 commits into apache:main from mapleFU:dataset/release-resources
Dec 7, 2023

@mapleFU mapleFU commented Nov 26, 2023

Rationale for this change

When the file queue is full or a write fails, DatasetWriterImpl::DoWriteRecordBatch may fail, but the resources it acquired are not released.

What changes are included in this PR?

When the file queue is full or a file cannot be opened, release the row resources (rows_in_flight_throttle).
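
As a rough illustration of the leak described above, here is a minimal sketch. `ToyThrottle` and `WriteChunk` are hypothetical names made up for this example; the real `rows_in_flight_throttle` in Arrow is asynchronous and has a different interface. The sketch only shows why rows reserved before a failing step have to be given back on the error path.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical, simplified stand-in for a row throttle; not Arrow's API.
class ToyThrottle {
 public:
  explicit ToyThrottle(int64_t capacity) : capacity_(capacity) {}

  // Reserves `n` rows and returns true if they fit under the limit.
  bool TryAcquire(int64_t n) {
    if (in_use_ + n > capacity_) return false;
    in_use_ += n;
    return true;
  }

  // Gives back `n` previously reserved rows.
  void Release(int64_t n) { in_use_ -= n; }

  int64_t in_use() const { return in_use_; }

 private:
  int64_t capacity_;
  int64_t in_use_ = 0;
};

// Simulates one write attempt. `can_open_file` models the step that may fail.
// When `release_on_failure` is false, the reserved rows leak on the error path.
bool WriteChunk(ToyThrottle& rows, int64_t num_rows, bool can_open_file,
                bool release_on_failure) {
  if (!rows.TryAcquire(num_rows)) return false;  // backpressure
  if (!can_open_file) {
    if (release_on_failure) rows.Release(num_rows);
    return false;
  }
  // ... the actual write would happen here ...
  rows.Release(num_rows);
  return true;
}
```

With `release_on_failure` set to false, a single failed attempt of 8 rows leaves those rows reserved, so a later write of the same size can no longer be admitted under a limit of 10.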

Are these changes tested?

yes

Are there any user-facing changes?

no

@mapleFU mapleFU requested a review from westonpace as a code owner November 26, 2023 11:09

mapleFU commented Nov 26, 2023

cc @kou @bkietz

@github-actions

⚠️ GitHub issue #38884 has been automatically assigned in GitHub to PR creator.

github-actions bot added the "awaiting committer review" label and removed the "awaiting review" label on Nov 26, 2023
mapleFU changed the title from "GH-38884: [C++] DatasetWriter release resource when File is full" to "GH-38884: [C++] DatasetWriter release rows_in_flight_throttle when allocate writing failed" on Nov 26, 2023

kou commented Nov 27, 2023

The associated issue mentions DatasetWriterFileQueue::WriteNext, but this PR mentions DatasetWriterImpl::DoWriteRecordBatch instead. How are the two related in this context?

mapleFU commented Nov 27, 2023

Updated the issue

backpressure = writer_state_.open_files_throttle.Acquire(1);
if (!backpressure.is_finished()) {
  EVENT_ON_CURRENT_SPAN("DatasetWriter::Backpressure::TooManyOpenFiles");
  writer_state_.rows_in_flight_throttle.Release(next_chunk->num_rows());

Member

Can we move the rows_in_flight_throttle.Acquire() call close to the existing rows_in_flight_throttle.Release() call, and move this Release() call there too? (DatasetWriterFileQueue?)
I think an Acquire()/Release() pair should be kept close together for maintainability, like a malloc()/free() pair or a new/delete pair.

(Can we apply the same approach to the open_files_throttle.Acquire()/Release() pair too?)

Member Author

Hmm, I think the best way is to use something like an "RAII guard" here. This might require some refactoring of the throttle type.
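
For illustration only, an RAII guard of the kind suggested here could look roughly like the sketch below. `CountingThrottle` and `ThrottleGuard` are hypothetical types, not part of Arrow, and the sketch assumes a synchronous throttle. In the snippet under review, `Acquire()` returns an object that is checked with `is_finished()`, so the real throttle type would probably need the refactoring mentioned above before a guard like this could be used.

```cpp
#include <cassert>
#include <cstdint>
#include <utility>

// Hypothetical synchronous throttle used only for this sketch.
struct CountingThrottle {
  int64_t in_use = 0;
  void Acquire(int64_t n) { in_use += n; }
  void Release(int64_t n) { in_use -= n; }
};

// Acquires in the constructor and releases in the destructor, so every
// early return gives the reserved amount back automatically.
class ThrottleGuard {
 public:
  ThrottleGuard(CountingThrottle& throttle, int64_t amount)
      : throttle_(&throttle), amount_(amount) {
    throttle_->Acquire(amount_);
  }
  ~ThrottleGuard() {
    if (throttle_ != nullptr) throttle_->Release(amount_);
  }

  ThrottleGuard(const ThrottleGuard&) = delete;
  ThrottleGuard& operator=(const ThrottleGuard&) = delete;
  ThrottleGuard(ThrottleGuard&& other) noexcept
      : throttle_(std::exchange(other.throttle_, nullptr)),
        amount_(other.amount_) {}

  // Hands responsibility for the release to someone else (for example a
  // task that releases after the write finishes) and returns the amount.
  int64_t Dismiss() {
    throttle_ = nullptr;
    return amount_;
  }

 private:
  CountingThrottle* throttle_;
  int64_t amount_;
};

// Both the failing and the succeeding path release through the guard.
bool TryWrite(CountingThrottle& rows, int64_t num_rows, bool fail_early) {
  ThrottleGuard guard(rows, num_rows);
  if (fail_early) return false;
  return true;
}
```

The `Dismiss()` method is one possible way to keep the guard compatible with a design where a later stage performs the release; whether that fits the writer's structure is an open question.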

Member Author

I find it a bit hard to keep the two calls close together...

kou commented Nov 27, 2023

Can we add a test for this case?

github-actions bot added the "awaiting changes" label and removed the "awaiting committer review" label on Nov 27, 2023

mapleFU commented Nov 27, 2023

Can we add a test for this case?

Sure, I'll finish this after work tonight.

mapleFU commented Nov 29, 2023

Sorry for the late reply; I have been busy this week. While writing the test, I found that dataset_writer might write a chunk with size == 0. Is this expected?

@mapleFU mapleFU requested a review from kou November 29, 2023 16:18
github-actions bot added the "awaiting change review" label and removed the "awaiting changes" label on Nov 29, 2023

mapleFU commented Nov 30, 2023

@kou I've tried to address some of the comments. Would you mind taking a look?

//
// `open_files_throttle` will be handed by `DatasetWriterDirectoryQueue`
// so we don't need to release it here.
writer_state_.rows_in_flight_throttle.Release(next_chunk->num_rows());

Member

It seems that the added test passes without this change.
Is it difficult to write a test for this change?

Member Author

@mapleFU mapleFU Dec 1, 2023

Aha, the added test hangs forever on my machine (with a debug build) 😂 Let me take a look...

Member Author

I've updated the test so that it takes less time and reproduces the problem more easily. Run on macOS with RelWithDebInfo, the test hangs forever before this patch.

Member

Thanks. I tried again without this change:

diff --git a/cpp/src/arrow/dataset/dataset_writer.cc b/cpp/src/arrow/dataset/dataset_writer.cc
index ae9fb3648..c38e899ab 100644
--- a/cpp/src/arrow/dataset/dataset_writer.cc
+++ b/cpp/src/arrow/dataset/dataset_writer.cc
@@ -633,7 +633,7 @@ class DatasetWriter::DatasetWriterImpl {
         //
         // `open_files_throttle` will be handed by `DatasetWriterDirectoryQueue`
         // so we don't need to release it here.
-        writer_state_.rows_in_flight_throttle.Release(next_chunk->num_rows());
+        // writer_state_.rows_in_flight_throttle.Release(next_chunk->num_rows());
         return s;
       }
       batch = std::move(remainder);

And the added test still passes.
Is it difficult to write a test for this change too? If it is, I'll merge this without a test for this change.
(I confirmed that the added test hung without the other change: https://github.com/apache/arrow/pull/38885/files#diff-387ad04c2450a38044e667e07183b8265866cb3736d10acdce137c2b83737b16R624 )

Member Author

Oh, I see what you mean, sorry about that! I think I only tested https://github.com/apache/arrow/pull/38885/files#diff-387ad04c2450a38044e667e07183b8265866cb3736d10acdce137c2b83737b16R624

DatasetWriterDirectoryQueue::StartWrite is a bit hard to test, because OpenFileQueue currently always returns OK, and DatasetWriterFileQueue::Push also rarely fails.

Member

OK.

github-actions bot added the "awaiting changes" label and removed the "awaiting change review" label on Dec 1, 2023
@mapleFU mapleFU requested a review from kou December 1, 2023 05:53
github-actions bot added the "awaiting change review" label and removed the "awaiting changes" label on Dec 1, 2023

mapleFU commented Dec 4, 2023

@kou I've updated the code here and avoided using a large batch in the test. I can reproduce the hang with the same test on the main branch in RelWithDebInfo mode.

github-actions bot added the "awaiting changes" label and removed the "awaiting change review" label on Dec 4, 2023

Member

@kou kou left a comment

+1

I'll merge this in a few days if nobody objects.

github-actions bot added the "awaiting merge" label and removed the "awaiting changes" label on Dec 4, 2023
@kou kou merged commit f7286a9 into apache:main Dec 7, 2023
@kou kou removed the "awaiting merge" label Dec 7, 2023
@mapleFU mapleFU deleted the dataset/release-resources branch December 7, 2023 08:24
@conbench-apache-arrow

After merging your PR, Conbench analyzed the 6 benchmarking runs that have been run so far on merge-commit f7286a9.

There was 1 benchmark result indicating a performance regression:

The full Conbench report has more details.

kou commented Dec 11, 2023

@mapleFU This may be unstable...
It failed in "Java JNI" CI of #39176: https://github.com/apache/arrow/actions/runs/7170522378/job/19523443755?pr=39176#step:7:3284

[ RUN      ] DatasetWriterTestFixture.MaxRowsOneWriteBackpresure
/arrow/cpp/src/arrow/dataset/dataset_writer_test.cc:115: Failure
Value of: _fut.Wait(::arrow::kDefaultAssertFinishesWaitSeconds)
  Actual: false
Expected: true
/arrow/cpp/src/arrow/dataset/dataset_writer_test.cc:154: Failure
Expected: (found) != (actual_paths.end()), actual: 8-byte object <88-4C 00-00 E9-7F 00-00> vs 8-byte object <88-4C 00-00 E9-7F 00-00>
The file testdir/chunk-89.arrow was not in the list of files visited
Google Test trace:
/arrow/cpp/src/arrow/dataset/dataset_writer_test.cc:183: pre_finish
/arrow/cpp/src/arrow/dataset/dataset_writer_test.cc:154: Failure
Expected: (found) != (actual_paths.end()), actual: 8-byte object <F8-34 00-F4 E8-7F 00-00> vs 8-byte object <F8-34 00-F4 E8-7F 00-00>
The file testdir/chunk-89.arrow was not in the list of files visited
Google Test trace:
/arrow/cpp/src/arrow/dataset/dataset_writer_test.cc:187: post_finish
/arrow/cpp/src/arrow/dataset/dataset_writer_test.cc:161: Failure
Value of: _st.ok()
  Actual: false
Expected: true
'_error_or_value27.status()' failed with Invalid: File is too small: 0
/arrow/cpp/src/arrow/result.cc:28: ValueOrDie called on an error: Invalid: File is too small: 0
/build/cpp/release/arrow-dataset-dataset-writer-test[0xd7d0e8]
/build/cpp/release/arrow-dataset-dataset-writer-test[0xd7d43d]
/build/cpp/release/arrow-dataset-dataset-writer-test[0xe5f462]
/build/cpp/release/arrow-dataset-dataset-writer-test[0xe6b6e7]
/build/cpp/release/arrow-dataset-dataset-writer-test[0x5fb7fa]
/build/cpp/release/arrow-dataset-dataset-writer-test[0x5eeec9]
/build/cpp/release/libarrow_gtest.so.1.11.0(void testing::internal::HandleExceptionsInMethodIfSupported<testing::Test, void>(testing::Test*, void (testing::Test::*)(), char const*)+0x5f)[0x7fe90eabfb6f]
/build/cpp/release/libarrow_gtest.so.1.11.0(testing::Test::Run()+0xce)[0x7fe90eab489e]
/build/cpp/release/libarrow_gtest.so.1.11.0(testing::TestInfo::Run()+0x155)[0x7fe90eab4a15]
/build/cpp/release/libarrow_gtest.so.1.11.0(testing::TestSuite::Run()+0xd9)[0x7fe90eab4fd9]
/build/cpp/release/libarrow_gtest.so.1.11.0(testing::internal::UnitTestImpl::RunAllTests()+0x51a)[0x7fe90eab56ba]
/build/cpp/release/libarrow_gtest.so.1.11.0(bool testing::internal::HandleExceptionsInMethodIfSupported<testing::internal::UnitTestImpl, bool>(testing::internal::UnitTestImpl*, bool (testing::internal::UnitTestImpl::*)(), char const*)+0x5f)[0x7fe90eabffef]
/build/cpp/release/libarrow_gtest.so.1.11.0(testing::UnitTest::Run()+0x7c)[0x7fe90eab4abc]
/build/cpp/release/libarrow_gtest_main.so.1.11.0(main+0x3b)[0x7fe90eae70eb]
/lib64/libc.so.6(__libc_start_main+0xf5)[0x7fe90d6fe555]
/build/cpp/release/arrow-dataset-dataset-writer-test[0x5d39c7]
/build/cpp/src/arrow/dataset

I've re-run the job for now.

kou commented Dec 11, 2023

FYI: It passed in the second run: https://github.com/apache/arrow/actions/runs/7170522378?pr=39176

mapleFU commented Dec 12, 2023

@mapleFU This may be unstable... It failed in "Java JNI" CI of #39176: https://github.com/apache/arrow/actions/runs/7170522378/job/19523443755?pr=39176#step:7:3284

I'll run it on my local machine and find out why.

mapleFU commented Dec 13, 2023

***Failed 64.16 sec

[ RUN      ] DatasetWriterTestFixture.MaxRowsOneWriteBackpresure
/arrow/cpp/src/arrow/dataset/dataset_writer_test.cc:115: Failure
Value of: _fut.Wait(::arrow::kDefaultAssertFinishesWaitSeconds)
  Actual: false
Expected: true

It seems the writer hangs when writing the 89th file, which causes the test to fail. I need some time to go through the whole codebase of the writer; it's a bit complex because so many async tools are involved...

mapleFU commented Dec 13, 2023

I guess DatasetWriterFileQueue::WriteNext forms some kind of deadlock or similar in the current implementation, but I need time to check it.

mapleFU commented Dec 13, 2023

  Future<> WriteNext(std::shared_ptr<RecordBatch> next) {
    // May want to prototype / measure someday pushing the async write down further
    return DeferNotOk(options_.filesystem->io_context().executor()->Submit(
        [self = this, batch = std::move(next)]() {
          int64_t rows_to_release = batch->num_rows();
          Status status = self->writer_->Write(batch);
          self->writer_state_->rows_in_flight_throttle.Release(rows_to_release);
          return status;
        }));
  }

self->writer_state_->rows_in_flight_throttle.Release might trigger the callback, and I guess this might cause the problem. But I need some time to confirm this. @kou
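
To make the concern concrete, here is a single-threaded sketch of a throttle whose `Release` resumes a parked waiter by running its callback inline, on whichever thread called `Release`. `CallbackThrottle` is a hypothetical type written for this example; it is an assumption about the general pattern, not a copy of Arrow's throttle, and it supports only one waiter at a time.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <utility>

// Hypothetical throttle for illustration; supports a single parked waiter.
class CallbackThrottle {
 public:
  explicit CallbackThrottle(int64_t capacity) : capacity_(capacity) {}

  // Returns true if `n` fits right away (the callback is not used).
  // Otherwise parks `on_ready` and returns false.
  bool Acquire(int64_t n, std::function<void()> on_ready) {
    if (in_use_ + n <= capacity_) {
      in_use_ += n;
      return true;
    }
    waiting_amount_ = n;
    waiter_ = std::move(on_ready);
    return false;
  }

  // Gives back `n`. If the parked waiter now fits, its callback runs
  // right here, inside Release, on the releasing thread.
  void Release(int64_t n) {
    in_use_ -= n;
    if (waiter_ && in_use_ + waiting_amount_ <= capacity_) {
      in_use_ += waiting_amount_;
      std::function<void()> callback = std::move(waiter_);
      waiter_ = nullptr;
      callback();
    }
  }

  int64_t in_use() const { return in_use_; }

 private:
  int64_t capacity_;
  int64_t in_use_ = 0;
  int64_t waiting_amount_ = 0;
  std::function<void()> waiter_;
};
```

If a real throttle behaves this way, the continuation would run on the I/O executor thread that called `Release` inside `WriteNext`, which is the kind of interaction worth checking when looking for a hang.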

mapleFU commented Dec 14, 2023

/Users/fuxuwei/workspace/CMakeLibs/arrow/cpp/src/arrow/dataset/dataset_writer_test.cc:115: Failure
Value of: _fut.Wait(::arrow::kDefaultAssertFinishesWaitSeconds)
  Actual: false
Expected: true
/Users/fuxuwei/workspace/CMakeLibs/arrow/cpp/src/arrow/result.cc:28: ValueOrDie called on an error: Invalid: File is too small: 0
/Users/fuxuwei/workspace/CMakeLibs/arrow/cpp/src/arrow/dataset/dataset_writer_test.cc:154: Failure
Expected: (found) != (actual_paths.end()), actual: 8-byte object <E8-9B 00-53 01-00 00-00> vs 8-byte object <E8-9B 00-53 01-00 00-00>
The file testdir/chunk-63.arrow was not in the list of files visited
Google Test trace:
/Users/fuxuwei/workspace/CMakeLibs/arrow/cpp/src/arrow/dataset/dataset_writer_test.cc:183: pre_finish
/Users/fuxuwei/workspace/CMakeLibs/arrow/cpp/src/arrow/dataset/dataset_writer_test.cc:154: Failure
Expected: (found) != (actual_paths.end()), actual: 8-byte object <E8-8B 00-54 01-00 00-00> vs 8-byte object <E8-8B 00-54 01-00 00-00>
The file testdir/chunk-63.arrow was not in the list of files visited
Google Test trace:
/Users/fuxuwei/workspace/CMakeLibs/arrow/cpp/src/arrow/dataset/dataset_writer_test.cc:187: post_finish
/Users/fuxuwei/workspace/CMakeLibs/arrow/cpp/src/arrow/dataset/dataset_writer_test.cc:162: Failure
Value of: _st.ok()
  Actual: false
Expected: true
'_error_or_value27.status()' failed with Invalid: File is too small: 0
0   libarrow.1500.0.0.dylib             0x00000001082d1ecc _ZN5arrow4util7CerrLog14PrintBackTraceEv + 44
1   libarrow.1500.0.0.dylib             0x00000001082d1e80 _ZN5arrow4util7CerrLogD2Ev + 184
2   libarrow.1500.0.0.dylib             0x00000001082d1dc0 _ZN5arrow4util7CerrLogD0Ev + 12
3   libarrow.1500.0.0.dylib             0x00000001082d1d5c _ZN5arrow4util8ArrowLogD1Ev + 48
4   libarrow.1500.0.0.dylib             0x00000001081a7a30 _ZN5arrow8internal17InvalidValueOrDieERKNS_6StatusE + 240
5   arrow-dataset-dataset-writer-test   0x0000000104560ca0 _ZN5arrow7dataset8internal24DatasetWriterTestFixture11ReadAsBatchENSt3__117basic_string_viewIcNS3_11char_traitsIcEEEEPi + 4952
6   arrow-dataset-dataset-writer-test   0x0000000104550390 _ZN5arrow7dataset8internal24DatasetWriterTestFixture17AssertCreatedDataERKNSt3__16vectorINS2_12ExpectedFileENS3_9allocatorIS5_EEEEb + 192
7   arrow-dataset-dataset-writer-test   0x0000000104551d14 _ZN5arrow7dataset8internal56DatasetWriterTestFixture_MaxRowsOneWriteBackpresure_Test8TestBodyEv + 652
8   libarrow_gtest.1.11.0.dylib         0x00000001049d1bdc _ZN7testing8internal35HandleExceptionsInMethodIfSupportedINS_4TestEvEET0_PT_MS4_FS3_vEPKc + 92
9   libarrow_gtest.1.11.0.dylib         0x00000001049d1b30 _ZN7testing4Test3RunEv + 444
10  libarrow_gtest.1.11.0.dylib         0x00000001049d329c _ZN7testing8TestInfo3RunEv + 492
11  libarrow_gtest.1.11.0.dylib         0x00000001049d3e58 _ZN7testing9TestSuite3RunEv + 352
12  libarrow_gtest.1.11.0.dylib         0x00000001049e3c3c _ZN7testing8internal12UnitTestImpl11RunAllTestsEv + 2120
13  libarrow_gtest.1.11.0.dylib         0x00000001049e32d0 _ZN7testing8internal35HandleExceptionsInMethodIfSupportedINS0_12UnitTestImplEbEET0_PT_MS4_FS3_vEPKc + 92
14  libarrow_gtest.1.11.0.dylib         0x00000001049e3240 _ZN7testing8UnitTest3RunEv + 124
15  libarrow_gtest_main.1.11.0.dylib    0x0000000104627e9c main + 68
16  dyld                                0x00000001046f50f4 start + 520
Process finished with exit code 134 (interrupted by signal 6: SIGABRT)

Using diff:

     // May want to prototype / measure someday pushing the async write down further
     return DeferNotOk(options_.filesystem->io_context().executor()->Submit(
         [self = this, batch = std::move(next)]() {
+          // Sleep 1 s
+          std::this_thread::sleep_for(std::chrono::seconds(1));
           int64_t rows_to_release = batch->num_rows();
           Status status = self->writer_->Write(batch);
           self->writer_state_->rows_in_flight_throttle.Release(rows_to_release);

With this diff I can produce a similar failure, but when I enlarge the test timeout, the problem disappears. Maybe I'm just writing too many files, which makes scheduling the writes too slow?

dgreiss pushed a commit to dgreiss/arrow that referenced this pull request Feb 19, 2024
…hen allocate writing failed (apache#38885)

* Closes: apache#38884

Authored-by: mwish <maplewish117@gmail.com>
Signed-off-by: Sutou Kouhei <kou@clear-code.com>