Skip to content

Commit c82ad93

Browse files
zhijunfupcmoritz
authored andcommitted
change mutex to recursive_mutex
1 parent ed7392c commit c82ad93

1 file changed

Lines changed: 23 additions & 23 deletions

File tree

cpp/src/plasma/client.cc

Lines changed: 23 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -302,7 +302,7 @@ class PlasmaClient::Impl : public std::enable_shared_from_this<PlasmaClient::Imp
302302
/// A hash set to record the ids that users want to delete but still in use.
303303
std::unordered_set<ObjectID> deletion_cache_;
304304
/// A mutex which protects this class.
305-
std::mutex client_mutex_;
305+
std::recursive_mutex client_mutex_;
306306

307307
#ifdef PLASMA_CUDA
308308
/// Cuda Device Manager.
@@ -343,7 +343,7 @@ uint8_t* PlasmaClient::Impl::LookupMmappedFile(int store_fd_val) {
343343
}
344344

345345
bool PlasmaClient::Impl::IsInUse(const ObjectID& object_id) {
346-
std::lock_guard<std::mutex> guard(client_mutex_);
346+
std::lock_guard<std::recursive_mutex> guard(client_mutex_);
347347

348348
const auto elem = objects_in_use_.find(object_id);
349349
return (elem != objects_in_use_.end());
@@ -388,7 +388,7 @@ void PlasmaClient::Impl::IncrementObjectCount(const ObjectID& object_id,
388388
Status PlasmaClient::Impl::Create(const ObjectID& object_id, int64_t data_size,
389389
const uint8_t* metadata, int64_t metadata_size,
390390
std::shared_ptr<Buffer>* data, int device_num) {
391-
std::lock_guard<std::mutex> guard(client_mutex_);
391+
std::lock_guard<std::recursive_mutex> guard(client_mutex_);
392392

393393
ARROW_LOG(DEBUG) << "called plasma_create on conn " << store_conn_ << " with size "
394394
<< data_size << " and metadata size " << metadata_size;
@@ -457,7 +457,7 @@ Status PlasmaClient::Impl::Create(const ObjectID& object_id, int64_t data_size,
457457
Status PlasmaClient::Impl::CreateAndSeal(const ObjectID& object_id,
458458
const std::string& data,
459459
const std::string& metadata) {
460-
std::lock_guard<std::mutex> guard(client_mutex_);
460+
std::lock_guard<std::recursive_mutex> guard(client_mutex_);
461461

462462
ARROW_LOG(DEBUG) << "called CreateAndSeal on conn " << store_conn_;
463463
// Compute the object hash.
@@ -510,7 +510,7 @@ Status PlasmaClient::Impl::GetBuffers(
510510
data + object->data_offset, object->data_size + object->metadata_size);
511511
} else {
512512
#ifdef PLASMA_CUDA
513-
std::lock_guard<std::mutex> lock(gpu_mutex);
513+
std::lock_guard<std::recursive_mutex> lock(gpu_mutex);
514514
auto iter = gpu_object_map.find(object_ids[i]);
515515
ARROW_CHECK(iter != gpu_object_map.end());
516516
iter->second->client_count++;
@@ -576,7 +576,7 @@ Status PlasmaClient::Impl::GetBuffers(
576576
data + object->data_offset, object->data_size + object->metadata_size);
577577
} else {
578578
#ifdef PLASMA_CUDA
579-
std::lock_guard<std::mutex> lock(gpu_mutex);
579+
std::lock_guard<std::recursive_mutex> lock(gpu_mutex);
580580
auto handle = gpu_object_map.find(object_ids[i]);
581581
if (handle == gpu_object_map.end()) {
582582
std::shared_ptr<CudaContext> context;
@@ -615,7 +615,7 @@ Status PlasmaClient::Impl::GetBuffers(
615615

616616
Status PlasmaClient::Impl::Get(const std::vector<ObjectID>& object_ids,
617617
int64_t timeout_ms, std::vector<ObjectBuffer>* out) {
618-
std::lock_guard<std::mutex> guard(client_mutex_);
618+
std::lock_guard<std::recursive_mutex> guard(client_mutex_);
619619

620620
const auto wrap_buffer = [=](const ObjectID& object_id,
621621
const std::shared_ptr<Buffer>& buffer) {
@@ -628,7 +628,7 @@ Status PlasmaClient::Impl::Get(const std::vector<ObjectID>& object_ids,
628628

629629
Status PlasmaClient::Impl::Get(const ObjectID* object_ids, int64_t num_objects,
630630
int64_t timeout_ms, ObjectBuffer* out) {
631-
std::lock_guard<std::mutex> guard(client_mutex_);
631+
std::lock_guard<std::recursive_mutex> guard(client_mutex_);
632632

633633
const auto wrap_buffer = [](const ObjectID& object_id,
634634
const std::shared_ptr<Buffer>& buffer) { return buffer; };
@@ -646,7 +646,7 @@ Status PlasmaClient::Impl::MarkObjectUnused(const ObjectID& object_id) {
646646
}
647647

648648
Status PlasmaClient::Impl::Release(const ObjectID& object_id) {
649-
std::lock_guard<std::mutex> guard(client_mutex_);
649+
std::lock_guard<std::recursive_mutex> guard(client_mutex_);
650650

651651
// If the client is already disconnected, ignore release requests.
652652
if (store_conn_ < 0) {
@@ -657,7 +657,7 @@ Status PlasmaClient::Impl::Release(const ObjectID& object_id) {
657657

658658
#ifdef PLASMA_CUDA
659659
if (object_entry->second->object.device_num != 0) {
660-
std::lock_guard<std::mutex> lock(gpu_mutex);
660+
std::lock_guard<std::recursive_mutex> lock(gpu_mutex);
661661
auto iter = gpu_object_map.find(object_id);
662662
ARROW_CHECK(iter != gpu_object_map.end());
663663
if (--iter->second->client_count == 0) {
@@ -685,7 +685,7 @@ Status PlasmaClient::Impl::Release(const ObjectID& object_id) {
685685

686686
// This method is used to query whether the plasma store contains an object.
687687
Status PlasmaClient::Impl::Contains(const ObjectID& object_id, bool* has_object) {
688-
std::lock_guard<std::mutex> guard(client_mutex_);
688+
std::lock_guard<std::recursive_mutex> guard(client_mutex_);
689689

690690
// Check if we already have a reference to the object.
691691
if (objects_in_use_.count(object_id) > 0) {
@@ -705,7 +705,7 @@ Status PlasmaClient::Impl::Contains(const ObjectID& object_id, bool* has_object)
705705
}
706706

707707
Status PlasmaClient::Impl::List(ObjectTable* objects) {
708-
std::lock_guard<std::mutex> guard(client_mutex_);
708+
std::lock_guard<std::recursive_mutex> guard(client_mutex_);
709709
RETURN_NOT_OK(SendListRequest(store_conn_));
710710
std::vector<uint8_t> buffer;
711711
RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType::PlasmaListReply, &buffer));
@@ -784,7 +784,7 @@ uint64_t PlasmaClient::Impl::ComputeObjectHash(const uint8_t* data, int64_t data
784784
}
785785

786786
Status PlasmaClient::Impl::Seal(const ObjectID& object_id) {
787-
std::lock_guard<std::mutex> guard(client_mutex_);
787+
std::lock_guard<std::recursive_mutex> guard(client_mutex_);
788788

789789
// Make sure this client has a reference to the object before sending the
790790
// request to Plasma.
@@ -812,7 +812,7 @@ Status PlasmaClient::Impl::Seal(const ObjectID& object_id) {
812812
}
813813

814814
Status PlasmaClient::Impl::Abort(const ObjectID& object_id) {
815-
std::lock_guard<std::mutex> guard(client_mutex_);
815+
std::lock_guard<std::recursive_mutex> guard(client_mutex_);
816816
auto object_entry = objects_in_use_.find(object_id);
817817
ARROW_CHECK(object_entry != objects_in_use_.end())
818818
<< "Plasma client called abort on an object without a reference to it";
@@ -828,7 +828,7 @@ Status PlasmaClient::Impl::Abort(const ObjectID& object_id) {
828828

829829
#ifdef PLASMA_CUDA
830830
if (object_entry->second->object.device_num != 0) {
831-
std::lock_guard<std::mutex> lock(gpu_mutex);
831+
std::lock_guard<std::recursive_mutex> lock(gpu_mutex);
832832
auto iter = gpu_object_map.find(object_id);
833833
ARROW_CHECK(iter != gpu_object_map.end());
834834
ARROW_CHECK(iter->second->client_count == 1);
@@ -851,7 +851,7 @@ Status PlasmaClient::Impl::Abort(const ObjectID& object_id) {
851851
}
852852

853853
Status PlasmaClient::Impl::Delete(const std::vector<ObjectID>& object_ids) {
854-
std::lock_guard<std::mutex> guard(client_mutex_);
854+
std::lock_guard<std::recursive_mutex> guard(client_mutex_);
855855

856856
std::vector<ObjectID> not_in_use_ids;
857857
for (auto& object_id : object_ids) {
@@ -876,7 +876,7 @@ Status PlasmaClient::Impl::Delete(const std::vector<ObjectID>& object_ids) {
876876
}
877877

878878
Status PlasmaClient::Impl::Evict(int64_t num_bytes, int64_t& num_bytes_evicted) {
879-
std::lock_guard<std::mutex> guard(client_mutex_);
879+
std::lock_guard<std::recursive_mutex> guard(client_mutex_);
880880

881881
// Send a request to the store to evict objects.
882882
RETURN_NOT_OK(SendEvictRequest(store_conn_, num_bytes));
@@ -888,7 +888,7 @@ Status PlasmaClient::Impl::Evict(int64_t num_bytes, int64_t& num_bytes_evicted)
888888
}
889889

890890
Status PlasmaClient::Impl::Hash(const ObjectID& object_id, uint8_t* digest) {
891-
std::lock_guard<std::mutex> guard(client_mutex_);
891+
std::lock_guard<std::recursive_mutex> guard(client_mutex_);
892892

893893
// Get the plasma object data. We pass in a timeout of 0 to indicate that
894894
// the operation should timeout immediately.
@@ -905,7 +905,7 @@ Status PlasmaClient::Impl::Hash(const ObjectID& object_id, uint8_t* digest) {
905905
}
906906

907907
Status PlasmaClient::Impl::Subscribe(int* fd) {
908-
std::lock_guard<std::mutex> guard(client_mutex_);
908+
std::lock_guard<std::recursive_mutex> guard(client_mutex_);
909909

910910
int sock[2];
911911
// Create a non-blocking socket pair. This will only be used to send
@@ -929,7 +929,7 @@ Status PlasmaClient::Impl::Subscribe(int* fd) {
929929
Status PlasmaClient::Impl::DecodeNotification(const uint8_t* buffer, ObjectID* object_id,
930930
int64_t* data_size,
931931
int64_t* metadata_size) {
932-
std::lock_guard<std::mutex> guard(client_mutex_);
932+
std::lock_guard<std::recursive_mutex> guard(client_mutex_);
933933

934934
auto object_info = flatbuffers::GetRoot<fb::ObjectInfo>(buffer);
935935
ARROW_CHECK(object_info->object_id()->size() == sizeof(ObjectID));
@@ -946,7 +946,7 @@ Status PlasmaClient::Impl::DecodeNotification(const uint8_t* buffer, ObjectID* o
946946

947947
Status PlasmaClient::Impl::GetNotification(int fd, ObjectID* object_id,
948948
int64_t* data_size, int64_t* metadata_size) {
949-
std::lock_guard<std::mutex> guard(client_mutex_);
949+
std::lock_guard<std::recursive_mutex> guard(client_mutex_);
950950

951951
auto notification = ReadMessageAsync(fd);
952952
if (notification == NULL) {
@@ -958,7 +958,7 @@ Status PlasmaClient::Impl::GetNotification(int fd, ObjectID* object_id,
958958
Status PlasmaClient::Impl::Connect(const std::string& store_socket_name,
959959
const std::string& manager_socket_name,
960960
int release_delay, int num_retries) {
961-
std::lock_guard<std::mutex> guard(client_mutex_);
961+
std::lock_guard<std::recursive_mutex> guard(client_mutex_);
962962

963963
RETURN_NOT_OK(ConnectIpcSocketRetry(store_socket_name, num_retries, -1, &store_conn_));
964964
if (manager_socket_name != "") {
@@ -977,7 +977,7 @@ Status PlasmaClient::Impl::Connect(const std::string& store_socket_name,
977977
}
978978

979979
Status PlasmaClient::Impl::Disconnect() {
980-
std::lock_guard<std::mutex> guard(client_mutex_);
980+
std::lock_guard<std::recursive_mutex> guard(client_mutex_);
981981

982982
// NOTE: We purposefully do not finish sending release calls for objects in
983983
// use, so that we don't duplicate PlasmaClient::Release calls (when handling

0 commit comments

Comments
 (0)