Single Big Object Parallel Transfer. #1827

Merged
pcmoritz merged 27 commits into ray-project:master from elibol:melih/bigobject
Apr 15, 2018

@elibol (Contributor) commented Apr 4, 2018

Enables parallel transfer of a single big object.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/4652/
Test FAILed.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/4655/
Test PASSed.

create_failure_buffers_;
/// Tracks the number of calls to GetChunk that should fail after the first call
/// fails to obtain the buffer.
std::unordered_map<ray::ObjectID, uint64_t, ray::UniqueIDHasher> get_failed_count_;

Contributor

I don't really understand why this is necessary. Can we just have each of the GetChunk calls fail/succeed independently of each other?

}

std::shared_ptr<plasma::PlasmaClient> ObjectBufferPool::GetObjectStore() {
if (available_clients.empty()) {

Contributor

Now that the pool is at the granularity of object buffers instead of plasma clients, can we have a single client that is shared by all of the buffers?

RAY_LOG(DEBUG) << "SealOrAbortBuffer " << object_id << " "
<< buffer_state_[object_id].references;
if (!succeeded) {
buffer_state_[object_id].one_failed = true;

Contributor

Thinking about it now, it might make sense to only abort the chunk that failed, but keep any work that was done on the other chunks.

/// Information needed about each object chunk.
/// This is the structure returned whenever an object chunk is
/// retrieved.
struct ChunkInfo {

Contributor

Would it work if we just returned a uint8_t * to the data buffer instead of defining this other struct?

Contributor Author

We still need the buffer length, but I did remove the unnecessary fields in this struct.

uint64_t chunk_size_;
/// A vector for each object maintaining information about the chunks which comprise
/// an object. ChunkInfo is used to transfer
std::unordered_map<ray::ObjectID, std::vector<ChunkInfo>, ray::UniqueIDHasher>

Contributor

Similar to above, can this be a map to std::vector<uint8_t *>?

object_buffer.metadata_size));
auto *data = const_cast<uint8_t *>(object_buffer.data->data());
uint64_t num_chunks = BuildChunks(object_id, data, data_size, metadata_size);
buffer_state_.emplace(

Contributor

This seems to assume that if we do a Get on one chunk of an object, we are also going to call Get on every other chunk of the object. This also seems like it will break if we do simultaneous Gets on the same chunk of the same object. A less error-prone way of doing this might be to keep an actual reference count of the number of Get callers of each object. Then, this method would just increment the count, instead of setting the initial count to num_chunks.
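
The per-caller reference count suggested above could look roughly like the following. This is only a sketch under stated assumptions, not the code in this PR: `GetRefCounter`, `Acquire`, and `Release` are hypothetical names, and `std::string` stands in for `ray::ObjectID` so the example compiles on its own.

```cpp
#include <cassert>
#include <cstdint>
#include <mutex>
#include <string>
#include <unordered_map>

// Hypothetical sketch; std::string stands in for ray::ObjectID.
class GetRefCounter {
 public:
  // Called once per Get-style call. Returns true if this caller is the first
  // holder, i.e. the one that would have to fetch the buffer from the store.
  bool Acquire(const std::string &object_id) {
    std::lock_guard<std::mutex> lock(mutex_);
    // operator[] value-initializes a missing count to 0 before the increment.
    return ++references_[object_id] == 1;
  }

  // Called once per matching release. Returns true if this was the last
  // holder, i.e. the buffer could now be handed back to the store.
  bool Release(const std::string &object_id) {
    std::lock_guard<std::mutex> lock(mutex_);
    auto it = references_.find(object_id);
    assert(it != references_.end() && it->second > 0);
    if (--it->second > 0) {
      return false;
    }
    references_.erase(it);
    return true;
  }

 private:
  std::mutex mutex_;  // Protects references_.
  std::unordered_map<std::string, uint64_t> references_;
};
```

With this shape, two simultaneous Gets on the same chunk simply raise the count to 2, and the count never depends on every chunk of the object being requested.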

std::shared_ptr<plasma::PlasmaClient> client;
/// The number of references that currently rely on this buffer.
/// We expect this many calls to Release or SealOrAbortBuffer.
uint64_t references;

Contributor

An alternative design that comes to mind is to combine the chunk_info_ and buffer_state_ maps. Also, instead of storing both a std::vector of the chunk information and a references count, how about we just make it a std::list of the chunk information, and the reference count is just the size of the list?

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/4692/
Test PASSed.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/4755/
Test FAILed.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/4764/
Test PASSed.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/4771/
Test PASSed.

@stephanie-wang (Contributor) left a comment

Only partway through.

/// Mutex for thread-safe operations.
std::mutex pool_mutex_;
/// Determines the maximum chunk size to be transferred by a single thread.
uint64_t chunk_size_;

Contributor

Make this const.

/// This is the structure returned whenever an object chunk is
/// retrieved.
struct ChunkInfo {
ChunkInfo() {}

Contributor

Is this ever called? Remove if not.


/// \param chunk_index The chunk index for which to obtain the buffer length.
/// \param data_size The size of the object + metadata.
/// \return The number of chunks into which the object will be split.

Contributor

Update this documentation. Also, generally it's nice to start with one line that describes the method's purpose, before the params.

std::pair<const ObjectBufferPool::ChunkInfo &, ray::Status> CreateChunk(const ObjectID &object_id, uint64_t data_size,
uint64_t metadata_size, uint64_t chunk_index);

ray::Status ReleaseCreateChunk(const ObjectID &object_id, uint64_t chunk_index);

Contributor

Document. Also, given its usage in the object manager, AbortCreateChunk might be a more appropriate name.


ray::Status ObjectBufferPool::SealChunk(const ObjectID &object_id, const uint64_t chunk_index) {
std::lock_guard<std::mutex> lock(pool_mutex_);
create_buffer_state_[object_id].chunk_references[chunk_index]--;

Contributor

I don't think we can decrement the chunk_references here. In an example with two threads that both create the first chunk of an object of two chunks:

T1 creates chunk 1
T1 seals chunk 1
T2 creates chunk 1
T2 seals chunk 1 --> object is sealed in the object store, but only one chunk was created

It would be really nice if we had unit tests for these kinds of cases.

Contributor Author

Nice catch!
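
One way to make the T1/T2 scenario above testable is to track a state per chunk rather than a bare counter, so that a second create on the same chunk is rejected and only distinct chunks count toward sealing. The sketch below is an illustration under that assumption, not the fix that landed in this PR; `ChunkTracker` and its state names are hypothetical.

```cpp
#include <cassert>
#include <cstdint>
#include <mutex>
#include <vector>

// Hypothetical tracker, not the ObjectBufferPool from this PR.
class ChunkTracker {
  enum class State { kAvailable, kReferenced, kSealed };

 public:
  explicit ChunkTracker(uint64_t num_chunks)
      : states_(num_chunks, State::kAvailable), num_sealed_(0) {}

  // Claims a chunk for writing. Fails if the index is out of range or if
  // another caller already holds or has sealed the chunk.
  bool CreateChunk(uint64_t chunk_index) {
    std::lock_guard<std::mutex> lock(mutex_);
    if (chunk_index >= states_.size() ||
        states_[chunk_index] != State::kAvailable) {
      return false;
    }
    states_[chunk_index] = State::kReferenced;
    return true;
  }

  // Marks a claimed chunk as written. Returns true only once every distinct
  // chunk has been sealed, i.e. when the whole object could be sealed.
  bool SealChunk(uint64_t chunk_index) {
    std::lock_guard<std::mutex> lock(mutex_);
    assert(chunk_index < states_.size());
    assert(states_[chunk_index] == State::kReferenced);
    states_[chunk_index] = State::kSealed;
    ++num_sealed_;
    return num_sealed_ == states_.size();
  }

 private:
  std::mutex mutex_;  // Protects states_ and num_sealed_.
  std::vector<State> states_;
  uint64_t num_sealed_;
};
```

Replaying the scenario against this sketch, the second create of the first chunk fails, so the object is not reported as complete until the second chunk has also been created and sealed.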

ray::Status SealChunk(const ObjectID &object_id, uint64_t chunk_index);

/// Abort the create operation associated with an object.
ray::Status AbortCreate(const ObjectID &object_id);

Contributor

This looks like it's only called in the destructor. Should this be private? Also, please be clear in the documentation that this aborts all chunks of the object, including outstanding ones.

/// an object.
std::vector<ChunkInfo> chunk_info;
/// Reference counts for each chunk.
std::vector<uint64_t> chunk_references;

Contributor

Is this field necessary? It's only incremented and decremented, never read.

const uint64_t chunk_index) {
std::lock_guard<std::mutex> lock(pool_mutex_);
create_buffer_state_[object_id].chunk_references[chunk_index]--;
// Make sure ReleaseCreateChunk OR SealChunk is called.

Contributor

Might want to AbortObject here if chunk_references is equal to 0 at every chunk_index and num_chunks_remaining == num_chunks, since this is back to the initial state right before the first CreateChunk.

}

std::vector<ObjectBufferPool::ChunkInfo> ObjectBufferPool::BuildChunks(const ObjectID &object_id, uint8_t *data,
uint64_t data_size, uint64_t metadata_size) {

Contributor

metadata_size doesn't seem to be used in this method.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/4772/
Test PASSed.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/4778/
Test FAILed.

@stephanie-wang (Contributor) left a comment

Looks good for the most part, mostly requested some cleanups.

/// state, including create operations in progress for all chunks of the object.
///
/// \param object_id The ObjectID.
/// \return The status of invoking this method.

Contributor

For methods that return ray::Status, we should be clear about when/how they might fail. For this method and some others on this class, it looks like the only time they should fail is if there's a bug, which we can catch with RAY_CHECK calls. I would make any of these methods return void instead.


/// Holds the state of a get buffer.
struct GetBufferState {
GetBufferState() {}

Contributor

Why the empty constructor?

Contributor Author

The compiler won't allow unordered_map with values that don't have a default constructor (so non-existent keys can be accessed).
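
As far as the standard library goes, the default-constructor requirement comes from `operator[]` specifically, since it has to create a value for a missing key. A sketch of how the empty constructor could be avoided, assuming lookups go through `emplace` and `find` instead: `BufferState`, `AddState`, and `FindState` are hypothetical names, and `std::string` stands in for `ray::ObjectID`.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <unordered_map>

// Hypothetical value type with no default constructor.
struct BufferState {
  explicit BufferState(uint64_t num_chunks) : num_chunks_remaining(num_chunks) {}
  uint64_t num_chunks_remaining;
};

using StateMap = std::unordered_map<std::string, BufferState>;

// Inserts without operator[], so BufferState needs no default constructor.
// Returns false (and leaves the map unchanged) if the key already exists.
bool AddState(StateMap &states, const std::string &id, uint64_t num_chunks) {
  return states.emplace(id, BufferState(num_chunks)).second;
}

// Looks up without inserting; returns nullptr for a missing key.
BufferState *FindState(StateMap &states, const std::string &id) {
  auto it = states.find(id);
  return it == states.end() ? nullptr : &it->second;
}
```

A side effect of this style is that a lookup with an unknown key no longer silently creates an entry, which `operator[]` would do.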


/// Holds the state of a create buffer.
struct CreateBufferState {
CreateBufferState() {}

Contributor

Why the empty constructor?

};

/// Returned when Get fails.
ChunkInfo errored_chunk_ = {0, nullptr, 0};

Contributor

Make this const.

object_directory_(new ObjectDirectory(gcs_client)),
store_notification_(main_service, config.store_socket_name),
store_pool_(config.store_socket_name),
buffer_pool_(config.store_socket_name, config.object_chunk_size, 2*config.max_sends),

Contributor

Add a comment explaining the rationale for 2*config.max_sends. Style guide also recommends adding a /*release_delay=*/ comment right before the argument value for parameters whose purpose isn't clear. Also, I'm a little surprised linting didn't catch this, but add spaces around the *.

connection_pool_.ReleaseSender(ConnectionPool::ConnectionType::TRANSFER, conn));
return ray::Status::IOError(
"Unable to transfer object to requesting plasma manager, object not local.");
std::pair<const ObjectBufferPool::ChunkInfo &, ray::Status> pair = buffer_pool_.GetChunk(object_id, data_size, metadata_size, chunk_index);

Contributor

Please use a more descriptive variable name than pair.

ObjectBufferPool::ChunkInfo chunk_info = pair.first;

if (!pair.second.ok()) {
// This is the first thread to invoke GetChunk => Get failed on the

Contributor

Update this comment.

RAY_LOG(DEBUG) << "ExecuteReceiveObject " << client_id << " " << object_id << " "
<< chunk_index;

std::pair<const ObjectBufferPool::ChunkInfo &, ray::Status> pair = buffer_pool_.CreateChunk(object_id, data_size, metadata_size, chunk_index);

Contributor

Please use a more descriptive variable name than pair.

mutable_vec.resize(buffer_length);
uint8_t *mutable_data = mutable_vec.data();
std::vector<boost::asio::mutable_buffer> buffer;
buffer.push_back(asio::buffer(mutable_data, buffer_length));

Contributor

asio::buffer also has a constructor that can take in the mutable_vec vector directly. Also, probably should've pointed this out earlier, but is it necessary to have the std::vector<boost::asio::mutable_buffer> buffer? Why not pass in the mutable buffer directly?

/// \param client_id The ClientID to which the object needs to be sent.
/// \param object_id The ObjectID of the object to be sent.
void QueueSend(const ClientID &client_id, const ObjectID &object_id,
void QueueSend(const ClientID &client_id, const ObjectID &object_id, uint64_t data_size,

Contributor

Update the documentation for the params here and in other places in the file.

Contributor Author

These have been updated.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/4781/
Test PASSed.

@stephanie-wang (Contributor) left a comment

Looks good to merge, once the last changes are fixed and Travis tests pass (make sure to check the linting test on the merged build, not just the build for this branch).

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/4785/
Test PASSed.

@concretevitamin
Contributor

Why check in all the pprof visualizations?

@robertnishihara (Collaborator) left a comment

ah good catch @concretevitamin, please remove everything under src/ray/object_manager/test/profile and any other unnecessary files

}

uint64_t ObjectBufferPool::GetNumChunks(uint64_t data_size) {
return static_cast<uint64_t>(ceil(static_cast<double>(data_size) / chunk_size_));

Collaborator

the canonical way to do this is via integer division: (x + y - 1) / y
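
A minimal sketch of the integer form, for illustration only: `NumChunks` is a hypothetical free function standing in for `ObjectBufferPool::GetNumChunks`, with the chunk size passed in explicitly. It assumes `chunk_size > 0`. The second variant is an addition not mentioned in the review; it avoids the wrap-around that `x + y - 1` can hit when `x` is close to the maximum `uint64_t`.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical stand-in for ObjectBufferPool::GetNumChunks.
// Assumes chunk_size > 0 and that data_size + chunk_size - 1 does not overflow.
uint64_t NumChunks(uint64_t data_size, uint64_t chunk_size) {
  assert(chunk_size > 0);
  return (data_size + chunk_size - 1) / chunk_size;
}

// Same result, but safe for any data_size because nothing is added to it.
uint64_t NumChunksNoOverflow(uint64_t data_size, uint64_t chunk_size) {
  assert(chunk_size > 0);
  return data_size / chunk_size + (data_size % chunk_size != 0 ? 1 : 0);
}
```

Both versions stay in integer arithmetic, so they also sidestep the rounding that a `double` conversion can introduce for sizes above 2^53.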

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/4840/
Test FAILed.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/4841/
Test PASSed.

namespace ray {

ObjectBufferPool::ObjectBufferPool(const std::string &store_socket_name,
const uint64_t chunk_size, const int release_delay)

Collaborator

it doesn't make sense for uint64_t and int args to be const does it?
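
Some background on this question, as a small sketch: a top-level `const` on a by-value parameter is not part of the function's type, so it has no effect on callers. It only prevents the definition from modifying its own local copy. `ScaleChunk` is a hypothetical function used purely for illustration.

```cpp
#include <cstdint>
#include <type_traits>

// Declaration without const, as a header would typically spell it.
uint64_t ScaleChunk(uint64_t chunk_size, int factor);

// Definition with top-level const: this defines the same function as above.
// The const only stops the body from reassigning its local copies.
uint64_t ScaleChunk(const uint64_t chunk_size, const int factor) {
  return chunk_size * static_cast<uint64_t>(factor);
}

// The two spellings name the same function type.
static_assert(std::is_same<void(int), void(const int)>::value,
              "top-level const is dropped from the function type");
```

Whether to keep the qualifier in the definition is therefore a style decision rather than an interface one.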

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/4848/
Test PASSed.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/4852/
Test PASSed.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/4879/
Test FAILed.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/4886/
Test FAILed.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/4887/
Test FAILed.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/4905/
Test PASSed.

@pcmoritz merged commit fcd3044 into ray-project:master on Apr 15, 2018

// Maximum number of receives allowed.
object_manager_config.max_receives = 2;
// Object chunk size, in bytes.
object_manager_config.object_chunk_size = static_cast<uint64_t>(std::pow(10, 8));

Collaborator

I realize this has already been merged, but constants need to go in ray_config.h.

killall plasma_store || true
$REDIS_DIR/redis-cli -p 6379 shutdown || true
# killall plasma_store || true
# $REDIS_DIR/redis-cli -p 6379 shutdown || true

Collaborator

Why are we commenting these out?

Contributor Author

They'll be removed or uncommented. The tests now start and remove plasma stores by pid.

@robertnishihara
Collaborator

I see that this has already been merged, but I left a few more comments. Also note that @stephanie-wang had some comments which were not addressed but should be addressed.

Lock context_mutex;

std::mutex send_mutex_;
std::mutex receive_mutex_;

Collaborator

Any lock must be documented very clearly. We need to know exactly what it is protecting and why.

std::shared_ptr<TcpClientConnection> conn) {
WriteLock guard(receive_mutex);
ReceiveRequest req = {client_id, object_id, object_size, conn};
std::unique_lock<std::mutex> guard(receive_mutex_);

Collaborator

We should be using lock_guard instead of unique_lock.

From http://jakascorner.com/blog/2016/02/lock_guard-and-unique_lock.html:

The rule of thumb is to always use std::lock_guard. But if we need some higher level functionalities, which are available by std::unique_lock, then we should use the std::unique_lock.
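
To illustrate the rule of thumb quoted above, here is a small sketch. `RequestQueue` is a hypothetical class and not the object manager's transfer queue. `Push` is a plain critical section, so `std::lock_guard` is enough. `DrainAndSum` releases the lock before the end of its scope, which is one of the extra capabilities that `std::unique_lock` provides (a nested scope with `std::lock_guard` would also work here).

```cpp
#include <deque>
#include <mutex>

// Hypothetical queue, used only to contrast the two lock types.
class RequestQueue {
 public:
  // Plain critical section: the lock is held for the whole scope.
  void Push(int request) {
    std::lock_guard<std::mutex> guard(mutex_);
    requests_.push_back(request);
  }

  // Takes all queued requests under the lock, then releases it early so the
  // summation does not block concurrent Push calls.
  int DrainAndSum() {
    std::deque<int> local;
    std::unique_lock<std::mutex> lock(mutex_);
    local.swap(requests_);
    lock.unlock();  // Not available on std::lock_guard.
    int sum = 0;
    for (int request : local) {
      sum += request;
    }
    return sum;
  }

 private:
  std::mutex mutex_;  // Protects requests_.
  std::deque<int> requests_;
};
```

The comment on `mutex_` also shows the kind of one-line note that states exactly what a lock protects.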

@elibol (Contributor, Author) commented Apr 15, 2018

@robertnishihara I will address your comments in a separate PR.

elibol added a commit to elibol/ray that referenced this pull request Apr 15, 2018
object manager config bug fix.
addresses other comments from ray-project#1827.
pcmoritz pushed a commit that referenced this pull request Apr 16, 2018
* Move object manager parameters to ray config,
object manager config bug fix.
addresses other comments from #1827.

* linting and uint?

* typos

* remove uint.
royf added a commit to royf/ray that referenced this pull request Apr 22, 2018
* master: (56 commits)
  [xray] Turn on flushing to the GCS for the lineage cache (ray-project#1907)
  Single Big Object Parallel Transfer. (ray-project#1827)
  Remove num_threads as a parameter. (ray-project#1891)
  Adds Valgrind tests for multi-threaded object manager. (ray-project#1890)
  Pin cython version in docker base dependencies file. (ray-project#1898)
  Update arrow to efficiently serialize more types of numpy arrays. (ray-project#1889)
  updates (ray-project#1896)
  [DataFrame] Inherit documentation from Pandas (ray-project#1727)
  Update arrow and parquet-cpp. (ray-project#1875)
  raylet command line resource configuration plumbing (ray-project#1882)
  use raylet for remote ray nodes (ray-project#1880)
  [rllib] Propagate dim option to deepmind wrappers (ray-project#1876)
  [RLLib] DDPG (ray-project#1685)
  Lint Python files with Yapf (ray-project#1872)
  [DataFrame] Fixed repr, info, and memory_usage (ray-project#1874)
  Fix getattr compat (ray-project#1871)
  check if arrow build dir exists (ray-project#1863)
  [DataFrame] Encapsulate index and lengths into separate class (ray-project#1849)
  [DataFrame] Implemented __getattr__ (ray-project#1753)
  Add better analytics to docs (ray-project#1854)
  ...

# Conflicts:
#	python/ray/rllib/__init__.py
#	python/setup.py