@@ -302,7 +302,7 @@ class PlasmaClient::Impl : public std::enable_shared_from_this<PlasmaClient::Imp
302302 // / A hash set to record the ids that users want to delete but still in use.
303303 std::unordered_set<ObjectID> deletion_cache_;
304304 // / A mutex which protects this class.
305- std::mutex client_mutex_;
305+ std::recursive_mutex client_mutex_;
306306
307307#ifdef PLASMA_CUDA
308308 // / Cuda Device Manager.
@@ -343,7 +343,7 @@ uint8_t* PlasmaClient::Impl::LookupMmappedFile(int store_fd_val) {
343343}
344344
345345bool PlasmaClient::Impl::IsInUse (const ObjectID& object_id) {
346- std::lock_guard<std::mutex > guard (client_mutex_);
346+ std::lock_guard<std::recursive_mutex > guard (client_mutex_);
347347
348348 const auto elem = objects_in_use_.find (object_id);
349349 return (elem != objects_in_use_.end ());
@@ -388,7 +388,7 @@ void PlasmaClient::Impl::IncrementObjectCount(const ObjectID& object_id,
388388Status PlasmaClient::Impl::Create (const ObjectID& object_id, int64_t data_size,
389389 const uint8_t * metadata, int64_t metadata_size,
390390 std::shared_ptr<Buffer>* data, int device_num) {
391- std::lock_guard<std::mutex > guard (client_mutex_);
391+ std::lock_guard<std::recursive_mutex > guard (client_mutex_);
392392
393393 ARROW_LOG (DEBUG) << " called plasma_create on conn " << store_conn_ << " with size "
394394 << data_size << " and metadata size " << metadata_size;
@@ -457,7 +457,7 @@ Status PlasmaClient::Impl::Create(const ObjectID& object_id, int64_t data_size,
457457Status PlasmaClient::Impl::CreateAndSeal (const ObjectID& object_id,
458458 const std::string& data,
459459 const std::string& metadata) {
460- std::lock_guard<std::mutex > guard (client_mutex_);
460+ std::lock_guard<std::recursive_mutex > guard (client_mutex_);
461461
462462 ARROW_LOG (DEBUG) << " called CreateAndSeal on conn " << store_conn_;
463463 // Compute the object hash.
@@ -510,7 +510,7 @@ Status PlasmaClient::Impl::GetBuffers(
510510 data + object->data_offset , object->data_size + object->metadata_size );
511511 } else {
512512#ifdef PLASMA_CUDA
513- std::lock_guard<std::mutex > lock (gpu_mutex);
513+ std::lock_guard<std::recursive_mutex > lock (gpu_mutex);
514514 auto iter = gpu_object_map.find (object_ids[i]);
515515 ARROW_CHECK (iter != gpu_object_map.end ());
516516 iter->second ->client_count ++;
@@ -576,7 +576,7 @@ Status PlasmaClient::Impl::GetBuffers(
576576 data + object->data_offset , object->data_size + object->metadata_size );
577577 } else {
578578#ifdef PLASMA_CUDA
579- std::lock_guard<std::mutex > lock (gpu_mutex);
579+ std::lock_guard<std::recursive_mutex > lock (gpu_mutex);
580580 auto handle = gpu_object_map.find (object_ids[i]);
581581 if (handle == gpu_object_map.end ()) {
582582 std::shared_ptr<CudaContext> context;
@@ -615,7 +615,7 @@ Status PlasmaClient::Impl::GetBuffers(
615615
616616Status PlasmaClient::Impl::Get (const std::vector<ObjectID>& object_ids,
617617 int64_t timeout_ms, std::vector<ObjectBuffer>* out) {
618- std::lock_guard<std::mutex > guard (client_mutex_);
618+ std::lock_guard<std::recursive_mutex > guard (client_mutex_);
619619
620620 const auto wrap_buffer = [=](const ObjectID& object_id,
621621 const std::shared_ptr<Buffer>& buffer) {
@@ -628,7 +628,7 @@ Status PlasmaClient::Impl::Get(const std::vector<ObjectID>& object_ids,
628628
629629Status PlasmaClient::Impl::Get (const ObjectID* object_ids, int64_t num_objects,
630630 int64_t timeout_ms, ObjectBuffer* out) {
631- std::lock_guard<std::mutex > guard (client_mutex_);
631+ std::lock_guard<std::recursive_mutex > guard (client_mutex_);
632632
633633 const auto wrap_buffer = [](const ObjectID& object_id,
634634 const std::shared_ptr<Buffer>& buffer) { return buffer; };
@@ -646,7 +646,7 @@ Status PlasmaClient::Impl::MarkObjectUnused(const ObjectID& object_id) {
646646}
647647
648648Status PlasmaClient::Impl::Release (const ObjectID& object_id) {
649- std::lock_guard<std::mutex > guard (client_mutex_);
649+ std::lock_guard<std::recursive_mutex > guard (client_mutex_);
650650
651651 // If the client is already disconnected, ignore release requests.
652652 if (store_conn_ < 0 ) {
@@ -657,7 +657,7 @@ Status PlasmaClient::Impl::Release(const ObjectID& object_id) {
657657
658658#ifdef PLASMA_CUDA
659659 if (object_entry->second ->object .device_num != 0 ) {
660- std::lock_guard<std::mutex > lock (gpu_mutex);
660+ std::lock_guard<std::recursive_mutex > lock (gpu_mutex);
661661 auto iter = gpu_object_map.find (object_id);
662662 ARROW_CHECK (iter != gpu_object_map.end ());
663663 if (--iter->second ->client_count == 0 ) {
@@ -685,7 +685,7 @@ Status PlasmaClient::Impl::Release(const ObjectID& object_id) {
685685
686686// This method is used to query whether the plasma store contains an object.
687687Status PlasmaClient::Impl::Contains (const ObjectID& object_id, bool * has_object) {
688- std::lock_guard<std::mutex > guard (client_mutex_);
688+ std::lock_guard<std::recursive_mutex > guard (client_mutex_);
689689
690690 // Check if we already have a reference to the object.
691691 if (objects_in_use_.count (object_id) > 0 ) {
@@ -705,7 +705,7 @@ Status PlasmaClient::Impl::Contains(const ObjectID& object_id, bool* has_object)
705705}
706706
707707Status PlasmaClient::Impl::List (ObjectTable* objects) {
708- std::lock_guard<std::mutex > guard (client_mutex_);
708+ std::lock_guard<std::recursive_mutex > guard (client_mutex_);
709709 RETURN_NOT_OK (SendListRequest (store_conn_));
710710 std::vector<uint8_t > buffer;
711711 RETURN_NOT_OK (PlasmaReceive (store_conn_, MessageType::PlasmaListReply, &buffer));
@@ -784,7 +784,7 @@ uint64_t PlasmaClient::Impl::ComputeObjectHash(const uint8_t* data, int64_t data
784784}
785785
786786Status PlasmaClient::Impl::Seal (const ObjectID& object_id) {
787- std::lock_guard<std::mutex > guard (client_mutex_);
787+ std::lock_guard<std::recursive_mutex > guard (client_mutex_);
788788
789789 // Make sure this client has a reference to the object before sending the
790790 // request to Plasma.
@@ -812,7 +812,7 @@ Status PlasmaClient::Impl::Seal(const ObjectID& object_id) {
812812}
813813
814814Status PlasmaClient::Impl::Abort (const ObjectID& object_id) {
815- std::lock_guard<std::mutex > guard (client_mutex_);
815+ std::lock_guard<std::recursive_mutex > guard (client_mutex_);
816816 auto object_entry = objects_in_use_.find (object_id);
817817 ARROW_CHECK (object_entry != objects_in_use_.end ())
818818 << " Plasma client called abort on an object without a reference to it" ;
@@ -828,7 +828,7 @@ Status PlasmaClient::Impl::Abort(const ObjectID& object_id) {
828828
829829#ifdef PLASMA_CUDA
830830 if (object_entry->second ->object .device_num != 0 ) {
831- std::lock_guard<std::mutex > lock (gpu_mutex);
831+ std::lock_guard<std::recursive_mutex > lock (gpu_mutex);
832832 auto iter = gpu_object_map.find (object_id);
833833 ARROW_CHECK (iter != gpu_object_map.end ());
834834 ARROW_CHECK (iter->second ->client_count == 1 );
@@ -851,7 +851,7 @@ Status PlasmaClient::Impl::Abort(const ObjectID& object_id) {
851851}
852852
853853Status PlasmaClient::Impl::Delete (const std::vector<ObjectID>& object_ids) {
854- std::lock_guard<std::mutex > guard (client_mutex_);
854+ std::lock_guard<std::recursive_mutex > guard (client_mutex_);
855855
856856 std::vector<ObjectID> not_in_use_ids;
857857 for (auto & object_id : object_ids) {
@@ -876,7 +876,7 @@ Status PlasmaClient::Impl::Delete(const std::vector<ObjectID>& object_ids) {
876876}
877877
878878Status PlasmaClient::Impl::Evict (int64_t num_bytes, int64_t & num_bytes_evicted) {
879- std::lock_guard<std::mutex > guard (client_mutex_);
879+ std::lock_guard<std::recursive_mutex > guard (client_mutex_);
880880
881881 // Send a request to the store to evict objects.
882882 RETURN_NOT_OK (SendEvictRequest (store_conn_, num_bytes));
@@ -888,7 +888,7 @@ Status PlasmaClient::Impl::Evict(int64_t num_bytes, int64_t& num_bytes_evicted)
888888}
889889
890890Status PlasmaClient::Impl::Hash (const ObjectID& object_id, uint8_t * digest) {
891- std::lock_guard<std::mutex > guard (client_mutex_);
891+ std::lock_guard<std::recursive_mutex > guard (client_mutex_);
892892
893893 // Get the plasma object data. We pass in a timeout of 0 to indicate that
894894 // the operation should timeout immediately.
@@ -905,7 +905,7 @@ Status PlasmaClient::Impl::Hash(const ObjectID& object_id, uint8_t* digest) {
905905}
906906
907907Status PlasmaClient::Impl::Subscribe (int * fd) {
908- std::lock_guard<std::mutex > guard (client_mutex_);
908+ std::lock_guard<std::recursive_mutex > guard (client_mutex_);
909909
910910 int sock[2 ];
911911 // Create a non-blocking socket pair. This will only be used to send
@@ -929,7 +929,7 @@ Status PlasmaClient::Impl::Subscribe(int* fd) {
929929Status PlasmaClient::Impl::DecodeNotification (const uint8_t * buffer, ObjectID* object_id,
930930 int64_t * data_size,
931931 int64_t * metadata_size) {
932- std::lock_guard<std::mutex > guard (client_mutex_);
932+ std::lock_guard<std::recursive_mutex > guard (client_mutex_);
933933
934934 auto object_info = flatbuffers::GetRoot<fb::ObjectInfo>(buffer);
935935 ARROW_CHECK (object_info->object_id ()->size () == sizeof (ObjectID));
@@ -946,7 +946,7 @@ Status PlasmaClient::Impl::DecodeNotification(const uint8_t* buffer, ObjectID* o
946946
947947Status PlasmaClient::Impl::GetNotification (int fd, ObjectID* object_id,
948948 int64_t * data_size, int64_t * metadata_size) {
949- std::lock_guard<std::mutex > guard (client_mutex_);
949+ std::lock_guard<std::recursive_mutex > guard (client_mutex_);
950950
951951 auto notification = ReadMessageAsync (fd);
952952 if (notification == NULL ) {
@@ -958,7 +958,7 @@ Status PlasmaClient::Impl::GetNotification(int fd, ObjectID* object_id,
958958Status PlasmaClient::Impl::Connect (const std::string& store_socket_name,
959959 const std::string& manager_socket_name,
960960 int release_delay, int num_retries) {
961- std::lock_guard<std::mutex > guard (client_mutex_);
961+ std::lock_guard<std::recursive_mutex > guard (client_mutex_);
962962
963963 RETURN_NOT_OK (ConnectIpcSocketRetry (store_socket_name, num_retries, -1 , &store_conn_));
964964 if (manager_socket_name != " " ) {
@@ -977,7 +977,7 @@ Status PlasmaClient::Impl::Connect(const std::string& store_socket_name,
977977}
978978
979979Status PlasmaClient::Impl::Disconnect () {
980- std::lock_guard<std::mutex > guard (client_mutex_);
980+ std::lock_guard<std::recursive_mutex > guard (client_mutex_);
981981
982982 // NOTE: We purposefully do not finish sending release calls for objects in
983983 // use, so that we don't duplicate PlasmaClient::Release calls (when handling
0 commit comments