Skip to content

Commit bb8e75b

Browse files
zhijunfupcmoritz
authored andcommitted
[grpc] refactor rpc server to support multiple io services (#5023)
1 parent aa5fc52 commit bb8e75b

File tree

6 files changed

+104
-50
lines changed

6 files changed

+104
-50
lines changed

src/ray/raylet/node_manager.cc

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,8 @@ NodeManager::NodeManager(boost::asio::io_service &io_service,
101101
gcs_client_->raylet_task_table(), gcs_client_->raylet_task_table(),
102102
config.max_lineage_size),
103103
actor_registry_(),
104-
node_manager_server_(config.node_manager_port, io_service, *this),
104+
node_manager_server_("NodeManager", config.node_manager_port),
105+
node_manager_service_(io_service, *this),
105106
client_call_manager_(io_service) {
106107
RAY_CHECK(heartbeat_period_.count() > 0);
107108
// Initialize the resource map with own cluster resource configuration.
@@ -119,6 +120,7 @@ NodeManager::NodeManager(boost::asio::io_service &io_service,
119120

120121
RAY_ARROW_CHECK_OK(store_client_.Connect(config.store_socket_name.c_str()));
121122
// Run the node manger rpc server.
123+
node_manager_server_.RegisterService(node_manager_service_);
122124
node_manager_server_.Run();
123125
}
124126

src/ray/raylet/node_manager.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -512,7 +512,10 @@ class NodeManager : public rpc::NodeManagerServiceHandler {
512512
std::unordered_map<ActorID, ActorCheckpointID> checkpoint_id_to_restore_;
513513

514514
/// The RPC server.
515-
rpc::NodeManagerServer node_manager_server_;
515+
rpc::GrpcServer node_manager_server_;
516+
517+
/// The RPC service.
518+
rpc::NodeManagerGrpcService node_manager_service_;
516519

517520
/// The `ClientCallManager` object that is shared by all `NodeManagerClient`s.
518521
rpc::ClientCallManager client_call_manager_;

src/ray/rpc/grpc_server.cc

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#include "ray/rpc/grpc_server.h"
2+
#include <grpcpp/impl/service_type.h>
23

34
namespace ray {
45
namespace rpc {
@@ -9,17 +10,18 @@ void GrpcServer::Run() {
910
grpc::ServerBuilder builder;
1011
// TODO(hchen): Add options for authentication.
1112
builder.AddListeningPort(server_address, grpc::InsecureServerCredentials(), &port_);
12-
// Allow subclasses to register concrete services.
13-
RegisterServices(builder);
13+
// Register all the services to this server.
14+
for (auto &entry : services_) {
15+
builder.RegisterService(&entry.get());
16+
}
1417
// Get hold of the completion queue used for the asynchronous communication
1518
// with the gRPC runtime.
1619
cq_ = builder.AddCompletionQueue();
1720
// Build and start server.
1821
server_ = builder.BuildAndStart();
1922
RAY_LOG(DEBUG) << name_ << " server started, listening on port " << port_ << ".";
2023

21-
// Allow subclasses to initialize the server call factories.
22-
InitServerCallFactories(&server_call_factories_and_concurrencies_);
24+
// Create calls for all the server call factories.
2325
for (auto &entry : server_call_factories_and_concurrencies_) {
2426
for (int i = 0; i < entry.second; i++) {
2527
// Create and request calls from the factory.
@@ -31,6 +33,11 @@ void GrpcServer::Run() {
3133
polling_thread.detach();
3234
}
3335

36+
void GrpcServer::RegisterService(GrpcService &service) {
37+
services_.emplace_back(service.GetGrpcService());
38+
service.InitServerCallFactories(cq_, &server_call_factories_and_concurrencies_);
39+
}
40+
3441
void GrpcServer::PollEventsFromCompletionQueue() {
3542
void *tag;
3643
bool ok;
@@ -48,7 +55,7 @@ void GrpcServer::PollEventsFromCompletionQueue() {
4855
// incoming request.
4956
server_call->GetFactory().CreateCall();
5057
server_call->SetState(ServerCallState::PROCESSING);
51-
main_service_.post([server_call] { server_call->HandleRequest(); });
58+
server_call->HandleRequest();
5259
break;
5360
case ServerCallState::SENDING_REPLY:
5461
// The reply has been sent, this call can be deleted now.

src/ray/rpc/grpc_server.h

Lines changed: 52 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,9 @@
1212
namespace ray {
1313
namespace rpc {
1414

15-
/// Base class that represents an abstract gRPC server.
15+
class GrpcService;
16+
17+
/// Class that represents an gRPC server.
1618
///
1719
/// A `GrpcServer` listens on a specific port. It owns
1820
/// 1) a `ServerCompletionQueue` that is used for polling events from gRPC,
@@ -28,11 +30,7 @@ class GrpcServer {
2830
/// \param[in] name Name of this server, used for logging and debugging purpose.
2931
/// \param[in] port The port to bind this server to. If it's 0, a random available port
3032
/// will be chosen.
31-
/// \param[in] main_service The main event loop, to which service handler functions
32-
/// will be posted.
33-
GrpcServer(const std::string &name, const uint32_t port,
34-
boost::asio::io_service &main_service)
35-
: name_(name), port_(port), main_service_(main_service) {}
33+
GrpcServer(const std::string &name, const uint32_t port) : name_(name), port_(port) {}
3634

3735
/// Destruct this gRPC server.
3836
~GrpcServer() {
@@ -46,36 +44,25 @@ class GrpcServer {
4644
/// Get the port of this gRPC server.
4745
int GetPort() const { return port_; }
4846

49-
protected:
50-
/// Subclasses should implement this method and register one or multiple gRPC services
51-
/// to the given `ServerBuilder`.
47+
/// Register a grpc service. Multiple services can be registered to the same server.
48+
/// Note that the `service` registered must remain valid for the lifetime of the
49+
/// `GrpcServer`, as it holds the underlying `grpc::Service`.
5250
///
53-
/// \param[in] builder The `ServerBuilder` instance to register services to.
54-
virtual void RegisterServices(grpc::ServerBuilder &builder) = 0;
55-
56-
/// Subclasses should implement this method to initialize the `ServerCallFactory`
57-
/// instances, as well as specify maximum number of concurrent requests that gRPC
58-
/// server can "accept" (not "handle"). Each factory will be used to create
59-
/// `accept_concurrency` `ServerCall` objects, each of which will be used to accept and
60-
/// handle an incoming request.
61-
///
62-
/// \param[out] server_call_factories_and_concurrencies The `ServerCallFactory` objects,
63-
/// and the maximum number of concurrent requests that gRPC server can accept.
64-
virtual void InitServerCallFactories(
65-
std::vector<std::pair<std::unique_ptr<ServerCallFactory>, int>>
66-
*server_call_factories_and_concurrencies) = 0;
51+
/// \param[in] service A `GrpcService` to register to this server.
52+
void RegisterService(GrpcService &service);
6753

54+
protected:
6855
/// This function runs in a background thread. It keeps polling events from the
6956
/// `ServerCompletionQueue`, and dispaches the event to the `ServiceHandler` instances
7057
/// via the `ServerCall` objects.
7158
void PollEventsFromCompletionQueue();
7259

73-
/// The main event loop, to which the service handler functions will be posted.
74-
boost::asio::io_service &main_service_;
7560
/// Name of this server, used for logging and debugging purpose.
7661
const std::string name_;
7762
/// Port of this server.
7863
int port_;
64+
/// The `grpc::Service` objects which should be registered to `ServerBuilder`.
65+
std::vector<std::reference_wrapper<grpc::Service>> services_;
7966
/// The `ServerCallFactory` objects, and the maximum number of concurrent requests that
8067
/// gRPC server can accept.
8168
std::vector<std::pair<std::unique_ptr<ServerCallFactory>, int>>
@@ -86,6 +73,46 @@ class GrpcServer {
8673
std::unique_ptr<grpc::Server> server_;
8774
};
8875

76+
/// Base class that represents an abstract gRPC service.
77+
///
78+
/// Subclass should implement `InitServerCallFactories` to decide
79+
/// which kinds of requests this service should accept.
80+
class GrpcService {
81+
public:
82+
/// Constructor.
83+
///
84+
/// \param[in] main_service The main event loop, to which service handler functions
85+
/// will be posted.
86+
GrpcService(boost::asio::io_service &main_service) : main_service_(main_service) {}
87+
88+
/// Destruct this gRPC service.
89+
~GrpcService() {}
90+
91+
protected:
92+
/// Return the underlying grpc::Service object for this class.
93+
/// This is passed to `GrpcServer` to be registered to grpc `ServerBuilder`.
94+
virtual grpc::Service &GetGrpcService() = 0;
95+
96+
/// Subclasses should implement this method to initialize the `ServerCallFactory`
97+
/// instances, as well as specify maximum number of concurrent requests that gRPC
98+
/// server can "accept" (not "handle"). Each factory will be used to create
99+
/// `accept_concurrency` `ServerCall` objects, each of which will be used to accept and
100+
/// handle an incoming request.
101+
///
102+
/// \param[in] cq The grpc completion queue.
103+
/// \param[out] server_call_factories_and_concurrencies The `ServerCallFactory` objects,
104+
/// and the maximum number of concurrent requests that gRPC server can accept.
105+
virtual void InitServerCallFactories(
106+
const std::unique_ptr<grpc::ServerCompletionQueue> &cq,
107+
std::vector<std::pair<std::unique_ptr<ServerCallFactory>, int>>
108+
*server_call_factories_and_concurrencies) = 0;
109+
110+
/// The main event loop, to which the service handler functions will be posted.
111+
boost::asio::io_service &main_service_;
112+
113+
friend class GrpcServer;
114+
};
115+
89116
} // namespace rpc
90117
} // namespace ray
91118

src/ray/rpc/node_manager_server.h

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -25,33 +25,31 @@ class NodeManagerServiceHandler {
2525
RequestDoneCallback done_callback) = 0;
2626
};
2727

28-
/// The `GrpcServer` for `NodeManagerService`.
29-
class NodeManagerServer : public GrpcServer {
28+
/// The `GrpcService` for `NodeManagerService`.
29+
class NodeManagerGrpcService : public GrpcService {
3030
public:
3131
/// Constructor.
3232
///
33-
/// \param[in] port See super class.
34-
/// \param[in] main_service See super class.
33+
/// \param[in] io_service See super class.
3534
/// \param[in] handler The service handler that actually handle the requests.
36-
NodeManagerServer(const uint32_t port, boost::asio::io_service &main_service,
37-
NodeManagerServiceHandler &service_handler)
38-
: GrpcServer("NodeManager", port, main_service),
39-
service_handler_(service_handler){};
35+
NodeManagerGrpcService(boost::asio::io_service &io_service,
36+
NodeManagerServiceHandler &service_handler)
37+
: GrpcService(io_service), service_handler_(service_handler){};
4038

41-
void RegisterServices(grpc::ServerBuilder &builder) override {
42-
/// Register `NodeManagerService`.
43-
builder.RegisterService(&service_);
44-
}
39+
protected:
40+
grpc::Service &GetGrpcService() override { return service_; }
4541

4642
void InitServerCallFactories(
43+
const std::unique_ptr<grpc::ServerCompletionQueue> &cq,
4744
std::vector<std::pair<std::unique_ptr<ServerCallFactory>, int>>
4845
*server_call_factories_and_concurrencies) override {
4946
// Initialize the factory for `ForwardTask` requests.
5047
std::unique_ptr<ServerCallFactory> forward_task_call_factory(
5148
new ServerCallFactoryImpl<NodeManagerService, NodeManagerServiceHandler,
5249
ForwardTaskRequest, ForwardTaskReply>(
5350
service_, &NodeManagerService::AsyncService::RequestForwardTask,
54-
service_handler_, &NodeManagerServiceHandler::HandleForwardTask, cq_));
51+
service_handler_, &NodeManagerServiceHandler::HandleForwardTask, cq,
52+
main_service_));
5553

5654
// Set `ForwardTask`'s accept concurrency to 100.
5755
server_call_factories_and_concurrencies->emplace_back(
@@ -61,6 +59,7 @@ class NodeManagerServer : public GrpcServer {
6159
private:
6260
/// The grpc async service object.
6361
NodeManagerService::AsyncService service_;
62+
6463
/// The service handler that actually handle the requests.
6564
NodeManagerServiceHandler &service_handler_;
6665
};

src/ray/rpc/server_call.h

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -94,20 +94,27 @@ class ServerCallImpl : public ServerCall {
9494
/// \param[in] factory The factory which created this call.
9595
/// \param[in] service_handler The service handler that handles the request.
9696
/// \param[in] handle_request_function Pointer to the service handler function.
97+
/// \param[in] io_service The event loop.
9798
ServerCallImpl(
9899
const ServerCallFactory &factory, ServiceHandler &service_handler,
99-
HandleRequestFunction<ServiceHandler, Request, Reply> handle_request_function)
100+
HandleRequestFunction<ServiceHandler, Request, Reply> handle_request_function,
101+
boost::asio::io_service &io_service)
100102
: state_(ServerCallState::PENDING),
101103
factory_(factory),
102104
service_handler_(service_handler),
103105
handle_request_function_(handle_request_function),
104-
response_writer_(&context_) {}
106+
response_writer_(&context_),
107+
io_service_(io_service) {}
105108

106109
ServerCallState GetState() const override { return state_; }
107110

108111
void SetState(const ServerCallState &new_state) override { state_ = new_state; }
109112

110113
void HandleRequest() override {
114+
io_service_.post([this] { HandleRequestImpl(); });
115+
}
116+
117+
void HandleRequestImpl() {
111118
state_ = ServerCallState::PROCESSING;
112119
(service_handler_.*handle_request_function_)(request_, &reply_,
113120
[this](Status status) {
@@ -146,6 +153,9 @@ class ServerCallImpl : public ServerCall {
146153
/// The reponse writer.
147154
grpc::ServerAsyncResponseWriter<Reply> response_writer_;
148155

156+
/// The event loop.
157+
boost::asio::io_service &io_service_;
158+
149159
/// The request message.
150160
Request request_;
151161

@@ -185,23 +195,26 @@ class ServerCallFactoryImpl : public ServerCallFactory {
185195
/// \param[in] service_handler The service handler that handles the request.
186196
/// \param[in] handle_request_function Pointer to the service handler function.
187197
/// \param[in] cq The `CompletionQueue`.
198+
/// \param[in] io_service The event loop.
188199
ServerCallFactoryImpl(
189200
AsyncService &service,
190201
RequestCallFunction<GrpcService, Request, Reply> request_call_function,
191202
ServiceHandler &service_handler,
192203
HandleRequestFunction<ServiceHandler, Request, Reply> handle_request_function,
193-
const std::unique_ptr<grpc::ServerCompletionQueue> &cq)
204+
const std::unique_ptr<grpc::ServerCompletionQueue> &cq,
205+
boost::asio::io_service &io_service)
194206
: service_(service),
195207
request_call_function_(request_call_function),
196208
service_handler_(service_handler),
197209
handle_request_function_(handle_request_function),
198-
cq_(cq) {}
210+
cq_(cq),
211+
io_service_(io_service) {}
199212

200213
ServerCall *CreateCall() const override {
201214
// Create a new `ServerCall`. This object will eventually be deleted by
202215
// `GrpcServer::PollEventsFromCompletionQueue`.
203216
auto call = new ServerCallImpl<ServiceHandler, Request, Reply>(
204-
*this, service_handler_, handle_request_function_);
217+
*this, service_handler_, handle_request_function_, io_service_);
205218
/// Request gRPC runtime to starting accepting this kind of request, using the call as
206219
/// the tag.
207220
(service_.*request_call_function_)(&call->context_, &call->request_,
@@ -225,6 +238,9 @@ class ServerCallFactoryImpl : public ServerCallFactory {
225238

226239
/// The `CompletionQueue`.
227240
const std::unique_ptr<grpc::ServerCompletionQueue> &cq_;
241+
242+
/// The event loop.
243+
boost::asio::io_service &io_service_;
228244
};
229245

230246
} // namespace rpc

0 commit comments

Comments
 (0)