Skip to content

Commit 964e0a9

Browse files
committed
Share transfer buffer across all worker instances
1 parent bbcf85c commit 964e0a9

7 files changed

Lines changed: 164 additions & 94 deletions

File tree

mooncake-ep/include/mooncake_backend.h

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#ifndef MOONCAKE_BACKEND_H
22
#define MOONCAKE_BACKEND_H
33

4+
#include <mooncake_backend_buffer.h>
45
#include <mooncake_worker.cuh>
56
#include <torch/torch.h>
67
#include <torch/csrc/distributed/c10d/Backend.hpp>
@@ -21,7 +22,7 @@ class MooncakeBackend final : public ::c10d::Backend {
2122

2223
MooncakeBackend(c10::intrusive_ptr<::c10d::Store> store, int rank, int size,
2324
c10::intrusive_ptr<MooncakeBackendOptions> options,
24-
bool isCpu = false);
25+
bool isCpu = false, bool isTest = false);
2526

2627
~MooncakeBackend() override;
2728

@@ -58,11 +59,7 @@ class MooncakeBackend final : public ::c10d::Backend {
5859
TransferEngine engine_{true};
5960
bool isCpu_{false};
6061
static std::string hostIp_;
61-
int device_id_;
62-
void* send_buffer_[2];
63-
void* recv_buffer_[2];
64-
int32_t* cpu_sync_send_region_[2];
65-
int32_t* cpu_sync_recv_region_[2];
62+
static std::unique_ptr<BackendBuffer> buffer_;
6663
MooncakeWorker worker_;
6764
};
6865

mooncake-ep/include/mooncake_backend_buffer.h

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
#ifndef MOONCAKE_BACKEND_BUFFER_H
2+
#define MOONCAKE_BACKEND_BUFFER_H
3+
4+
#include <cstdlib>
5+
#include <cuda_runtime.h>
6+
#include <torch/csrc/distributed/c10d/Utils.hpp>
7+
8+
namespace mooncake {
9+
10+
constexpr size_t kBufferSize = 1u << 29;
11+
constexpr size_t kMaxNumRanks = 64;
12+
13+
struct BackendBuffer {
14+
void* cpuSendBuffer_[2];
15+
void* cpuRecvBuffer_[2];
16+
void* cudaSendBuffer_[2];
17+
void* cudaRecvBuffer_[2];
18+
int32_t* cpuSyncSendRegion_[2];
19+
int32_t* cpuSyncRecvRegion_[2];
20+
int32_t* cudaSyncSendRegion_[2];
21+
int32_t* cudaSyncRecvRegion_[2];
22+
int cpuTaskCount_ = 0;
23+
int cudaTaskCount_ = 0;
24+
25+
BackendBuffer() {
26+
for (size_t i = 0; i < 2; i++) {
27+
cpuSendBuffer_[i] = malloc(kBufferSize);
28+
TORCH_CHECK(cpuSendBuffer_[i],
29+
c10::str("Failed to allocate CPU send buffer"));
30+
31+
cpuRecvBuffer_[i] = malloc(kBufferSize);
32+
TORCH_CHECK(cpuRecvBuffer_[i],
33+
c10::str("Failed to allocate CPU recv buffer"));
34+
35+
cudaError err = cudaMalloc(&cudaSendBuffer_[i], kBufferSize);
36+
TORCH_CHECK(!err, c10::str("Failed to allocate CUDA send buffer"));
37+
38+
err = cudaMalloc(&cudaRecvBuffer_[i], kBufferSize);
39+
TORCH_CHECK(!err, c10::str("Failed to allocate CUDA recv buffer"));
40+
41+
cpuSyncSendRegion_[i] = new int32_t[kMaxNumRanks]{};
42+
cpuSyncRecvRegion_[i] = new int32_t[kMaxNumRanks]{};
43+
44+
cudaSyncSendRegion_[i] = new int32_t[kMaxNumRanks]{};
45+
cudaSyncRecvRegion_[i] = new int32_t[kMaxNumRanks]{};
46+
}
47+
}
48+
49+
~BackendBuffer() {
50+
for (size_t i = 0; i < 2; i++) {
51+
free(cpuSendBuffer_[i]);
52+
free(cpuRecvBuffer_[i]);
53+
cudaFree(cudaSendBuffer_[i]);
54+
cudaFree(cudaRecvBuffer_[i]);
55+
delete[] cpuSyncSendRegion_[i];
56+
delete[] cpuSyncRecvRegion_[i];
57+
delete[] cudaSyncSendRegion_[i];
58+
delete[] cudaSyncRecvRegion_[i];
59+
}
60+
}
61+
};
62+
63+
} // namespace mooncake
64+
65+
#endif // MOONCAKE_BACKEND_BUFFER_H

mooncake-ep/include/mooncake_worker.cuh

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
#include <cuda_bf16.h>
55
#include <cuda_runtime.h>
6+
#include <mooncake_backend_buffer.h>
67
#include <torch/torch.h>
78
#include <torch/csrc/distributed/c10d/Types.hpp>
89
#include <torch/csrc/distributed/c10d/Work.hpp>
@@ -43,25 +44,25 @@ class MooncakeWorker {
4344

4445
void initWorker(const std::vector<std::string>& server_names);
4546

47+
void setBackendBuffer(BackendBuffer* buffer) { buffer_ = buffer; }
48+
4649
bool* getBrokenRanks() { return brokenRanks_; }
4750

4851
private:
4952
static constexpr size_t kNumTasks_ = 2;
50-
static constexpr size_t kMaxNumRanks = 64;
5153

5254
Task *tasks_, *tasks_device_;
5355
bool *brokenRanks_, *brokenRanksDevice_;
5456
bool hasCallback_[kNumTasks_]{};
5557
std::function<void()> callbacks_[kNumTasks_]{};
5658

5759
int rank_, size_;
60+
BackendBuffer* buffer_ = nullptr;
5861
at::Tensor brokenRanksTensor_;
5962

6063
TransferEngine* engine_;
6164
std::vector<TransferMetadata::SegmentID> segment_ids_;
6265
std::vector<std::shared_ptr<TransferMetadata::SegmentDesc>> segment_descs_;
63-
64-
int taskCount = 0;
6566
};
6667

6768
} // namespace mooncake

mooncake-ep/src/mooncake_backend.cpp

Lines changed: 69 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66

77
namespace mooncake {
88

9+
constexpr const char* REGISTER_BUFFER_ERROR_MSG =
10+
"Failed to register local memory.";
911
constexpr const char* MULTI_DEVICE_ERROR_MSG =
1012
"Expecting one tensor only but got multiple.";
1113
constexpr const char* SYNC_OP_ERROR_MSG = "Expecting async op but got sync op.";
@@ -15,9 +17,11 @@ constexpr const char* REDUCE_DTYPE_ERROR_MSG = "Unsupported reduce dtype: ";
1517

1618
std::string MooncakeBackend::hostIp_ = "127.0.0.1";
1719

20+
std::unique_ptr<BackendBuffer> MooncakeBackend::buffer_ = nullptr;
21+
1822
MooncakeBackend::MooncakeBackend(
1923
c10::intrusive_ptr<::c10d::Store> store, int rank, int size,
20-
c10::intrusive_ptr<MooncakeBackendOptions> options, bool isCpu)
24+
c10::intrusive_ptr<MooncakeBackendOptions> options, bool isCpu, bool isTest)
2125
: Backend(rank, size),
2226
isCpu_(isCpu),
2327
worker_(&engine_, rank, size,
@@ -26,7 +30,8 @@ MooncakeBackend::MooncakeBackend(
2630
{size},
2731
torch::dtype(torch::kInt32).device(torch::kCUDA))) {
2832
// Get device data
29-
cudaError err = cudaGetDevice(&device_id_);
33+
int deviceId_;
34+
cudaError err = cudaGetDevice(&deviceId_);
3035
TORCH_CHECK(!err, c10::str("Failed to get device id"));
3136

3237
// Initialize transfer engine
@@ -37,62 +42,70 @@ MooncakeBackend::MooncakeBackend(
3742
std::string localServerName = localRpcMeta.ip_or_host_name + ":" +
3843
std::to_string(localRpcMeta.rpc_port);
3944

40-
// Register GPU buffers
41-
constexpr size_t buffer_size = 1u << 29;
45+
// Register buffers
46+
BackendBuffer* buffer;
47+
if (isTest) {
48+
buffer = new BackendBuffer();
49+
} else {
50+
if (!buffer_) {
51+
buffer_ = std::make_unique<BackendBuffer>();
52+
}
53+
buffer = buffer_.get();
54+
}
55+
4256
if (isCpu) {
4357
for (size_t i = 0; i < 2; i++) {
44-
send_buffer_[i] = malloc(buffer_size);
45-
TORCH_CHECK(send_buffer_[i],
46-
c10::str("Failed to allocate CPU send buffer"));
58+
int rc = engine_.registerLocalMemory(buffer->cpuSendBuffer_[i],
59+
kBufferSize);
60+
TORCH_CHECK(!rc, REGISTER_BUFFER_ERROR_MSG);
61+
}
4762

48-
int rc = engine_.registerLocalMemory(send_buffer_[i], buffer_size);
49-
TORCH_CHECK(!rc, c10::str("Failed to register local memory"));
63+
for (size_t i = 0; i < 2; i++) {
64+
int rc = engine_.registerLocalMemory(buffer->cpuRecvBuffer_[i],
65+
kBufferSize);
66+
TORCH_CHECK(!rc, REGISTER_BUFFER_ERROR_MSG);
5067
}
5168

5269
for (size_t i = 0; i < 2; i++) {
53-
recv_buffer_[i] = malloc(buffer_size);
54-
TORCH_CHECK(recv_buffer_[i],
55-
c10::str("Failed to allocate CPU recv buffer"));
70+
int rc = engine_.registerLocalMemory(buffer->cpuSyncSendRegion_[i],
71+
kMaxNumRanks * sizeof(int32_t),
72+
kWildcardLocation);
73+
TORCH_CHECK(!rc, REGISTER_BUFFER_ERROR_MSG);
74+
}
5675

57-
int rc = engine_.registerLocalMemory(recv_buffer_[i], buffer_size);
58-
TORCH_CHECK(!rc, c10::str("Failed to register local memory"));
76+
for (size_t i = 0; i < 2; i++) {
77+
int rc = engine_.registerLocalMemory(buffer->cpuSyncRecvRegion_[i],
78+
kMaxNumRanks * sizeof(int32_t),
79+
kWildcardLocation);
80+
TORCH_CHECK(!rc, REGISTER_BUFFER_ERROR_MSG);
5981
}
6082
} else {
61-
std::string location = "cuda:" + std::to_string(device_id_);
83+
std::string location = "cuda:" + std::to_string(deviceId_);
6284
for (size_t i = 0; i < 2; i++) {
63-
err = cudaMalloc(&send_buffer_[i], buffer_size);
64-
TORCH_CHECK(!err, c10::str("Failed to allocate CUDA send buffer"));
65-
66-
int rc = engine_.registerLocalMemory(send_buffer_[i], buffer_size,
67-
location);
68-
TORCH_CHECK(!rc, c10::str("Failed to register local memory"));
85+
int rc = engine_.registerLocalMemory(buffer->cudaSendBuffer_[i],
86+
kBufferSize, location);
87+
TORCH_CHECK(!rc, REGISTER_BUFFER_ERROR_MSG);
6988
}
7089

7190
for (size_t i = 0; i < 2; i++) {
72-
err = cudaMalloc(&recv_buffer_[i], buffer_size);
73-
TORCH_CHECK(!err, c10::str("Failed to allocate CUDA recv buffer"));
74-
75-
int rc = engine_.registerLocalMemory(recv_buffer_[i], buffer_size,
76-
location);
77-
TORCH_CHECK(!rc, c10::str("Failed to register local memory"));
91+
int rc = engine_.registerLocalMemory(buffer->cudaRecvBuffer_[i],
92+
kBufferSize, location);
93+
TORCH_CHECK(!rc, REGISTER_BUFFER_ERROR_MSG);
7894
}
79-
}
8095

81-
// Register CPU sync regions
82-
for (size_t i = 0; i < 2; i++) {
83-
cpu_sync_send_region_[i] = new int32_t[size];
84-
int rc = engine_.registerLocalMemory(cpu_sync_send_region_[i],
85-
size * sizeof(int32_t),
86-
kWildcardLocation);
87-
TORCH_CHECK(!rc, c10::str("Failed to register local memory"));
88-
}
96+
for (size_t i = 0; i < 2; i++) {
97+
int rc = engine_.registerLocalMemory(buffer->cudaSyncSendRegion_[i],
98+
kMaxNumRanks * sizeof(int32_t),
99+
kWildcardLocation);
100+
TORCH_CHECK(!rc, REGISTER_BUFFER_ERROR_MSG);
101+
}
89102

90-
for (size_t i = 0; i < 2; i++) {
91-
cpu_sync_recv_region_[i] = new int32_t[size];
92-
int rc = engine_.registerLocalMemory(cpu_sync_recv_region_[i],
93-
size * sizeof(int32_t),
94-
kWildcardLocation);
95-
TORCH_CHECK(!rc, c10::str("Failed to register local memory"));
103+
for (size_t i = 0; i < 2; i++) {
104+
int rc = engine_.registerLocalMemory(buffer->cudaSyncRecvRegion_[i],
105+
kMaxNumRanks * sizeof(int32_t),
106+
kWildcardLocation);
107+
TORCH_CHECK(!rc, REGISTER_BUFFER_ERROR_MSG);
108+
}
96109
}
97110

98111
// Sync metadata
@@ -103,23 +116,24 @@ MooncakeBackend::MooncakeBackend(
103116
server_names.push_back(
104117
store->get_to_str({"server_name_" + std::to_string(i)}));
105118
}
119+
worker_.setBackendBuffer(buffer);
106120
worker_.initWorker(server_names);
107121
}
108122

109123
MooncakeBackend::~MooncakeBackend() {
110-
for (size_t i = 0; i < 2; i++) {
111-
engine_.unregisterLocalMemory(cpu_sync_send_region_[i]);
112-
delete[] cpu_sync_send_region_[i];
113-
engine_.unregisterLocalMemory(cpu_sync_recv_region_[i]);
114-
delete[] cpu_sync_recv_region_[i];
115-
engine_.unregisterLocalMemory(send_buffer_[i]);
116-
engine_.unregisterLocalMemory(recv_buffer_[i]);
117-
if (isCpu_) {
118-
free(send_buffer_[i]);
119-
free(recv_buffer_[i]);
120-
} else {
121-
cudaFree(send_buffer_[i]);
122-
cudaFree(recv_buffer_[i]);
124+
if (buffer_) {
125+
for (size_t i = 0; i < 2; i++) {
126+
if (isCpu_) {
127+
engine_.unregisterLocalMemory(buffer_->cpuSendBuffer_[i]);
128+
engine_.unregisterLocalMemory(buffer_->cpuRecvBuffer_[i]);
129+
engine_.unregisterLocalMemory(buffer_->cpuSyncSendRegion_[i]);
130+
engine_.unregisterLocalMemory(buffer_->cpuSyncRecvRegion_[i]);
131+
} else {
132+
engine_.unregisterLocalMemory(buffer_->cudaSendBuffer_[i]);
133+
engine_.unregisterLocalMemory(buffer_->cudaRecvBuffer_[i]);
134+
engine_.unregisterLocalMemory(buffer_->cudaSyncSendRegion_[i]);
135+
engine_.unregisterLocalMemory(buffer_->cudaSyncRecvRegion_[i]);
136+
}
123137
}
124138
}
125139
}

mooncake-ep/src/mooncake_worker.cu

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -174,42 +174,40 @@ c10::intrusive_ptr<c10d::Work> MooncakeWorker::putTaskCpu(
174174
c10d::OpType opType, size_t tensorSize, int64_t broadcastRoot,
175175
const std::function<void(void* dst)>& tensorToBuffer,
176176
const std::function<void(void* src)>& bufferToTensor) {
177-
TORCH_CHECK(tensorSize * size_ < (1u << 29), "Too large!");
177+
TORCH_CHECK(tensorSize * size_ < kBufferSize, "Too large!");
178178
auto future = c10::make_intrusive<c10::ivalue::Future>(
179179
c10::ListType::create(c10::TensorType::get()));
180-
int taskId = taskCount % kNumTasks_;
180+
int taskId = buffer_->cpuTaskCount_ % kNumTasks_;
181181
TORCH_CHECK(!tasks_[taskId].active);
182182

183183
tasks_[taskId].opType = opType;
184184
tasks_[taskId].tensorSize = tensorSize;
185185
tasks_[taskId].broadcastRoot = broadcastRoot;
186-
tensorToBuffer((void*)segment_descs_[rank_]->buffers[taskCount % 2].addr);
186+
tensorToBuffer((void*)segment_descs_[rank_]->buffers[taskId].addr);
187187

188188
hasCallback_[taskId] = true;
189-
callbacks_[taskId] = [this, bufferToTensor, future] {
190-
bufferToTensor(
191-
(void*)segment_descs_[rank_]->buffers[2 + taskCount % 2].addr);
192-
++taskCount;
189+
callbacks_[taskId] = [this, bufferToTensor, taskId, future] {
190+
bufferToTensor((void*)segment_descs_[rank_]->buffers[2 + taskId].addr);
193191
future->markCompleted(c10::IValue());
194192
};
195193

196194
tasks_[taskId].active = true;
195+
++buffer_->cpuTaskCount_;
197196
return c10::make_intrusive<MooncakeWorkCpu>(opType, future);
198197
}
199198

200199
c10::intrusive_ptr<c10d::Work> MooncakeWorker::putTaskCuda(
201200
c10d::OpType opType, size_t tensorSize, int64_t broadcastRoot,
202201
cudaStream_t stream, const std::function<void(void* dst)>& tensorToBuffer,
203202
const std::function<void(void* src)>& bufferToTensor) {
204-
TORCH_CHECK(tensorSize * size_ < (1u << 29), "Too large!");
205-
tensorToBuffer((void*)segment_descs_[rank_]->buffers[taskCount % 2].addr);
203+
TORCH_CHECK(tensorSize * size_ < kBufferSize, "Too large!");
204+
int taskId = buffer_->cudaTaskCount_ % kNumTasks_;
205+
tensorToBuffer((void*)segment_descs_[rank_]->buffers[taskId].addr);
206206
enqueueTaskKernel<<<1, 1, 0, stream>>>(
207207
opType, tensorSize, broadcastRoot, tasks_device_, size_,
208-
brokenRanksDevice_, brokenRanksTensor_.data_ptr<int>(),
209-
taskCount % kNumTasks_);
210-
bufferToTensor(
211-
(void*)segment_descs_[rank_]->buffers[2 + taskCount % 2].addr);
212-
++taskCount;
208+
brokenRanksDevice_, brokenRanksTensor_.data_ptr<int>(), taskId);
209+
bufferToTensor((void*)segment_descs_[rank_]->buffers[2 + taskId].addr);
210+
++buffer_->cudaTaskCount_;
213211
cudaEvent_t event;
214212
cudaEventCreateWithFlags(&event, cudaEventDisableTiming);
215213
cudaEventRecord(event, stream);

0 commit comments

Comments
 (0)