Single Big Object Parallel Transfer #1827
Conversation
Test FAILed.

Test PASSed.
| create_failure_buffers_;
| /// Tracks the number of calls to GetChunk that should fail after the first call
| /// fails to obtain the buffer.
| std::unordered_map<ray::ObjectID, uint64_t, ray::UniqueIDHasher> get_failed_count_;
I don't really understand why this is necessary. Can we just have each of the GetChunk calls fail/succeed independently of each other?
| }
| std::shared_ptr<plasma::PlasmaClient> ObjectBufferPool::GetObjectStore() {
|   if (available_clients.empty()) {
Now that the pool is at the granularity of object buffers instead of plasma clients, can we have a single client that is shared by all of the buffers?
| RAY_LOG(DEBUG) << "SealOrAbortBuffer " << object_id << " "
|                << buffer_state_[object_id].references;
| if (!succeeded) {
|   buffer_state_[object_id].one_failed = true;
Thinking about it now, it might make sense to only abort the chunk that failed, but keep any work that was done on the other chunks.
| /// Information needed about each object chunk.
| /// This is the structure returned whenever an object chunk is
| /// retrieved.
| struct ChunkInfo {
Would it work if we just returned a uint8_t * to the data buffer instead of defining this other struct?
We still need the buffer length, but I did remove the unnecessary fields from this struct.
| uint64_t chunk_size_;
| /// A vector for each object maintaining information about the chunks which comprise
| /// an object. ChunkInfo is used to transfer
| std::unordered_map<ray::ObjectID, std::vector<ChunkInfo>, ray::UniqueIDHasher>
Similar to above, can this be a map to std::vector<uint8_t *>?
| object_buffer.metadata_size));
| auto *data = const_cast<uint8_t *>(object_buffer.data->data());
| uint64_t num_chunks = BuildChunks(object_id, data, data_size, metadata_size);
| buffer_state_.emplace(
This seems to assume that if we do a Get on one chunk of an object, we are also going to call Get on every other chunk of the object. This also seems like it will break if we do simultaneous Gets on the same chunk of the same object. A less error-prone way of doing this might be to keep an actual reference count of the number of Get callers of each object. Then, this method would just increment the count, instead of setting the initial count to num_chunks.
| std::shared_ptr<plasma::PlasmaClient> client;
| /// The number of references that currently rely on this buffer.
| /// We expect this many calls to Release or SealOrAbortBuffer.
| uint64_t references;
An alternative design that comes to mind is to combine the chunk_info_ and buffer_state_ maps. Also, instead of storing both a std::vector of the chunk information and a references count, how about we just make it a std::list of the chunk information, and the reference count is just the size of the list?
stephanie-wang left a comment
Only partway through.
| /// Mutex for thread-safe operations.
| std::mutex pool_mutex_;
| /// Determines the maximum chunk size to be transferred by a single thread.
| uint64_t chunk_size_;
| /// This is the structure returned whenever an object chunk is
| /// retrieved.
| struct ChunkInfo {
|   ChunkInfo() {}
Is this ever called? Remove if not.
| /// \param chunk_index The chunk index for which to obtain the buffer length.
| /// \param data_size The size of the object + metadata.
| /// \return The number of chunks into which the object will be split.
Update this documentation. Also, generally it's nice to start with one line that describes the method's purpose, before the params.
| std::pair<const ObjectBufferPool::ChunkInfo &, ray::Status> CreateChunk(
|     const ObjectID &object_id, uint64_t data_size, uint64_t metadata_size,
|     uint64_t chunk_index);
| ray::Status ReleaseCreateChunk(const ObjectID &object_id, uint64_t chunk_index);
Document. Also, given its usage in the object manager, AbortCreateChunk might be a more appropriate name.
| ray::Status ObjectBufferPool::SealChunk(const ObjectID &object_id,
|                                         const uint64_t chunk_index) {
|   std::lock_guard<std::mutex> lock(pool_mutex_);
|   create_buffer_state_[object_id].chunk_references[chunk_index]--;
I don't think we can decrement the chunk_references here. Consider two threads that both create the first chunk of a two-chunk object:
T1 creates chunk 1
T1 seals chunk 1
T2 creates chunk 1
T2 seals chunk 1 --> object is sealed in the object store, but only one chunk was created
It would be really nice if we had unit tests for these kinds of cases.
| ray::Status SealChunk(const ObjectID &object_id, uint64_t chunk_index);
| /// Abort the create operation associated with an object.
| ray::Status AbortCreate(const ObjectID &object_id);
This looks like it's only called in the destructor. Should this be private? Also, please be clear in the documentation that this aborts all chunks of the object, including outstanding ones.
| /// an object.
| std::vector<ChunkInfo> chunk_info;
| /// Reference counts for each chunk.
| std::vector<uint64_t> chunk_references;
Is this field necessary? It's only incremented and decremented, never read.
| const uint64_t chunk_index) {
|   std::lock_guard<std::mutex> lock(pool_mutex_);
|   create_buffer_state_[object_id].chunk_references[chunk_index]--;
|   // Make sure ReleaseCreateChunk OR SealChunk is called.
Might want to AbortObject here if chunk_references is equal to 0 at every chunk_index and num_chunks_remaining == num_chunks, since this is back to the initial state right before the first CreateChunk.
| }
| std::vector<ObjectBufferPool::ChunkInfo> ObjectBufferPool::BuildChunks(
|     const ObjectID &object_id, uint8_t *data, uint64_t data_size,
|     uint64_t metadata_size) {
metadata_size doesn't seem to be used in this method.
stephanie-wang left a comment
Looks good for the most part, mostly requested some cleanups.
| /// state, including create operations in progress for all chunks of the object.
| ///
| /// \param object_id The ObjectID.
| /// \return The status of invoking this method.
For methods that return ray::Status, we should be clear about when/how they might fail. For this method and some others on this class, it looks like the only time they should fail is if there's a bug, which we can catch with RAY_CHECK calls. I would make any of these methods return void instead.
| /// Holds the state of a get buffer.
| struct GetBufferState {
|   GetBufferState() {}
Why the empty constructor?
The compiler won't allow an unordered_map whose value type lacks a default constructor, since operator[] must be able to default-construct values for non-existent keys.
| /// Holds the state of a create buffer.
| struct CreateBufferState {
|   CreateBufferState() {}
Why the empty constructor?
| };
| /// Returned when Get fails.
| ChunkInfo errored_chunk_ = {0, nullptr, 0};
| object_directory_(new ObjectDirectory(gcs_client)),
| store_notification_(main_service, config.store_socket_name),
| store_pool_(config.store_socket_name),
| buffer_pool_(config.store_socket_name, config.object_chunk_size, 2*config.max_sends),
Add a comment explaining the rationale for 2*config.max_sends. The style guide also recommends adding a `/*release_delay=*/` comment right before the argument value for parameters whose purpose isn't clear. Also, I'm a little surprised linting didn't catch this, but add spaces around the *.
| connection_pool_.ReleaseSender(ConnectionPool::ConnectionType::TRANSFER, conn));
| return ray::Status::IOError(
|     "Unable to transfer object to requesting plasma manager, object not local.");
| std::pair<const ObjectBufferPool::ChunkInfo &, ray::Status> pair =
|     buffer_pool_.GetChunk(object_id, data_size, metadata_size, chunk_index);
Please use a more descriptive variable name than pair.
| ObjectBufferPool::ChunkInfo chunk_info = pair.first;
| if (!pair.second.ok()) {
|   // This is the first thread to invoke GetChunk => Get failed on the
Update this comment.
| RAY_LOG(DEBUG) << "ExecuteReceiveObject " << client_id << " " << object_id << " "
|                << chunk_index;
| std::pair<const ObjectBufferPool::ChunkInfo &, ray::Status> pair =
|     buffer_pool_.CreateChunk(object_id, data_size, metadata_size, chunk_index);
Please use a more descriptive variable name than pair.
| mutable_vec.resize(buffer_length);
| uint8_t *mutable_data = mutable_vec.data();
| std::vector<boost::asio::mutable_buffer> buffer;
| buffer.push_back(asio::buffer(mutable_data, buffer_length));
asio::buffer also has a constructor that can take in the mutable_vec vector directly. Also, probably should've pointed this out earlier, but is it necessary to have the std::vector<boost::asio::mutable_buffer> buffer? Why not pass in the mutable buffer directly?
| /// \param client_id The ClientID to which the object needs to be sent.
| /// \param object_id The ObjectID of the object to be sent.
| void QueueSend(const ClientID &client_id, const ObjectID &object_id,
| void QueueSend(const ClientID &client_id, const ObjectID &object_id, uint64_t data_size,
Update the documentation for the params here and in other places in the file.
These have been updated.
stephanie-wang left a comment
Looks good to merge, once the last changes are fixed and Travis tests pass (make sure to check the linting test on the merged build, not just the build for this branch).
Why check in all the pprof visualizations?
robertnishihara left a comment
Ah, good catch @concretevitamin. Please remove everything under src/ray/object_manager/test/profile and any other unnecessary files.
| }
| uint64_t ObjectBufferPool::GetNumChunks(uint64_t data_size) {
|   return static_cast<uint64_t>(ceil(static_cast<double>(data_size) / chunk_size_));
The canonical way to do this is integer division: (x + y - 1) / y.
This reverts commit 4af43b1.
| namespace ray {
| ObjectBufferPool::ObjectBufferPool(const std::string &store_socket_name,
|                                    const uint64_t chunk_size, const int release_delay)
It doesn't make sense for the uint64_t and int arguments to be const, does it?
| // Maximum number of receives allowed.
| object_manager_config.max_receives = 2;
| // Object chunk size, in bytes.
| object_manager_config.object_chunk_size = static_cast<uint64_t>(std::pow(10, 8));
I realize this has already been merged, but constants need to go in ray_config.h.
| killall plasma_store || true
| $REDIS_DIR/redis-cli -p 6379 shutdown || true
| # killall plasma_store || true
| # $REDIS_DIR/redis-cli -p 6379 shutdown || true
Why are we commenting these out?
They'll be removed or uncommented. The tests now start and remove plasma stores by pid.
I see that this has already been merged, but I left a few more comments. Also note that @stephanie-wang had some comments that were not addressed but should be.
| Lock context_mutex;
| std::mutex send_mutex_;
| std::mutex receive_mutex_;
Any lock must be documented very clearly. We need to know exactly what it is protecting and why.
| std::shared_ptr<TcpClientConnection> conn) {
| WriteLock guard(receive_mutex);
| ReceiveRequest req = {client_id, object_id, object_size, conn};
| std::unique_lock<std::mutex> guard(receive_mutex_);
We should be using lock_guard instead of unique_lock.
From http://jakascorner.com/blog/2016/02/lock_guard-and-unique_lock.html:
The rule of thumb is to always use std::lock_guard. But if we need some higher level functionalities, which are available by std::unique_lock, then we should use the std::unique_lock.
@robertnishihara I will address your comments in a separate PR.
object manager config bug fix. addresses other comments from ray-project#1827.
* Move object manager parameters to ray config, object manager config bug fix. Addresses other comments from #1827.
* linting and uint?
* typos
* remove uint.
* master: (56 commits)
  [xray] Turn on flushing to the GCS for the lineage cache (ray-project#1907)
  Single Big Object Parallel Transfer. (ray-project#1827)
  Remove num_threads as a parameter. (ray-project#1891)
  Adds Valgrind tests for multi-threaded object manager. (ray-project#1890)
  Pin cython version in docker base dependencies file. (ray-project#1898)
  Update arrow to efficiently serialize more types of numpy arrays. (ray-project#1889)
  updates (ray-project#1896)
  [DataFrame] Inherit documentation from Pandas (ray-project#1727)
  Update arrow and parquet-cpp. (ray-project#1875)
  raylet command line resource configuration plumbing (ray-project#1882)
  use raylet for remote ray nodes (ray-project#1880)
  [rllib] Propagate dim option to deepmind wrappers (ray-project#1876)
  [RLLib] DDPG (ray-project#1685)
  Lint Python files with Yapf (ray-project#1872)
  [DataFrame] Fixed repr, info, and memory_usage (ray-project#1874)
  Fix getattr compat (ray-project#1871)
  check if arrow build dir exists (ray-project#1863)
  [DataFrame] Encapsulate index and lengths into separate class (ray-project#1849)
  [DataFrame] Implemented __getattr__ (ray-project#1753)
  Add better analytics to docs (ray-project#1854)
  ...

# Conflicts:
#   python/ray/rllib/__init__.py
#   python/setup.py
Enables parallel transfer of a single big object.