Skip to content

Commit bfe8532

Browse files
author
Alex Wu
authored
[core] Cleanup dead pubsub related code (ray-project#16629)
1 parent ea23382 commit bfe8532

25 files changed

Lines changed: 25 additions & 1436 deletions

src/ray/common/ray_config_def.h

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -246,8 +246,6 @@ RAY_CONFIG(uint64_t, gcs_max_concurrent_resource_pulls, 100)
246246
// Feature flag to turn on resource report polling. Polling and raylet pushing are
247247
// mutually exlusive.
248248
RAY_CONFIG(bool, pull_based_resource_reporting, true)
249-
// Feature flag to use grpc instead of redis for resource broadcast.
250-
RAY_CONFIG(bool, grpc_based_resource_broadcast, true)
251249
// Feature flag to enable grpc based pubsub in GCS.
252250
RAY_CONFIG(bool, gcs_grpc_based_pubsub, false)
253251

src/ray/gcs/accessor.h

Lines changed: 0 additions & 101 deletions
Original file line numberDiff line numberDiff line change
@@ -87,17 +87,6 @@ class ActorInfoAccessor {
8787
virtual Status AsyncCreateActor(const TaskSpecification &task_spec,
8888
const StatusCallback &callback) = 0;
8989

90-
/// Subscribe to any register or update operations of actors.
91-
///
92-
/// \param subscribe Callback that will be called each time when an actor is registered
93-
/// or updated.
94-
/// \param done Callback that will be called when subscription is complete and we
95-
/// are ready to receive notification.
96-
/// \return Status
97-
virtual Status AsyncSubscribeAll(
98-
const SubscribeCallback<ActorID, rpc::ActorTableData> &subscribe,
99-
const StatusCallback &done) = 0;
100-
10190
/// Subscribe to any update operations of an actor.
10291
///
10392
/// \param actor_id The ID of actor to be subscribed to.
@@ -193,96 +182,6 @@ class JobInfoAccessor {
193182
JobInfoAccessor() = default;
194183
};
195184

196-
/// \class TaskInfoAccessor
197-
/// `TaskInfoAccessor` is a sub-interface of `GcsClient`.
198-
/// This class includes all the methods that are related to accessing
199-
/// task information in the GCS.
200-
class TaskInfoAccessor {
201-
public:
202-
virtual ~TaskInfoAccessor() {}
203-
204-
/// Add a task to GCS asynchronously.
205-
///
206-
/// \param data_ptr The task that will be added to GCS.
207-
/// \param callback Callback that will be called after task has been added
208-
/// to GCS.
209-
/// \return Status
210-
virtual Status AsyncAdd(const std::shared_ptr<rpc::TaskTableData> &data_ptr,
211-
const StatusCallback &callback) = 0;
212-
213-
/// Get task information from GCS asynchronously.
214-
///
215-
/// \param task_id The ID of the task to look up in GCS.
216-
/// \param callback Callback that is called after lookup finished.
217-
/// \return Status
218-
virtual Status AsyncGet(const TaskID &task_id,
219-
const OptionalItemCallback<rpc::TaskTableData> &callback) = 0;
220-
221-
/// Add a task lease to GCS asynchronously.
222-
///
223-
/// \param data_ptr The task lease that will be added to GCS.
224-
/// \param callback Callback that will be called after task lease has been added
225-
/// to GCS.
226-
/// \return Status
227-
virtual Status AsyncAddTaskLease(const std::shared_ptr<rpc::TaskLeaseData> &data_ptr,
228-
const StatusCallback &callback) = 0;
229-
230-
/// Get task lease information from GCS asynchronously.
231-
///
232-
/// \param task_id The ID of the task to look up in GCS.
233-
/// \param callback Callback that is called after lookup finished.
234-
/// \return Status
235-
virtual Status AsyncGetTaskLease(
236-
const TaskID &task_id,
237-
const OptionalItemCallback<rpc::TaskLeaseData> &callback) = 0;
238-
239-
/// Subscribe asynchronously to the event that the given task lease is added in GCS.
240-
///
241-
/// \param task_id The ID of the task to be subscribed to.
242-
/// \param subscribe Callback that will be called each time when the task lease is
243-
/// updated or the task lease is empty currently.
244-
/// \param done Callback that will be called when subscription is complete.
245-
/// \return Status
246-
virtual Status AsyncSubscribeTaskLease(
247-
const TaskID &task_id,
248-
const SubscribeCallback<TaskID, boost::optional<rpc::TaskLeaseData>> &subscribe,
249-
const StatusCallback &done) = 0;
250-
251-
/// Cancel subscription to a task lease asynchronously.
252-
///
253-
/// \param task_id The ID of the task to be unsubscribed to.
254-
/// \return Status
255-
virtual Status AsyncUnsubscribeTaskLease(const TaskID &task_id) = 0;
256-
257-
/// Attempt task reconstruction to GCS asynchronously.
258-
///
259-
/// \param data_ptr The task reconstruction that will be added to GCS.
260-
/// \param callback Callback that will be called after task reconstruction
261-
/// has been added to GCS.
262-
/// \return Status
263-
virtual Status AttemptTaskReconstruction(
264-
const std::shared_ptr<rpc::TaskReconstructionData> &data_ptr,
265-
const StatusCallback &callback) = 0;
266-
267-
/// Reestablish subscription.
268-
/// This should be called when GCS server restarts from a failure.
269-
/// PubSub server restart will cause GCS server restart. In this case, we need to
270-
/// resubscribe from PubSub server, otherwise we only need to fetch data from GCS
271-
/// server.
272-
///
273-
/// \param is_pubsub_server_restarted Whether pubsub server is restarted.
274-
virtual void AsyncResubscribe(bool is_pubsub_server_restarted) = 0;
275-
276-
/// Check if the specified task lease is unsubscribed.
277-
///
278-
/// \param task_id The ID of the task.
279-
/// \return Whether the specified task lease is unsubscribed.
280-
virtual bool IsTaskLeaseUnsubscribed(const TaskID &task_id) = 0;
281-
282-
protected:
283-
TaskInfoAccessor() = default;
284-
};
285-
286185
/// `ObjectInfoAccessor` is a sub-interface of `GcsClient`.
287186
/// This class includes all the methods that are related to accessing
288187
/// object information in the GCS.

src/ray/gcs/gcs_client.h

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -123,13 +123,6 @@ class GcsClient : public std::enable_shared_from_this<GcsClient> {
123123
return *node_resource_accessor_;
124124
}
125125

126-
/// Get the sub-interface for accessing task information in GCS.
127-
/// This function is thread safe.
128-
TaskInfoAccessor &Tasks() {
129-
RAY_CHECK(task_accessor_ != nullptr);
130-
return *task_accessor_;
131-
}
132-
133126
/// Get the sub-interface for accessing error information in GCS.
134127
/// This function is thread safe.
135128
ErrorInfoAccessor &Errors() {
@@ -178,7 +171,6 @@ class GcsClient : public std::enable_shared_from_this<GcsClient> {
178171
std::unique_ptr<ObjectInfoAccessor> object_accessor_;
179172
std::unique_ptr<NodeInfoAccessor> node_accessor_;
180173
std::unique_ptr<NodeResourceInfoAccessor> node_resource_accessor_;
181-
std::unique_ptr<TaskInfoAccessor> task_accessor_;
182174
std::unique_ptr<ErrorInfoAccessor> error_accessor_;
183175
std::unique_ptr<StatsInfoAccessor> stats_accessor_;
184176
std::unique_ptr<WorkerInfoAccessor> worker_accessor_;

src/ray/gcs/gcs_client/service_based_accessor.cc

Lines changed: 0 additions & 214 deletions
Original file line numberDiff line numberDiff line change
@@ -252,37 +252,6 @@ Status ServiceBasedActorInfoAccessor::AsyncCreateActor(
252252
return Status::OK();
253253
}
254254

255-
Status ServiceBasedActorInfoAccessor::AsyncSubscribeAll(
256-
const SubscribeCallback<ActorID, rpc::ActorTableData> &subscribe,
257-
const StatusCallback &done) {
258-
RAY_CHECK(subscribe != nullptr);
259-
fetch_all_data_operation_ = [this, subscribe](const StatusCallback &done) {
260-
auto callback = [subscribe, done](
261-
const Status &status,
262-
const std::vector<rpc::ActorTableData> &actor_info_list) {
263-
for (auto &actor_info : actor_info_list) {
264-
subscribe(ActorID::FromBinary(actor_info.actor_id()), actor_info);
265-
}
266-
if (done) {
267-
done(status);
268-
}
269-
};
270-
RAY_CHECK_OK(AsyncGetAll(callback));
271-
};
272-
273-
subscribe_all_operation_ = [this, subscribe](const StatusCallback &done) {
274-
auto on_subscribe = [subscribe](const std::string &id, const std::string &data) {
275-
ActorTableData actor_data;
276-
actor_data.ParseFromString(data);
277-
subscribe(ActorID::FromBinary(actor_data.actor_id()), actor_data);
278-
};
279-
return client_impl_->GetGcsPubSub().SubscribeAll(ACTOR_CHANNEL, on_subscribe, done);
280-
};
281-
282-
return subscribe_all_operation_(
283-
[this, done](const Status &status) { fetch_all_data_operation_(done); });
284-
}
285-
286255
Status ServiceBasedActorInfoAccessor::AsyncSubscribe(
287256
const ActorID &actor_id,
288257
const SubscribeCallback<ActorID, rpc::ActorTableData> &subscribe,
@@ -866,189 +835,6 @@ Status ServiceBasedNodeResourceInfoAccessor::AsyncGetAllResourceUsage(
866835
return Status::OK();
867836
}
868837

869-
ServiceBasedTaskInfoAccessor::ServiceBasedTaskInfoAccessor(
870-
ServiceBasedGcsClient *client_impl)
871-
: client_impl_(client_impl) {}
872-
873-
Status ServiceBasedTaskInfoAccessor::AsyncAdd(
874-
const std::shared_ptr<rpc::TaskTableData> &data_ptr, const StatusCallback &callback) {
875-
TaskID task_id = TaskID::FromBinary(data_ptr->task().task_spec().task_id());
876-
JobID job_id = JobID::FromBinary(data_ptr->task().task_spec().job_id());
877-
RAY_LOG(DEBUG) << "Adding task, task id = " << task_id << ", job id = " << job_id;
878-
rpc::AddTaskRequest request;
879-
request.mutable_task_data()->CopyFrom(*data_ptr);
880-
client_impl_->GetGcsRpcClient().AddTask(
881-
request,
882-
[task_id, job_id, callback](const Status &status, const rpc::AddTaskReply &reply) {
883-
if (callback) {
884-
callback(status);
885-
}
886-
RAY_LOG(DEBUG) << "Finished adding task, status = " << status
887-
<< ", task id = " << task_id << ", job id = " << job_id;
888-
});
889-
return Status::OK();
890-
}
891-
892-
Status ServiceBasedTaskInfoAccessor::AsyncGet(
893-
const TaskID &task_id, const OptionalItemCallback<rpc::TaskTableData> &callback) {
894-
RAY_LOG(DEBUG) << "Getting task, task id = " << task_id
895-
<< ", job id = " << task_id.JobId();
896-
rpc::GetTaskRequest request;
897-
request.set_task_id(task_id.Binary());
898-
client_impl_->GetGcsRpcClient().GetTask(
899-
request, [task_id, callback](const Status &status, const rpc::GetTaskReply &reply) {
900-
if (reply.has_task_data()) {
901-
callback(status, reply.task_data());
902-
} else {
903-
callback(status, boost::none);
904-
}
905-
RAY_LOG(DEBUG) << "Finished getting task, status = " << status
906-
<< ", task id = " << task_id << ", job id = " << task_id.JobId();
907-
});
908-
return Status::OK();
909-
}
910-
911-
Status ServiceBasedTaskInfoAccessor::AsyncAddTaskLease(
912-
const std::shared_ptr<rpc::TaskLeaseData> &data_ptr, const StatusCallback &callback) {
913-
TaskID task_id = TaskID::FromBinary(data_ptr->task_id());
914-
NodeID node_id = NodeID::FromBinary(data_ptr->node_manager_id());
915-
RAY_LOG(DEBUG) << "Adding task lease, task id = " << task_id
916-
<< ", node id = " << node_id << ", job id = " << task_id.JobId();
917-
rpc::AddTaskLeaseRequest request;
918-
request.mutable_task_lease_data()->CopyFrom(*data_ptr);
919-
client_impl_->GetGcsRpcClient().AddTaskLease(
920-
request, [task_id, node_id, callback](const Status &status,
921-
const rpc::AddTaskLeaseReply &reply) {
922-
if (callback) {
923-
callback(status);
924-
}
925-
RAY_LOG(DEBUG) << "Finished adding task lease, status = " << status
926-
<< ", task id = " << task_id << ", node id = " << node_id
927-
<< ", job id = " << task_id.JobId();
928-
});
929-
return Status::OK();
930-
}
931-
932-
Status ServiceBasedTaskInfoAccessor::AsyncGetTaskLease(
933-
const TaskID &task_id, const OptionalItemCallback<rpc::TaskLeaseData> &callback) {
934-
RAY_LOG(DEBUG) << "Getting task lease, task id = " << task_id
935-
<< ", job id = " << task_id.JobId();
936-
rpc::GetTaskLeaseRequest request;
937-
request.set_task_id(task_id.Binary());
938-
client_impl_->GetGcsRpcClient().GetTaskLease(
939-
request,
940-
[task_id, callback](const Status &status, const rpc::GetTaskLeaseReply &reply) {
941-
if (reply.has_task_lease_data()) {
942-
callback(status, reply.task_lease_data());
943-
} else {
944-
callback(status, boost::none);
945-
}
946-
RAY_LOG(DEBUG) << "Finished getting task lease, status = " << status
947-
<< ", task id = " << task_id << ", job id = " << task_id.JobId();
948-
});
949-
return Status::OK();
950-
}
951-
952-
Status ServiceBasedTaskInfoAccessor::AsyncSubscribeTaskLease(
953-
const TaskID &task_id,
954-
const SubscribeCallback<TaskID, boost::optional<rpc::TaskLeaseData>> &subscribe,
955-
const StatusCallback &done) {
956-
RAY_CHECK(subscribe != nullptr)
957-
<< "Failed to subscribe task lease, task id = " << task_id
958-
<< ", job id = " << task_id.JobId();
959-
960-
auto fetch_data_operation = [this, task_id,
961-
subscribe](const StatusCallback &fetch_done) {
962-
auto callback = [task_id, subscribe, fetch_done](
963-
const Status &status,
964-
const boost::optional<rpc::TaskLeaseData> &result) {
965-
subscribe(task_id, result);
966-
if (fetch_done) {
967-
fetch_done(status);
968-
}
969-
};
970-
RAY_CHECK_OK(AsyncGetTaskLease(task_id, callback));
971-
};
972-
973-
auto subscribe_operation = [this, task_id,
974-
subscribe](const StatusCallback &subscribe_done) {
975-
auto on_subscribe = [task_id, subscribe](const std::string &id,
976-
const std::string &data) {
977-
TaskLeaseData task_lease_data;
978-
task_lease_data.ParseFromString(data);
979-
subscribe(task_id, task_lease_data);
980-
};
981-
return client_impl_->GetGcsPubSub().Subscribe(TASK_LEASE_CHANNEL, task_id.Hex(),
982-
on_subscribe, subscribe_done);
983-
};
984-
985-
subscribe_task_lease_operations_[task_id] = subscribe_operation;
986-
fetch_task_lease_data_operations_[task_id] = fetch_data_operation;
987-
return subscribe_operation(
988-
[fetch_data_operation, done](const Status &status) { fetch_data_operation(done); });
989-
}
990-
991-
Status ServiceBasedTaskInfoAccessor::AsyncUnsubscribeTaskLease(const TaskID &task_id) {
992-
RAY_LOG(DEBUG) << "Unsubscribing task lease, task id = " << task_id
993-
<< ", job id = " << task_id.JobId();
994-
auto status =
995-
client_impl_->GetGcsPubSub().Unsubscribe(TASK_LEASE_CHANNEL, task_id.Hex());
996-
subscribe_task_lease_operations_.erase(task_id);
997-
fetch_task_lease_data_operations_.erase(task_id);
998-
RAY_LOG(DEBUG) << "Finished unsubscribing task lease, task id = " << task_id
999-
<< ", job id = " << task_id.JobId();
1000-
return status;
1001-
}
1002-
1003-
Status ServiceBasedTaskInfoAccessor::AttemptTaskReconstruction(
1004-
const std::shared_ptr<rpc::TaskReconstructionData> &data_ptr,
1005-
const StatusCallback &callback) {
1006-
auto num_reconstructions = data_ptr->num_reconstructions();
1007-
NodeID node_id = NodeID::FromBinary(data_ptr->node_manager_id());
1008-
TaskID task_id = TaskID::FromBinary(data_ptr->task_id());
1009-
RAY_LOG(DEBUG) << "Reconstructing task, reconstructions num = " << num_reconstructions
1010-
<< ", node id = " << node_id << ", task id = " << task_id
1011-
<< ", job id = " << task_id.JobId();
1012-
rpc::AttemptTaskReconstructionRequest request;
1013-
request.mutable_task_reconstruction()->CopyFrom(*data_ptr);
1014-
client_impl_->GetGcsRpcClient().AttemptTaskReconstruction(
1015-
request,
1016-
[num_reconstructions, node_id, task_id, callback](
1017-
const Status &status, const rpc::AttemptTaskReconstructionReply &reply) {
1018-
if (callback) {
1019-
callback(status);
1020-
}
1021-
RAY_LOG(DEBUG) << "Finished reconstructing task, status = " << status
1022-
<< ", reconstructions num = " << num_reconstructions
1023-
<< ", node id = " << node_id << ", task id = " << task_id
1024-
<< ", job id = " << task_id.JobId();
1025-
});
1026-
return Status::OK();
1027-
}
1028-
1029-
void ServiceBasedTaskInfoAccessor::AsyncResubscribe(bool is_pubsub_server_restarted) {
1030-
RAY_LOG(DEBUG) << "Reestablishing subscription for task info.";
1031-
// If only the GCS sever has restarted, we only need to fetch data from the GCS server.
1032-
// If the pub-sub server has also restarted, we need to resubscribe to the pub-sub
1033-
// server first, then fetch data from the GCS server.
1034-
if (is_pubsub_server_restarted) {
1035-
for (auto &item : subscribe_task_lease_operations_) {
1036-
auto &task_id = item.first;
1037-
RAY_CHECK_OK(item.second([this, task_id](const Status &status) {
1038-
fetch_task_lease_data_operations_[task_id](nullptr);
1039-
}));
1040-
}
1041-
} else {
1042-
for (auto &item : fetch_task_lease_data_operations_) {
1043-
item.second(nullptr);
1044-
}
1045-
}
1046-
}
1047-
1048-
bool ServiceBasedTaskInfoAccessor::IsTaskLeaseUnsubscribed(const TaskID &task_id) {
1049-
return client_impl_->GetGcsPubSub().IsUnsubscribed(TASK_LEASE_CHANNEL, task_id.Hex());
1050-
}
1051-
1052838
ServiceBasedObjectInfoAccessor::ServiceBasedObjectInfoAccessor(
1053839
ServiceBasedGcsClient *client_impl)
1054840
: client_impl_(client_impl) {}

0 commit comments

Comments
 (0)