Skip to content

Commit 2c2cc36

Browse files
committed
Add backoff in metadata
1 parent d8cea58 commit 2c2cc36

3 files changed

Lines changed: 42 additions & 49 deletions

File tree

mooncake-transfer-engine/include/v1/utility/rpc.h

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,13 @@ namespace v1 {
3737

3838
using RpcRawData = std::vector<char>;
3939
enum RpcErrorCode { OK = 0, ErrIncompleted, ErrInvalidFunc, ErrConnectFailed };
40-
enum RpcFuncID { GetSegmentDesc = 1, BootstrapRdma, SendData, RecvData, Notify };
40+
// Identifiers of the RPC functions a client can request. A value from this
// enum is passed as the `func_id` argument of AsioRpcClient::Call.
// Numbering starts at 1; the remaining enumerators take consecutive values.
enum RpcFuncID {
41+
GetSegmentDesc = 1,  // requested by P2PMetadataStore::getSegmentDesc
42+
BootstrapRdma,  // requested by RpcClient::bootstrap
43+
SendData,  // requested by RpcClient::sendData
44+
RecvData,  // requested by RpcClient::recvData
45+
Notify  // requested by RpcClient::notify
46+
};
4147

4248
class AsioRpcServer {
4349
public:
@@ -73,20 +79,13 @@ class AsioRpcServer {
7379

7480
class AsioRpcClient {
7581
public:
76-
AsioRpcClient() {}
77-
~AsioRpcClient() {}
78-
79-
AsioRpcClient(const AsioRpcClient &) = delete;
80-
AsioRpcClient &operator=(const AsioRpcClient &) = delete;
81-
82-
static AsioRpcClient &Get() {
83-
static AsioRpcClient instance;
84-
return instance;
85-
}
82+
static Status Call(const std::string &server_addr, int func_id,
83+
const RpcRawData &request, RpcRawData &response);
8684

8785
public:
88-
RpcErrorCode call(const std::string &server_addr, int func_id,
89-
const RpcRawData &request, RpcRawData &response);
86+
static RpcErrorCode do_call(const std::string &server_addr, int func_id,
87+
const RpcRawData &request,
88+
RpcRawData &response);
9089
};
9190

9291
} // namespace v1

mooncake-transfer-engine/src/v1/metadata/metadata.cpp

Lines changed: 10 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -226,12 +226,9 @@ Status CentralMetadataStore::deleteSegmentDesc(
226226
Status P2PMetadataStore::getSegmentDesc(SegmentDescRef &desc,
227227
const std::string &segment_name) {
228228
RpcRawData request, response;
229-
RpcErrorCode errcode = AsioRpcClient::Get().call(
230-
segment_name, GetSegmentDesc, request, response);
231-
if (errcode) {
232-
return Status::MetadataError("RPC error: " + std::to_string(errcode) +
233-
LOC_MARK);
234-
}
229+
auto status =
230+
AsioRpcClient::Call(segment_name, GetSegmentDesc, request, response);
231+
if (!status.ok()) return status;
235232
auto jstr = std::string(response.data(), response.size());
236233
if (jstr.empty()) {
237234
return Status::InvalidEntry(std::string("Segment ") + segment_name +
@@ -309,16 +306,11 @@ Status RpcClient::bootstrap(const std::string &server_addr,
309306
const BootstrapDesc &request,
310307
BootstrapDesc &response) {
311308
RpcRawData request_raw, response_raw;
312-
313309
auto status = serializeBootstrapDesc(request, request_raw);
314310
if (!status.ok()) return status;
315-
316-
RpcErrorCode err_code = AsioRpcClient::Get().call(
317-
server_addr, BootstrapRdma, request_raw, response_raw);
318-
if (err_code)
319-
return Status::MetadataError(
320-
"RPC error found: " + std::to_string(err_code) + LOC_MARK);
321-
311+
status = AsioRpcClient::Call(server_addr, BootstrapRdma, request_raw,
312+
response_raw);
313+
if (!status.ok()) return status;
322314
return deserializeBootstrapDesc(response, response_raw);
323315
}
324316

@@ -330,13 +322,7 @@ Status RpcClient::sendData(const std::string &server_addr,
330322
request.resize(sizeof(XferDataDesc) + length);
331323
memcpy(&request[0], &desc, sizeof(desc));
332324
genericMemcpy(&request[sizeof(desc)], local_mem_addr, length);
333-
RpcErrorCode err_code =
334-
AsioRpcClient::Get().call(server_addr, SendData, request, response);
335-
if (err_code)
336-
return Status::MetadataError(
337-
"RPC error found: " + std::to_string(err_code) + LOC_MARK);
338-
assert(response.empty());
339-
return Status::OK();
325+
return AsioRpcClient::Call(server_addr, SendData, request, response);
340326
}
341327

342328
Status RpcClient::recvData(const std::string &server_addr,
@@ -346,12 +332,8 @@ Status RpcClient::recvData(const std::string &server_addr,
346332
XferDataDesc desc{peer_mem_addr, length};
347333
request.resize(sizeof(XferDataDesc) + length);
348334
memcpy(&request[0], &desc, sizeof(desc));
349-
RpcErrorCode err_code =
350-
AsioRpcClient::Get().call(server_addr, RecvData, request, response);
351-
if (err_code)
352-
return Status::MetadataError(
353-
"RPC error found: " + std::to_string(err_code) + LOC_MARK);
354-
assert(response.size() == length);
335+
auto status = AsioRpcClient::Call(server_addr, RecvData, request, response);
336+
if (!status.ok()) return status;
355337
genericMemcpy(local_mem_addr, response.data(), length);
356338
return Status::OK();
357339
}
@@ -361,12 +343,7 @@ Status RpcClient::notify(const std::string &server_addr,
361343
RpcRawData request, response;
362344
request.resize(message.size());
363345
memcpy(&request[0], message.c_str(), message.size());
364-
RpcErrorCode err_code =
365-
AsioRpcClient::Get().call(server_addr, Notify, request, response);
366-
if (err_code)
367-
return Status::MetadataError(
368-
"RPC error found: " + std::to_string(err_code) + LOC_MARK);
369-
return Status::OK();
346+
return AsioRpcClient::Call(server_addr, Notify, request, response);
370347
}
371348

372349
MetadataService::MetadataService(const std::string &type,

mooncake-transfer-engine/src/v1/utility/rpc.cpp

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -181,9 +181,26 @@ int AsioRpcServer::process(int func_id, const RpcRawData &request,
181181
return 0;
182182
}
183183

184-
RpcErrorCode AsioRpcClient::call(const std::string &server_addr, int func_id,
185-
const RpcRawData &request,
186-
RpcRawData &response) {
184+
/// Performs one RPC via do_call(), retrying with exponential backoff while the
/// connection cannot be established.
///
/// Outcome of each attempt:
///   - OK               -> returns Status::OK() immediately.
///   - ErrConnectFailed -> waits, then tries again (at most kMaxAttempts
///                         attempts in total).
///   - any other code   -> returns Status::RpcServiceError with the numeric
///                         code immediately, without retrying.
///
/// The wait between attempts starts at 50 ms and doubles after every failed
/// attempt; a jitter term obtained from SimpleRandom::Get().next(backoff / 5)
/// is added to each wait.
///
/// @param server_addr  Target address, forwarded unchanged to do_call().
/// @param func_id      RPC function identifier, forwarded to do_call().
/// @param request      Request payload, forwarded to do_call().
/// @param response     Output buffer handed to do_call() on every attempt.
/// @return Status::OK() on success, otherwise Status::RpcServiceError.
Status AsioRpcClient::Call(const std::string &server_addr, int func_id,
                           const RpcRawData &request, RpcRawData &response) {
    constexpr int kMaxAttempts = 6;
    uint64_t backoff_ms = 50;
    for (int attempt = 0; attempt < kMaxAttempts; ++attempt) {
        const auto code = do_call(server_addr, func_id, request, response);
        if (code == OK) return Status::OK();
        if (code != ErrConnectFailed)
            return Status::RpcServiceError("RPC error code " +
                                           std::to_string(code));
        // No attempt follows the last failure, so sleeping here would only
        // delay the error report by the largest backoff interval.
        if (attempt + 1 == kMaxAttempts) break;
        const auto delay_ms =
            backoff_ms + SimpleRandom::Get().next(backoff_ms / 5);
        std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms));
        backoff_ms *= 2;
    }
    return Status::RpcServiceError("RPC connection failed");
}
200+
201+
RpcErrorCode AsioRpcClient::do_call(const std::string &server_addr, int func_id,
202+
const RpcRawData &request,
203+
RpcRawData &response) {
187204
try {
188205
asio::io_context local_context;
189206
asio::ip::tcp::resolver resolver(local_context);

0 commit comments

Comments
 (0)