@@ -252,37 +252,6 @@ Status ServiceBasedActorInfoAccessor::AsyncCreateActor(
252252 return Status::OK ();
253253}
254254
255- Status ServiceBasedActorInfoAccessor::AsyncSubscribeAll (
256- const SubscribeCallback<ActorID, rpc::ActorTableData> &subscribe,
257- const StatusCallback &done) {
258- RAY_CHECK (subscribe != nullptr );
259- fetch_all_data_operation_ = [this , subscribe](const StatusCallback &done) {
260- auto callback = [subscribe, done](
261- const Status &status,
262- const std::vector<rpc::ActorTableData> &actor_info_list) {
263- for (auto &actor_info : actor_info_list) {
264- subscribe (ActorID::FromBinary (actor_info.actor_id ()), actor_info);
265- }
266- if (done) {
267- done (status);
268- }
269- };
270- RAY_CHECK_OK (AsyncGetAll (callback));
271- };
272-
273- subscribe_all_operation_ = [this , subscribe](const StatusCallback &done) {
274- auto on_subscribe = [subscribe](const std::string &id, const std::string &data) {
275- ActorTableData actor_data;
276- actor_data.ParseFromString (data);
277- subscribe (ActorID::FromBinary (actor_data.actor_id ()), actor_data);
278- };
279- return client_impl_->GetGcsPubSub ().SubscribeAll (ACTOR_CHANNEL, on_subscribe, done);
280- };
281-
282- return subscribe_all_operation_ (
283- [this , done](const Status &status) { fetch_all_data_operation_ (done); });
284- }
285-
286255Status ServiceBasedActorInfoAccessor::AsyncSubscribe (
287256 const ActorID &actor_id,
288257 const SubscribeCallback<ActorID, rpc::ActorTableData> &subscribe,
@@ -866,189 +835,6 @@ Status ServiceBasedNodeResourceInfoAccessor::AsyncGetAllResourceUsage(
866835 return Status::OK ();
867836}
868837
869- ServiceBasedTaskInfoAccessor::ServiceBasedTaskInfoAccessor (
870- ServiceBasedGcsClient *client_impl)
871- : client_impl_(client_impl) {}
872-
873- Status ServiceBasedTaskInfoAccessor::AsyncAdd (
874- const std::shared_ptr<rpc::TaskTableData> &data_ptr, const StatusCallback &callback) {
875- TaskID task_id = TaskID::FromBinary (data_ptr->task ().task_spec ().task_id ());
876- JobID job_id = JobID::FromBinary (data_ptr->task ().task_spec ().job_id ());
877- RAY_LOG (DEBUG) << " Adding task, task id = " << task_id << " , job id = " << job_id;
878- rpc::AddTaskRequest request;
879- request.mutable_task_data ()->CopyFrom (*data_ptr);
880- client_impl_->GetGcsRpcClient ().AddTask (
881- request,
882- [task_id, job_id, callback](const Status &status, const rpc::AddTaskReply &reply) {
883- if (callback) {
884- callback (status);
885- }
886- RAY_LOG (DEBUG) << " Finished adding task, status = " << status
887- << " , task id = " << task_id << " , job id = " << job_id;
888- });
889- return Status::OK ();
890- }
891-
892- Status ServiceBasedTaskInfoAccessor::AsyncGet (
893- const TaskID &task_id, const OptionalItemCallback<rpc::TaskTableData> &callback) {
894- RAY_LOG (DEBUG) << " Getting task, task id = " << task_id
895- << " , job id = " << task_id.JobId ();
896- rpc::GetTaskRequest request;
897- request.set_task_id (task_id.Binary ());
898- client_impl_->GetGcsRpcClient ().GetTask (
899- request, [task_id, callback](const Status &status, const rpc::GetTaskReply &reply) {
900- if (reply.has_task_data ()) {
901- callback (status, reply.task_data ());
902- } else {
903- callback (status, boost::none);
904- }
905- RAY_LOG (DEBUG) << " Finished getting task, status = " << status
906- << " , task id = " << task_id << " , job id = " << task_id.JobId ();
907- });
908- return Status::OK ();
909- }
910-
911- Status ServiceBasedTaskInfoAccessor::AsyncAddTaskLease (
912- const std::shared_ptr<rpc::TaskLeaseData> &data_ptr, const StatusCallback &callback) {
913- TaskID task_id = TaskID::FromBinary (data_ptr->task_id ());
914- NodeID node_id = NodeID::FromBinary (data_ptr->node_manager_id ());
915- RAY_LOG (DEBUG) << " Adding task lease, task id = " << task_id
916- << " , node id = " << node_id << " , job id = " << task_id.JobId ();
917- rpc::AddTaskLeaseRequest request;
918- request.mutable_task_lease_data ()->CopyFrom (*data_ptr);
919- client_impl_->GetGcsRpcClient ().AddTaskLease (
920- request, [task_id, node_id, callback](const Status &status,
921- const rpc::AddTaskLeaseReply &reply) {
922- if (callback) {
923- callback (status);
924- }
925- RAY_LOG (DEBUG) << " Finished adding task lease, status = " << status
926- << " , task id = " << task_id << " , node id = " << node_id
927- << " , job id = " << task_id.JobId ();
928- });
929- return Status::OK ();
930- }
931-
932- Status ServiceBasedTaskInfoAccessor::AsyncGetTaskLease (
933- const TaskID &task_id, const OptionalItemCallback<rpc::TaskLeaseData> &callback) {
934- RAY_LOG (DEBUG) << " Getting task lease, task id = " << task_id
935- << " , job id = " << task_id.JobId ();
936- rpc::GetTaskLeaseRequest request;
937- request.set_task_id (task_id.Binary ());
938- client_impl_->GetGcsRpcClient ().GetTaskLease (
939- request,
940- [task_id, callback](const Status &status, const rpc::GetTaskLeaseReply &reply) {
941- if (reply.has_task_lease_data ()) {
942- callback (status, reply.task_lease_data ());
943- } else {
944- callback (status, boost::none);
945- }
946- RAY_LOG (DEBUG) << " Finished getting task lease, status = " << status
947- << " , task id = " << task_id << " , job id = " << task_id.JobId ();
948- });
949- return Status::OK ();
950- }
951-
952- Status ServiceBasedTaskInfoAccessor::AsyncSubscribeTaskLease (
953- const TaskID &task_id,
954- const SubscribeCallback<TaskID, boost::optional<rpc::TaskLeaseData>> &subscribe,
955- const StatusCallback &done) {
956- RAY_CHECK (subscribe != nullptr )
957- << " Failed to subscribe task lease, task id = " << task_id
958- << " , job id = " << task_id.JobId ();
959-
960- auto fetch_data_operation = [this , task_id,
961- subscribe](const StatusCallback &fetch_done) {
962- auto callback = [task_id, subscribe, fetch_done](
963- const Status &status,
964- const boost::optional<rpc::TaskLeaseData> &result) {
965- subscribe (task_id, result);
966- if (fetch_done) {
967- fetch_done (status);
968- }
969- };
970- RAY_CHECK_OK (AsyncGetTaskLease (task_id, callback));
971- };
972-
973- auto subscribe_operation = [this , task_id,
974- subscribe](const StatusCallback &subscribe_done) {
975- auto on_subscribe = [task_id, subscribe](const std::string &id,
976- const std::string &data) {
977- TaskLeaseData task_lease_data;
978- task_lease_data.ParseFromString (data);
979- subscribe (task_id, task_lease_data);
980- };
981- return client_impl_->GetGcsPubSub ().Subscribe (TASK_LEASE_CHANNEL, task_id.Hex (),
982- on_subscribe, subscribe_done);
983- };
984-
985- subscribe_task_lease_operations_[task_id] = subscribe_operation;
986- fetch_task_lease_data_operations_[task_id] = fetch_data_operation;
987- return subscribe_operation (
988- [fetch_data_operation, done](const Status &status) { fetch_data_operation (done); });
989- }
990-
991- Status ServiceBasedTaskInfoAccessor::AsyncUnsubscribeTaskLease (const TaskID &task_id) {
992- RAY_LOG (DEBUG) << " Unsubscribing task lease, task id = " << task_id
993- << " , job id = " << task_id.JobId ();
994- auto status =
995- client_impl_->GetGcsPubSub ().Unsubscribe (TASK_LEASE_CHANNEL, task_id.Hex ());
996- subscribe_task_lease_operations_.erase (task_id);
997- fetch_task_lease_data_operations_.erase (task_id);
998- RAY_LOG (DEBUG) << " Finished unsubscribing task lease, task id = " << task_id
999- << " , job id = " << task_id.JobId ();
1000- return status;
1001- }
1002-
1003- Status ServiceBasedTaskInfoAccessor::AttemptTaskReconstruction (
1004- const std::shared_ptr<rpc::TaskReconstructionData> &data_ptr,
1005- const StatusCallback &callback) {
1006- auto num_reconstructions = data_ptr->num_reconstructions ();
1007- NodeID node_id = NodeID::FromBinary (data_ptr->node_manager_id ());
1008- TaskID task_id = TaskID::FromBinary (data_ptr->task_id ());
1009- RAY_LOG (DEBUG) << " Reconstructing task, reconstructions num = " << num_reconstructions
1010- << " , node id = " << node_id << " , task id = " << task_id
1011- << " , job id = " << task_id.JobId ();
1012- rpc::AttemptTaskReconstructionRequest request;
1013- request.mutable_task_reconstruction ()->CopyFrom (*data_ptr);
1014- client_impl_->GetGcsRpcClient ().AttemptTaskReconstruction (
1015- request,
1016- [num_reconstructions, node_id, task_id, callback](
1017- const Status &status, const rpc::AttemptTaskReconstructionReply &reply) {
1018- if (callback) {
1019- callback (status);
1020- }
1021- RAY_LOG (DEBUG) << " Finished reconstructing task, status = " << status
1022- << " , reconstructions num = " << num_reconstructions
1023- << " , node id = " << node_id << " , task id = " << task_id
1024- << " , job id = " << task_id.JobId ();
1025- });
1026- return Status::OK ();
1027- }
1028-
1029- void ServiceBasedTaskInfoAccessor::AsyncResubscribe (bool is_pubsub_server_restarted) {
1030- RAY_LOG (DEBUG) << " Reestablishing subscription for task info." ;
1031- // If only the GCS sever has restarted, we only need to fetch data from the GCS server.
1032- // If the pub-sub server has also restarted, we need to resubscribe to the pub-sub
1033- // server first, then fetch data from the GCS server.
1034- if (is_pubsub_server_restarted) {
1035- for (auto &item : subscribe_task_lease_operations_) {
1036- auto &task_id = item.first ;
1037- RAY_CHECK_OK (item.second ([this , task_id](const Status &status) {
1038- fetch_task_lease_data_operations_[task_id](nullptr );
1039- }));
1040- }
1041- } else {
1042- for (auto &item : fetch_task_lease_data_operations_) {
1043- item.second (nullptr );
1044- }
1045- }
1046- }
1047-
1048- bool ServiceBasedTaskInfoAccessor::IsTaskLeaseUnsubscribed (const TaskID &task_id) {
1049- return client_impl_->GetGcsPubSub ().IsUnsubscribed (TASK_LEASE_CHANNEL, task_id.Hex ());
1050- }
1051-
1052838ServiceBasedObjectInfoAccessor::ServiceBasedObjectInfoAccessor (
1053839 ServiceBasedGcsClient *client_impl)
1054840 : client_impl_(client_impl) {}
0 commit comments