GH-38884: [C++] DatasetWriter release rows_in_flight_throttle when allocate writing failed #38885
Conversation
The associated issue mentions
Updated the issue
backpressure = writer_state_.open_files_throttle.Acquire(1);
if (!backpressure.is_finished()) {
  EVENT_ON_CURRENT_SPAN("DatasetWriter::Backpressure::TooManyOpenFiles");
  writer_state_.rows_in_flight_throttle.Release(next_chunk->num_rows());
Can we move the rows_in_flight_throttle.Acquire() call closer to the existing rows_in_flight_throttle.Release() call, and this Release() call there as well? (DatasetWriterFileQueue?)
I think the Acquire()/Release() pair should be kept close together for easier maintenance, like the malloc()/free() and new/delete pairs.
(Can we apply the same approach to the open_files_throttle.Acquire()/Release() pair too?)
Hmm, I think the best way is to use something like an "RAII guard" here. This might require some refactoring of the throttle type.
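For reference, the RAII-guard idea could look roughly like this. This is a minimal sketch with hypothetical names (`Throttle`, `ThrottleGuard`); Arrow's real throttle is asynchronous and futures-based, so the actual refactor would be more involved:

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical, simplified stand-in for Arrow's rows_in_flight_throttle:
// the real throttle is asynchronous; this sketch only tracks a counter.
class Throttle {
 public:
  explicit Throttle(int64_t capacity) : capacity_(capacity) {}
  bool TryAcquire(int64_t n) {
    if (in_flight_ + n > capacity_) return false;
    in_flight_ += n;
    return true;
  }
  void Release(int64_t n) { in_flight_ -= n; }
  int64_t in_flight() const { return in_flight_; }

 private:
  int64_t capacity_;
  int64_t in_flight_ = 0;
};

// RAII guard: releases the acquired amount when it goes out of scope,
// unless Disarm() is called because ownership was handed elsewhere
// (e.g. to a file queue). Every early-return or error path then
// releases automatically, keeping Acquire()/Release() paired.
class ThrottleGuard {
 public:
  ThrottleGuard(Throttle* throttle, int64_t n) : throttle_(throttle), n_(n) {}
  ~ThrottleGuard() {
    if (throttle_ != nullptr) throttle_->Release(n_);
  }
  void Disarm() { throttle_ = nullptr; }

  // Non-copyable to avoid a double release.
  ThrottleGuard(const ThrottleGuard&) = delete;
  ThrottleGuard& operator=(const ThrottleGuard&) = delete;

 private:
  Throttle* throttle_;
  int64_t n_;
};
```

With such a guard, the error path in `DoWriteRecordBatch` would not need an explicit `Release()` call at all; the guard's destructor would handle it.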
I find it a bit hard to relate them...
Can we add a test for this case?
Sure, I'll finish this after work tonight.
Sorry for the late reply; I've been busy this week. While writing the test, I found that
@kou I've tried to address the comments, would you mind taking a look?
//
// `open_files_throttle` will be handed by `DatasetWriterDirectoryQueue`
// so we don't need to release it here.
writer_state_.rows_in_flight_throttle.Release(next_chunk->num_rows());
It seems that the added test passes without this.
Is it difficult to write a test for this change?
Aha, the added test hangs forever on my machine (with a debug build) 😂 let me take a look...
I've updated the test to take less time and reproduce the problem more easily. Before this patch, the test hangs forever on macOS with RelWithDebInfo.
Thanks. I tried again without this change:
diff --git a/cpp/src/arrow/dataset/dataset_writer.cc b/cpp/src/arrow/dataset/dataset_writer.cc
index ae9fb3648..c38e899ab 100644
--- a/cpp/src/arrow/dataset/dataset_writer.cc
+++ b/cpp/src/arrow/dataset/dataset_writer.cc
@@ -633,7 +633,7 @@ class DatasetWriter::DatasetWriterImpl {
//
// `open_files_throttle` will be handed by `DatasetWriterDirectoryQueue`
// so we don't need to release it here.
- writer_state_.rows_in_flight_throttle.Release(next_chunk->num_rows());
+ // writer_state_.rows_in_flight_throttle.Release(next_chunk->num_rows());
return s;
}
 batch = std::move(remainder);
And the added test is still passing.
Is it difficult to write a test for this change too? If it's difficult, I'll merge this without a test for this change.
(I confirmed that the added test hanged without another change https://github.com/apache/arrow/pull/38885/files#diff-387ad04c2450a38044e667e07183b8265866cb3736d10acdce137c2b83737b16R624 .)
Oh, I got what you mean, sorry about that! I think I only tested https://github.com/apache/arrow/pull/38885/files#diff-387ad04c2450a38044e667e07183b8265866cb3736d10acdce137c2b83737b16R624
DatasetWriterDirectoryQueue::StartWrite is a bit hard to test, because OpenFileQueue always returns OK now, and DatasetWriterFileQueue::Push also rarely fails.
@kou I've updated the code here, and avoided using a large batch in the test. I can reproduce the hang with the same test in
kou left a comment:
+1
I'll merge in a few days if nobody objects.
After merging your PR, Conbench analyzed the 6 benchmarking runs that have been run so far on merge-commit f7286a9. There was 1 benchmark result indicating a performance regression:
The full Conbench report has more details.
@mapleFU This may be unstable... I re-ran the job for now.
FYI: It passed in the second run: https://github.com/apache/arrow/actions/runs/7170522378?pr=39176
I'll run it on my local machine and find out why.
It seems the writer hangs when writing the 89th file, causing the test to fail. I need some time to go through the whole codebase of the writer; it's a bit complex because so many async tools are involved...
I guess
Future<> WriteNext(std::shared_ptr<RecordBatch> next) {
// May want to prototype / measure someday pushing the async write down further
return DeferNotOk(options_.filesystem->io_context().executor()->Submit(
[self = this, batch = std::move(next)]() {
int64_t rows_to_release = batch->num_rows();
Status status = self->writer_->Write(batch);
self->writer_state_->rows_in_flight_throttle.Release(rows_to_release);
return status;
}));
}
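The pattern in `WriteNext()` above can be sketched in isolation: the rows stay "in flight" until the asynchronous write task finishes, and the throttle is released inside the task whether or not the write succeeds. This is a hypothetical simplification using the standard library rather than Arrow's executor and `Future<>` machinery:

```cpp
#include <cassert>
#include <cstdint>
#include <future>
#include <atomic>

// Hypothetical counter standing in for rows_in_flight_throttle.
std::atomic<int64_t> rows_in_flight{0};

// Simplified WriteNext(): acquire before scheduling, release inside the
// task after the write completes. Note that if scheduling itself failed
// after the acquire, the caller would have to release on that path too --
// which is the class of leak this PR fixes.
std::future<bool> WriteAsync(int64_t rows, bool (*do_write)()) {
  rows_in_flight += rows;  // rows counted as in flight from this point
  return std::async(std::launch::async, [rows, do_write]() {
    bool ok = do_write();
    rows_in_flight -= rows;  // released regardless of write outcome
    return ok;
  });
}
```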
Using the diff: this can produce something similar, but when I enlarge the test timeout, the problem disappears. I think maybe I'm just writing too many files, making the scheduled writing very slow?
…hen allocate writing failed (apache#38885)

### Rationale for this change
When the file queue is full or a write fails, `DatasetWriterImpl::DoWriteRecordBatch` might fail; however, the acquired resources are not released.

### What changes are included in this PR?
When the file queue is full or a file cannot be opened, release the row resources.

### Are these changes tested?
Yes

### Are there any user-facing changes?
No

* Closes: apache#38884

Authored-by: mwish <maplewish117@gmail.com>
Signed-off-by: Sutou Kouhei <kou@clear-code.com>
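The bug and fix described in the commit message can be sketched with hypothetical, simplified synchronous types (the real code uses Arrow's `Status` and asynchronous throttles):

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical simplified throttle: only tracks in-flight rows.
struct Throttle {
  int64_t in_flight = 0;
  void Acquire(int64_t n) { in_flight += n; }
  void Release(int64_t n) { in_flight -= n; }
};

enum class Status { OK, IOError };

// Before the fix, an error path returned without releasing the rows
// acquired from rows_in_flight_throttle, so later batches waited on the
// throttle forever. `open_file` is a stand-in for the step that can fail.
Status DoWriteRecordBatch(Throttle& rows_in_flight_throttle, int64_t rows,
                          Status (*open_file)()) {
  rows_in_flight_throttle.Acquire(rows);
  Status s = open_file();
  if (s != Status::OK) {
    // The fix: release the acquired rows on the failure path.
    rows_in_flight_throttle.Release(rows);
    return s;
  }
  // In the real code the rows are handed to the file queue, which
  // releases them only after the write actually completes; here we
  // release immediately to keep the sketch synchronous.
  rows_in_flight_throttle.Release(rows);
  return Status::OK;
}
```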