Add pipelined & parallel compression optimization #6262
cp5555 wants to merge 7 commits into facebook:master
Conversation
include/rocksdb/advanced_options.h
Please rename the option name to something more specific.
Changed this option to parallel_threads.
include/rocksdb/advanced_options.h
Do we need this when we already have the previous option?
Removed this since parallel_threads is sufficient.
util/work_queue.h
Please copy & paste the header from another file.
Copied RocksDB header for work_queue.h.
util/work_queue.h
Is this source code copied from somewhere else? If not, I suggest you keep the comment format similar to other code: "//" for each line.
Can you put this information in a code comment? Maybe after line 27.
Added. Also added similar information in work_queue_test.cc.
util/work_queue.h
Was the class copied from somewhere? If it is from a proven library then fine. If not, we probably should at least add some unit tests. It's also worth thinking about whether we can simplify it while still satisfying our performance requirement.
Added a unit test for work_queue.h, since there is an out-of-the-box unit test from facebook/zstd. Currently all methods in the class are used except waitUntilFinished, so maybe we could keep the class WorkQueue as it was in the zstd repo.
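For reference, a minimal sketch of what such a bounded, thread-safe work queue can look like. This is an illustrative simplification, not the actual zstd/RocksDB WorkQueue API; names and the exact blocking semantics are assumptions.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <utility>

// Minimal bounded MPMC work queue sketch. Producers block when the queue
// is full; consumers block when it is empty; finish() unblocks everyone.
template <typename T>
class SimpleWorkQueue {
 public:
  // max_size == 0 means unbounded.
  explicit SimpleWorkQueue(std::size_t max_size = 0) : max_size_(max_size) {}

  // Blocks while the queue is full; returns false once finish() was called.
  bool push(T item) {
    std::unique_lock<std::mutex> lock(mutex_);
    not_full_.wait(lock, [this] {
      return done_ || max_size_ == 0 || queue_.size() < max_size_;
    });
    if (done_) return false;
    queue_.push(std::move(item));
    not_empty_.notify_one();
    return true;
  }

  // Blocks while the queue is empty; returns false when finished and drained.
  bool pop(T* out) {
    std::unique_lock<std::mutex> lock(mutex_);
    not_empty_.wait(lock, [this] { return done_ || !queue_.empty(); });
    if (queue_.empty()) return false;
    *out = std::move(queue_.front());
    queue_.pop();
    not_full_.notify_one();
    return true;
  }

  // Signals that no more items will arrive; consumers drain remaining items.
  void finish() {
    std::lock_guard<std::mutex> lock(mutex_);
    done_ = true;
    not_empty_.notify_all();
    not_full_.notify_all();
  }

 private:
  std::mutex mutex_;
  std::condition_variable not_empty_, not_full_;
  std::queue<T> queue_;
  std::size_t max_size_;
  bool done_ = false;
};
```

The key design point echoed in the review below is the bound (max_size): without it, a fast producer can flood the queue with raw blocks.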
Do we need to keep it a subclass of BlockBasedTableBuilder, or can we define a separate class for it? It feels that the relationship between the two classes is quite loose.
ParallelCompressionRep was designed as a helper class for parallel compression in block-based tables only. ParallelCompressionRep will only be used by BlockBasedTableBuilder, so I made it a private inner class so that it's not visible to other code, including code that includes block_based_table_builder.h.
Can you keep the coding convention? You can run "make format" to reformat it. I tried it and it also works with the Ubuntu subsystem on Windows. Let me know if you want me to run it for you and give you a patch for formatting.
I've checked the make format results in latest commits. Sorry for the inconvenience.
Force-pushed from 287046f to 0b02f05
ping @siying
siying
left a comment
Awesome! I don't have major comments in the main logic. More unit tests might be needed though.
  // Get blocks from mem-table walking thread, compress them and
  // pass them to the write thread. Used in parallel compression mode only
  void WriteBlocks(CompressionContext& compression_ctx,
If my understanding is correct, this function is used by the compression threads. I think we should try to think of a better name. Right now, it's a little bit hard for me to imagine that it is a long-running function that keeps taking work items from a queue and processing them until it is signaled to finish. Maybe include something like "thread", or keep the convention of some parts of the code and prefix "Bg" in the function name.
Renamed WriteBlocks to BGWorkCompression.
                   CompressionType& result_compression_type);

  // Get compressed blocks from WriteBlocks and write them into SST
  void WriteRawBlocks();
Similar to WriteBlocks(). If my understanding is correct, this is the function used by the block writing thread. Can you try to think of a better name? Something like including "thread" in the function name.
Renamed WriteRawBlocks to BGWorkWriteRawBlock. Dropped the "s" suffix to stay aligned with the original WriteRawBlock.
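To visualize the two renamed roles, here is a hypothetical, heavily simplified sketch of the pipeline shape: several BGWorkCompression-style workers pull raw blocks from a queue, and a single BGWorkWriteRawBlock-style thread consumes the results. The Channel, Block, FakeCompress, and BuildPipelined names are stand-ins for illustration, not RocksDB code.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Simplified stand-in for a block flowing through the pipeline.
struct Block {
  std::size_t seq;      // position in the file, used to restore write order
  std::string payload;  // raw bytes in, "compressed" bytes out
};

// Tiny unbounded thread-safe queue, standing in for the real WorkQueue.
class Channel {
 public:
  void Push(Block b) {
    std::lock_guard<std::mutex> l(m_);
    q_.push_back(std::move(b));
    cv_.notify_one();
  }
  // Returns false once the channel is finished and drained.
  bool Pop(Block* out) {
    std::unique_lock<std::mutex> l(m_);
    cv_.wait(l, [this] { return done_ || !q_.empty(); });
    if (q_.empty()) return false;
    *out = std::move(q_.front());
    q_.pop_front();
    return true;
  }
  void Finish() {
    std::lock_guard<std::mutex> l(m_);
    done_ = true;
    cv_.notify_all();
  }
 private:
  std::mutex m_;
  std::condition_variable cv_;
  std::deque<Block> q_;
  bool done_ = false;
};

// Stand-in for real compression.
static std::string FakeCompress(const std::string& raw) { return "Z:" + raw; }

// "Compresses" blocks with n_threads BGWorkCompression-style workers and one
// BGWorkWriteRawBlock-style writer that restores file order via seq.
std::vector<std::string> BuildPipelined(const std::vector<std::string>& raws,
                                        int n_threads) {
  Channel compress_q, write_q;
  std::vector<std::thread> workers;
  for (int i = 0; i < n_threads; ++i) {
    workers.emplace_back([&compress_q, &write_q] {
      Block b;
      while (compress_q.Pop(&b)) {     // keep taking work items...
        b.payload = FakeCompress(b.payload);
        write_q.Push(std::move(b));    // ...and hand results to the writer
      }
    });
  }
  std::vector<std::string> out(raws.size());
  std::thread writer([&write_q, &out] {
    Block b;
    while (write_q.Pop(&b)) out[b.seq] = std::move(b.payload);
  });
  for (std::size_t i = 0; i < raws.size(); ++i) compress_q.Push({i, raws[i]});
  compress_q.Finish();                 // signal workers: no more raw blocks
  for (auto& t : workers) t.join();
  write_q.Finish();                    // all results pushed; release writer
  writer.join();
  return out;
}
```

In the real builder the writer appends blocks strictly in order as they become ready; indexing by `seq` here is just a simplification of that ordering constraint.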
@@ -0,0 +1,254 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
I only see a unit test for the work queue, but I didn't see a unit test that builds SST files with parallel compression. Maybe I missed something. Also, it is preferred to have at least one unit test that covers the whole flow: setting the option, generating SST files, and reading them back to check that the values are correct.
Sorry, I missed unit tests in former versions.
I leveraged the existing RandomizedHarnessTest in table/table_test.cc as basic unit tests. They now also check cases where parallel compression is enabled.
I also enabled DBBasicTestWithParallelIO in db/db_basic_test.cc and DBTest2::CompressionOptions in db/db_test2.cc to check parallel compression cases.
These tests should all cover the whole flow.
Force-pushed from 00eb742 to b321bb9
siying
left a comment
Sorry, I gave it another pass and have more comments. I should have been more careful in the first round of reviews.
Again, thank you for working on it; I believe it is a very cool project.
Here is a problem: without parallel compression, inside this function call, rep_.data_begin_offset is updated, so that in BlockBasedTableBuilder::Add() we can determine that the size of the file has reached the limit and the file can be terminated. But now, we don't know it until the background threads have finished compressing the blocks.
I don't know a good way to solve the problem. Can we estimate the size and terminate the file in BlockBasedTableBuilder::Add()?
Also, can the compression queue be a pointer that points to the object in the pool instead?
> Here is a problem: without parallel compression, inside this function call, rep_.data_begin_offset is updated, so that in BlockBasedTableBuilder::Add() we can determine that the size of the file has reached the limit and the file can be terminated. But now, we don't know it until the background threads have finished compressing the blocks. I don't know a good way to solve the problem. Can we estimate the size and terminate the file in BlockBasedTableBuilder::Add()?

rep_.data_begin_offset will only be increased in the kBuffered state, where the parallel compression code path is not involved. I think that variable tracks raw size instead of compressed size, in a synchronized and single-threaded way. Only when the state is changed to kUnbuffered is code related to parallel compression executed. As a result, code related to rep_.data_begin_offset should work well with parallel compression enabled.
There is such a problem in ProcessKeyValueCompaction in compaction_job.cc though, where current_output_file_size is updated by the builder's FileSize(). However, the maximum number of blocks in flight is bounded by the number of compression threads (I'll explain that in another comment). As a result, the file size with parallel compression is bounded by original_file_size + compressed_block_size * number_of_compression_threads. I think that's acceptable in most cases.

> Also, can the compression queue be a pointer that points to the object in the pool instead?

I made all WorkQueues hold pointers into the pool. Now block_rep_pool_, compress_queue_ and write_queue_ contain references (pointers) to data in block_rep_buf_.
Added SST size estimation based on historical_compression_ratio * bytes_under_compression.
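The estimation idea above can be sketched as follows. This is a hypothetical illustration of the formula (bytes already written plus in-flight raw bytes scaled by the compression ratio observed so far); the names and exact bookkeeping in the actual change differ.

```cpp
#include <cassert>
#include <cstdint>

// Sketch: estimate the final SST size while blocks are still being
// compressed in the background. Field names are illustrative.
struct FileSizeEstimator {
  uint64_t bytes_written = 0;         // blocks already flushed to the file
  uint64_t raw_bytes_compressed = 0;  // raw input of finished compressions
  uint64_t compressed_bytes = 0;      // output of finished compressions
  uint64_t raw_bytes_inflight = 0;    // raw bytes still under compression

  // Compression ratio observed so far; assume 1.0 before any data point.
  double HistoricalRatio() const {
    return raw_bytes_compressed == 0
               ? 1.0
               : static_cast<double>(compressed_bytes) / raw_bytes_compressed;
  }

  // Written bytes plus the expected compressed size of in-flight blocks.
  uint64_t EstimatedFileSize() const {
    return bytes_written +
           static_cast<uint64_t>(HistoricalRatio() * raw_bytes_inflight);
  }
};
```

Because in-flight raw bytes are bounded by the number of compression threads (see the discussion above), the estimation error is bounded as well.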
We follow the Google C++ Style Guide, and the class member naming convention is keys_ptr_: https://google.github.io/styleguide/cppguide.html#Variable_Names
util/work_queue.h
Maybe I missed something, but I didn't see maxSize_ ever set in the queues used. Should we set something? Because the writer thread can be significantly faster than the background compression threads, without a limit on the queue size we can end up with unlimited raw blocks in the queue, which consumes memory and makes it harder to estimate file size, but does not help with anything.
The maximum size of the queue, i.e. the number of in-flight blocks, was implicitly ensured because we have a fixed number of BlockReps, equal to the number of compression threads. Each time we want to emit a block for compression, we have to fetch a BlockRep from its pool. I made this more explicit by adding setMaxSize calls in the ParallelCompressionRep constructor.
Changed setMaxSize to initialization in the initializer list to keep the same convention as BlockBasedTableBuilder::Rep.
Is it possible to make it a std::unique_ptr so that we don't have to do the cleanup with delete?
Can you explain why the data structure is a WorkQueue and not a normal vector or deque?
The BlockRep pool will be pushed to by the writer thread and popped by the block-building thread concurrently. This is to reuse memory and keep a fixed number of in-flight compression payloads. As a result, the pool has to be thread-safe. More comments were added around the BlockRep definition for this.
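The pool-based bound described here can be sketched as follows. This is a hypothetical simplification (the real code uses a WorkQueue rather than a deque, and BlockRep carries real buffers): pre-fill a pool with a fixed number of slots; the producer must take a slot before emitting a block, and the writer returns it afterwards, so at most `capacity` blocks are ever in flight.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>

struct BlockRepSlot { /* reusable per-block buffers would live here */ };

// Thread-safe pool of reusable slots; taking a slot when none is free
// blocks the producer, which implicitly bounds the in-flight block count.
class BlockRepPool {
 public:
  explicit BlockRepPool(std::size_t capacity) {
    for (std::size_t i = 0; i < capacity; ++i)
      free_.push_back(new BlockRepSlot());
  }
  // Sketch assumes all slots have been returned before destruction.
  ~BlockRepPool() {
    for (auto* s : free_) delete s;
  }

  BlockRepSlot* Take() {  // blocks until a slot is free
    std::unique_lock<std::mutex> l(m_);
    cv_.wait(l, [this] { return !free_.empty(); });
    BlockRepSlot* s = free_.front();
    free_.pop_front();
    return s;
  }
  void Return(BlockRepSlot* s) {  // writer gives the slot back for reuse
    std::lock_guard<std::mutex> l(m_);
    free_.push_back(s);
    cv_.notify_one();
  }
  std::size_t FreeCount() {
    std::lock_guard<std::mutex> l(m_);
    return free_.size();
  }

 private:
  std::mutex m_;
  std::condition_variable cv_;
  std::deque<BlockRepSlot*> free_;
};
```

The pool keeps ownership of the slot memory throughout; the queues only ever see borrowed pointers, which is the ownership model discussed in the comments below.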
I don't think we need ptr in those variable names. It's clear they are pointers because their types are pointers.
Rather than delete them, can we make them unique_ptr?
I'm a little bit confused here. We moved block_rep.slot_ptr, but we still seem to continue using and cleaning up this pointer. That seems contradictory. Do we need std::move() here?
WorkQueue's push method only accepts r-values by design. We have to use std::move to wrap the pointer, but the data referenced by the pointer is not moved; only the pointer value, i.e. a scalar, is "moved". Actually, the pointer value is just copied into the queue.
slot_ptr is now made a unique_ptr, and unique_ptr::get() returns an r-value itself, so std::move is not necessary. But std::move for the block_reps is still needed, because they are l-values.
This std::move() is confusing to me too. I'm not sure about the behavior we want for block_rep after the move.
My understanding is that, we want to reuse those allocated memory for keys, strings, first_key_in_next_block_ptr, etc. If that is the case, can we be more explicit here? If block_rep_pool keeps holding the ownership to all those objects pointed by those pointers, I don't think we should do std::move() here.
Either way, please add comments somewhere to explain the ownership for those objects.
When block_rep is a struct, std::move will only move its members (including pointers), but not the data they reference. Now that block_rep is a pointer, std::move will likewise only move its value. I've added comments on object ownership around the BlockRep definition and the WorkQueue variable definitions.
Moving a pointer is confusing too. My vote would be to move away from this move.
Modified the WorkQueue design so it copies elements instead of moving them. As long as we avoid passing large elements directly to WorkQueue in the future, this should be fine. We can always pass large objects by pointer.
This behavior is more similar to an STL queue, and maybe less misleading.
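The point being debated above can be shown in a few lines: std::move on a raw pointer only transfers the pointer value (effectively a scalar copy); the pointed-to object is untouched and ownership does not change hands. The Payload and PushByMove names below are hypothetical, purely for illustration.

```cpp
#include <cassert>
#include <string>
#include <utility>
#include <vector>

struct Payload {
  std::string data;
};

// "Moving" a raw pointer into a container copies the address; the original
// pointer variable is left unchanged, and the pointee is not touched.
inline Payload* PushByMove(std::vector<Payload*>& queue, Payload* p) {
  queue.push_back(std::move(p));  // same effect as a plain copy of `p`
  return p;                       // `p` still holds the same address
}
```

This is why moving BlockRep* into the queue looked confusing to the reviewer, and why the final design simply copies elements into the queue.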
@siying Sorry I'm busy writing my master's thesis these days. I'll look into your comments by the end of this week.

Hi @siying , sorry for the late reply. I've fixed the code according to your comments. Besides, another unit test for parallel compression was added to the DBBasicTestWithTimestampCompressionSettings test in db/db_with_timestamp_basic_test.cc. PTAL, thanks!
include/rocksdb/advanced_options.h
More explanation about SST file inflation added.
Force-pushed from 4f8d702 to 7e390de
Hi @siying , Again thanks for your comments! Several updates since last push:
PTAL. Thanks!
siying
left a comment
Thank you for making the change. I don't have major comments anymore.
Please update the summary of the pull request to be clearer. Consider removing "Add Feature -" from the PR title to be more concise. Also add an entry to HISTORY.md to explain this new feature.
db/db_test2.cc
Ideally we need a unit test that validates the data in the database, i.e. that the keys are as expected and not lost during the compaction process.
Added a consistency check between the data written and the data in the database. This should benefit the original DBTest2::CompressionOptions test as well.
include/rocksdb/advanced_options.h
This is a public header, so mentioning an internal function like BlockBasedTableBuilder::EstimatedFileSize() is not recommended. Imagine that the readers of header files under include/rocksdb/ are RocksDB users who don't read the source code. I think the mention of internal functions here can be removed.
Rewrote the option documentation without code details.
We follow Google C++ Style, which bans C-style casts: https://google.github.io/styleguide/cppguide.html#Casting. Try to use a C++-style cast instead.
Hi @siying , I've finished addressing the latest comments; main changes include:
PTAL. Thanks!
facebook-github-bot
left a comment
@siying has imported this pull request. If you are a Facebook employee, you can view this diff on Phabricator.
siying
left a comment
I think it mostly looks good to me. Sorry that I have a late comment.
We also need to add an option to the stress test tool. It can be done in a follow-up pull request, but it needs to be done. Start with adding an option in db_stress to cover it. You can start by looking at how DB options are set up in db_stress_tool/db_stress_test_base.cc and add an option for the parallelism level. After doing that, set it up with some probability in tools/db_crashtest.py.
Thanks again for working on such a complicated feature. We are really close to landing it.
I think if r->status is OK, we should exit the loop and avoid calling WriteRawBlock() again. I believe inside WriteRawBlock() we only assert that the status is OK, and in release mode we would just override the status, which can cause problems. I think it's safer to just exit the loop based on r->status.
Ideally the failure case is tested.
Sorry for this late comment. I should have noticed it earlier. But this is serious and we have to fix it before committing the feature.
Sure. Did you mean exiting the loop when r->status is NOT OK?
Also, there seems to be a data race between the main thread and the block writer thread on rep_->status().
The variables added to estimate file size might also have a data race.
For the variables used to estimate file size:
raw_bytes_submitted will only be updated and accessed in the block-building thread, so it should be safe;
raw_bytes_compressed and curr_compression_ratio will be updated in the writer thread and accessed in the block-building thread. Their updates are single-threaded. There might be a case where r->offset is already updated but raw_bytes_compressed and curr_compression_ratio are not updated yet, causing an estimation error of one compressed block size. Shall we make these updates atomic, or do you think it's acceptable?
I noticed that, in the original BlockBasedTableBuilder, when compression is aborted, rep_->status is set to Status::Corruption, but WriteRawBlock is still called. Is WriteRawBlock meant to write uncompressed data here, or should we return before WriteRawBlock is called?
Regarding the data race for variables related to file size estimation, making the variables used by BlockBasedTableBuilder::EstimatedFileSize atomic should be good enough for me.
The compression validation failure seems to be a bug. We don't have to fix the bug in the non-parallel case, but if it is fixed in the parallel writer case, that is great.
Variables used by BlockBasedTableBuilder::EstimatedFileSize are all protected by estimation_mutex. This should lead to a more accurate (and predictable) estimation than separate atomic variables.
I've added an if (!ok()) check for compression in both parallel and non-parallel cases. A fake faulting compressor will be needed in the future for thorough testing, though.
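The synchronization choice described here can be sketched as follows: guard all estimation inputs with a single mutex so a reader never observes a half-applied update (e.g. the offset advanced but the ratio not yet refreshed), which is the 1-block estimation error discussed above. Names are illustrative, not the actual RocksDB members.

```cpp
#include <cassert>
#include <cstdint>
#include <mutex>

class EstimationState {
 public:
  // Writer thread: account for one finished block atomically with
  // respect to EstimatedFileSize().
  void OnBlockWritten(uint64_t raw_size, uint64_t compressed_size) {
    std::lock_guard<std::mutex> l(mu_);
    offset_ += compressed_size;
    raw_bytes_compressed_ += raw_size;
    ratio_ = static_cast<double>(offset_) / raw_bytes_compressed_;
  }

  // Any thread: a consistent snapshot of the estimated file size.
  uint64_t EstimatedFileSize(uint64_t raw_bytes_inflight) {
    std::lock_guard<std::mutex> l(mu_);
    return offset_ + static_cast<uint64_t>(ratio_ * raw_bytes_inflight);
  }

 private:
  std::mutex mu_;
  uint64_t offset_ = 0;                // compressed bytes already written
  uint64_t raw_bytes_compressed_ = 0;  // raw input of finished compressions
  double ratio_ = 1.0;                 // observed compression ratio so far
};
```

With separate atomics, the offset and ratio could be observed mid-update; the single mutex trades a little contention for a consistent snapshot.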
@cp5555 has updated the pull request. Re-import the pull request
Hi @siying , Again thanks for your continuous efforts on code review. It has been really helpful because I'm not very familiar with RocksDB design principles, and I've learned a lot during the code revision. For the problems with error checking and data races, here are the updates:
PTAL. Thanks! Ziyue
Summary: facebook#6262 causes CLANG analyze to complain. Add an assertion to suppress the warning. Test Plan: Run "clang analyze" and make sure it passes.
@yzygitzh When I re-read some of the code, I had some ideas for improving the newly added code. Would you have time to address them as a follow-up? A general suggestion would be to separate the logic of generating the objects put into queues into separate functions. If possible, encapsulate some logic as member functions of the classes. Ideally, the logic of serializing and deserializing queue items should not be mixed with the logic of processing them. Another suggestion is that the logic of estimating the file offset can be separated out into different functions or even classes. We can discuss this through Messenger. Thanks again for making the contribution.
Summary: `HarnessTest` in `table_test.cc` currently tests many parameter combinations sequentially in a loop. This is problematic from a testing perspective, since if the test fails, we have no way of knowing how many/which combinations have failed. It can also cause timeouts on our test system due to the sheer number of combinations tested. (Specifically, the parallel compression threads parameter added by #6262 seems to have been the last straw.) There is some DIY code there that splits the load among eight test cases, but that does not appear to be sufficient anymore. Instead, the patch turns `HarnessTest` into a parameterized test, so all the parameter combinations can be tested separately and potentially concurrently. It also cleans up the tests a little, fixes `RandomizedLongDB`, which did not get updated when the parallel compression threads parameter was added, and turns `FooterTests` into a standalone test case (since it does not actually need a fixture class).
Pull Request resolved: #6974
Test Plan: `make check`
Reviewed By: siying
Differential Revision: D22029572
Pulled By: ltamasi
fbshipit-source-id: 51baea670771c33928f2eb3902bd69dcf540aa41
This PR adds support for a pipelined & parallel compression optimization for BlockBasedTableBuilder. This optimization makes block building, block compression and block appending a pipeline, and uses multiple threads to accelerate block compression. Users can set CompressionOptions::parallel_threads greater than 1 to enable compression parallelism.
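As a usage sketch (assuming a standard RocksDB setup with the headers available; this fragment is illustrative and not taken verbatim from the PR), enabling the feature looks roughly like this:

```cpp
#include "rocksdb/options.h"

// Sketch: enable pipelined & parallel compression with 4 compression
// threads. Values greater than 1 enable the parallel code path.
rocksdb::Options MakeParallelCompressionOptions() {
  rocksdb::Options options;
  options.compression = rocksdb::kZSTD;           // any compression type works
  options.compression_opts.parallel_threads = 4;  // > 1 enables the pipeline
  return options;
}
```

Note that, per the review discussion above, file sizes under parallel compression are estimated rather than exact, and may exceed the target by up to roughly one compressed block per compression thread.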