Skip to content

Commit a2ca348

Browse files
authored
[Store] Add initial support for master high availability failover (kvcache-ai#451)
* A temp version. Better to continue development after merging the latest main branch * Temp version to merge the latest main branch * Allow optional use HA mode, in default use non-HA mode. Fix a minor metrics bug. * Refactor the etcd_helper * refactor ha_helper * Add some unit tests. Refactor the code * Update cmakelists: build etcd_wrapper in default * Fix ci problems. Compile etcd wrapper only when use_etcd or with_store are set. * Update python config relating to mooncake-store client * make some blocking etcd helper function cancellable. bug fix: add string name of new errors that will be used in tostring. * Refactor etcd related code * Bug fix * Add basic masterviewhelper unit tests * In ci flow, install and start etcd to run HA feature unit test. * Fix a ci bug * Reuse master_server_address parameter and remove enable_ha parameter. * Format the code. Fix a minor bug. * Handle the error case: the coro server may fail to start or return internal error.
1 parent 4675e9d commit a2ca348

21 files changed

Lines changed: 1453 additions & 101 deletions

.github/workflows/ci.yml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,16 @@ jobs:
2323
with:
2424
python-version: ${{ matrix.python-version }}
2525

26+
- name: Install and start etcd
27+
run: |
28+
wget https://github.com/etcd-io/etcd/releases/download/v3.6.1/etcd-v3.6.1-linux-amd64.tar.gz
29+
tar xzf etcd-v3.6.1-linux-amd64.tar.gz
30+
sudo mv etcd-v3.6.1-linux-amd64/etcd* /usr/local/bin/
31+
etcd --advertise-client-urls http://127.0.0.1:2379 --listen-client-urls http://127.0.0.1:2379 &
32+
sleep 3 # Give etcd time to start
33+
etcdctl --endpoints=http://127.0.0.1:2379 endpoint health
34+
shell: bash
35+
2636
- name: Free up disk space
2737
run: |
2838
sudo rm -rf /usr/share/dotnet

mooncake-common/CMakeLists.txt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
1-
if (USE_ETCD AND NOT USE_ETCD_LEGACY)
2-
add_subdirectory(etcd)
1+
if ((USE_ETCD AND NOT USE_ETCD_LEGACY) OR WITH_STORE)
2+
add_subdirectory(etcd)
33
endif()

mooncake-common/etcd/etcd_wrapper.go

Lines changed: 301 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,29 +2,44 @@ package main
22

33
/*
44
#include <stdint.h>
5+
#include <stdlib.h>
6+
#include <string.h>
57
*/
68
import "C"
79

810
import (
911
"context"
12+
"strings"
1013
"sync"
1114
"time"
1215

1316
clientv3 "go.etcd.io/etcd/client/v3"
1417
)
1518

19+
// Use different etcd client so they are not affected by each other,
20+
// and can be configured separately.
1621
var (
22+
// etcd client for transform engine
1723
globalClient *clientv3.Client
18-
mutex sync.Mutex
19-
refCount int
24+
globalMutex sync.Mutex
25+
globalRefCount int
26+
// etcd client for store
27+
storeClient *clientv3.Client
28+
storeMutex sync.Mutex
29+
// keep alive contexts for store
30+
storeKeepAliveCtx = make(map[int64]context.CancelFunc)
31+
storeKeepAliveMutex sync.Mutex
32+
// watch contexts for store
33+
storeWatchCtx = make(map[string]context.CancelFunc)
34+
storeWatchMutex sync.Mutex
2035
)
2136

2237
//export NewEtcdClient
2338
func NewEtcdClient(endpoints *C.char, errMsg **C.char) int {
24-
mutex.Lock()
25-
defer mutex.Unlock()
39+
globalMutex.Lock()
40+
defer globalMutex.Unlock()
2641
if globalClient != nil {
27-
refCount++
42+
globalRefCount++
2843
return 0
2944
}
3045

@@ -40,7 +55,7 @@ func NewEtcdClient(endpoints *C.char, errMsg **C.char) int {
4055
}
4156

4257
globalClient = cli
43-
refCount++
58+
globalRefCount++
4459
return 0
4560
}
4661

@@ -104,15 +119,291 @@ func EtcdDeleteWrapper(key *C.char, errMsg **C.char) int {
104119

105120
//export EtcdCloseWrapper
106121
func EtcdCloseWrapper() {
107-
mutex.Lock()
108-
defer mutex.Unlock()
122+
globalMutex.Lock()
123+
defer globalMutex.Unlock()
109124
if globalClient != nil {
110-
refCount--
111-
if refCount == 0 {
125+
globalRefCount--
126+
if globalRefCount == 0 {
112127
globalClient.Close()
113128
globalClient = nil
114129
}
115130
}
116131
}
117132

133+
//export NewStoreEtcdClient
134+
func NewStoreEtcdClient(endpoints *C.char, errMsg **C.char) int {
135+
storeMutex.Lock()
136+
defer storeMutex.Unlock()
137+
if storeClient != nil {
138+
*errMsg = C.CString("etcd client can be initialized only once")
139+
return -2
140+
}
141+
142+
endpointStr := C.GoString(endpoints)
143+
endpointList := strings.Split(endpointStr, ";")
144+
145+
// Filter out any empty strings that might result from splitting
146+
var validEndpoints []string
147+
for _, ep := range endpointList {
148+
if ep != "" {
149+
validEndpoints = append(validEndpoints, ep)
150+
}
151+
}
152+
153+
if len(validEndpoints) == 0 {
154+
*errMsg = C.CString("no valid endpoints provided")
155+
return -1
156+
}
157+
158+
cli, err := clientv3.New(clientv3.Config{
159+
Endpoints: validEndpoints,
160+
DialTimeout: 5 * time.Second,
161+
})
162+
163+
if err != nil {
164+
*errMsg = C.CString(err.Error())
165+
return -1
166+
}
167+
168+
storeClient = cli
169+
return 0
170+
}
171+
172+
//export EtcdStoreGetWrapper
173+
func EtcdStoreGetWrapper(key *C.char, keySize C.int, value **C.char,
174+
valueSize *C.int, revisionId *int64, errMsg **C.char) int {
175+
if storeClient == nil {
176+
*errMsg = C.CString("etcd client not initialized")
177+
return -1
178+
}
179+
k := C.GoStringN(key, keySize)
180+
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
181+
defer cancel()
182+
resp, err := storeClient.Get(ctx, k)
183+
if err != nil {
184+
*errMsg = C.CString(err.Error())
185+
return -1
186+
}
187+
if len(resp.Kvs) == 0 {
188+
*errMsg = C.CString("key not found in etcd")
189+
return -2
190+
} else {
191+
kv := resp.Kvs[0]
192+
*value = C.CString(string(kv.Value))
193+
*valueSize = C.int(len(kv.Value))
194+
*revisionId = kv.CreateRevision
195+
return 0
196+
}
197+
}
198+
199+
//export EtcdStoreGrantLeaseWrapper
200+
func EtcdStoreGrantLeaseWrapper(ttl int64, leaseId *int64, errMsg **C.char) int {
201+
if storeClient == nil {
202+
*errMsg = C.CString("etcd client not initialized")
203+
return -1
204+
}
205+
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
206+
defer cancel()
207+
resp, err := storeClient.Grant(ctx, ttl)
208+
if err != nil {
209+
*errMsg = C.CString(err.Error())
210+
return -1
211+
}
212+
*leaseId = int64(resp.ID)
213+
return 0
214+
}
215+
216+
//export EtcdStoreCreateWithLeaseWrapper
217+
func EtcdStoreCreateWithLeaseWrapper(key *C.char, keySize C.int, value *C.char, valueSize C.int,
218+
leaseId int64, revisionId *int64, errMsg **C.char) int {
219+
if storeClient == nil {
220+
*errMsg = C.CString("etcd client not initialized")
221+
return -1
222+
}
223+
k := C.GoStringN(key, keySize)
224+
v := C.GoStringN(value, valueSize)
225+
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
226+
defer cancel()
227+
228+
// Create a transaction
229+
txn := storeClient.Txn(ctx)
230+
231+
// Only put the key if it does not exist
232+
resp, err := txn.If(clientv3.Compare(clientv3.CreateRevision(k), "=", 0)).
233+
Then(clientv3.OpPut(k, v, clientv3.WithLease(clientv3.LeaseID(leaseId)))).
234+
Commit()
235+
236+
if err != nil {
237+
*errMsg = C.CString(err.Error())
238+
return -1
239+
}
240+
241+
// If the key already existed, resp.Succeeded will be false
242+
// If we created the key, resp.Succeeded will be true
243+
if resp.Succeeded {
244+
*revisionId = resp.Header.Revision
245+
return 0;
246+
} else {
247+
*errMsg = C.CString("etcd transaction failed")
248+
return -2
249+
}
250+
}
251+
252+
/*
253+
* @brief First cancel the watch context, then delete it from the map.
254+
* Cancel must be called before delete in case this is a new context
255+
* other than the one we want to delete. In that case, that context will
256+
* be deleted before being cancelled and will not be able to be cancelled
257+
* anymore.
258+
*/
259+
func cancelAndDeleteWatch(k string) int {
260+
storeWatchMutex.Lock()
261+
defer storeWatchMutex.Unlock()
262+
263+
if cancel, exists := storeWatchCtx[k]; exists {
264+
cancel()
265+
delete(storeWatchCtx, k)
266+
return 0
267+
}
268+
return -1
269+
}
270+
271+
//export EtcdStoreWatchUntilDeletedWrapper
272+
func EtcdStoreWatchUntilDeletedWrapper(key *C.char, keySize C.int, errMsg **C.char) int {
273+
if storeClient == nil {
274+
*errMsg = C.CString("etcd client not initialized")
275+
return -1
276+
}
277+
k := C.GoStringN(key, keySize)
278+
279+
// Create a context with cancel function
280+
ctx, cancel := context.WithCancel(context.Background())
281+
282+
// Store the cancel function
283+
storeWatchMutex.Lock()
284+
if _, exists := storeWatchCtx[k]; exists {
285+
storeWatchMutex.Unlock()
286+
*errMsg = C.CString("This key is already being watched")
287+
return -1
288+
}
289+
storeWatchCtx[k] = cancel
290+
storeWatchMutex.Unlock()
291+
292+
// Make sure to delete from the map before returning
293+
defer cancelAndDeleteWatch(k)
294+
295+
// Start watching the key
296+
watchChan := storeClient.Watch(ctx, k)
297+
298+
// Wait for the key to be deleted
299+
for {
300+
select {
301+
case watchResp, ok := <-watchChan:
302+
if !ok {
303+
// Channel closed unexpectedly
304+
*errMsg = C.CString("watch channel closed unexpectedly")
305+
return -1
306+
}
307+
for _, event := range watchResp.Events {
308+
if event.Type == clientv3.EventTypeDelete {
309+
// Clean up the context when done
310+
return 0
311+
}
312+
}
313+
case <-ctx.Done():
314+
// Context was cancelled
315+
*errMsg = C.CString("watch context cancelled")
316+
return -2
317+
}
318+
}
319+
}
320+
321+
//export EtcdStoreCancelWatchWrapper
322+
func EtcdStoreCancelWatchWrapper(key *C.char, keySize C.int, errMsg **C.char) int {
323+
k := C.GoStringN(key, keySize)
324+
if cancelAndDeleteWatch(k) == -1 {
325+
*errMsg = C.CString("no watch context found for the given key")
326+
return -1
327+
}
328+
return 0
329+
}
330+
331+
/*
332+
* @brief First cancel the keep alive context, then delete it from the map.
333+
* Cancel must be called before deleting in case this is a new context
334+
* other than the one we want to delete. In that case, that context will
335+
* be deleted before being cancelled and will not be able to be cancelled
336+
* anymore.
337+
*/
338+
func cancelAndDeleteKeepAlive(leaseId int64) int {
339+
storeKeepAliveMutex.Lock()
340+
defer storeKeepAliveMutex.Unlock()
341+
342+
if cancel, exists := storeKeepAliveCtx[leaseId]; exists {
343+
cancel()
344+
delete(storeKeepAliveCtx, leaseId)
345+
return 0
346+
}
347+
return -1
348+
}
349+
350+
//export EtcdStoreKeepAliveWrapper
351+
func EtcdStoreKeepAliveWrapper(leaseId int64, errMsg **C.char) int {
352+
if storeClient == nil {
353+
*errMsg = C.CString("etcd client not initialized")
354+
return -1
355+
}
356+
357+
// Create a context with cancel function
358+
ctx, cancel := context.WithCancel(context.Background())
359+
360+
// Store the cancel function
361+
storeKeepAliveMutex.Lock()
362+
if _, exists := storeKeepAliveCtx[leaseId]; exists {
363+
storeKeepAliveMutex.Unlock()
364+
*errMsg = C.CString("This lease id is already being kept alive")
365+
return -1
366+
}
367+
storeKeepAliveCtx[leaseId] = cancel
368+
storeKeepAliveMutex.Unlock()
369+
// Make sure to delete from the map before returning
370+
defer cancelAndDeleteKeepAlive(leaseId)
371+
372+
// Start keep alive
373+
keepAliveChan, err := storeClient.KeepAlive(ctx, clientv3.LeaseID(leaseId))
374+
if err != nil {
375+
*errMsg = C.CString(err.Error())
376+
return -1
377+
}
378+
379+
// Wait for keep alive responses
380+
for {
381+
select {
382+
case resp, ok := <-keepAliveChan:
383+
if !ok {
384+
*errMsg = C.CString("keep alive channel closed")
385+
return -1
386+
}
387+
if resp == nil {
388+
*errMsg = C.CString("keep alive response is nil")
389+
return -1
390+
}
391+
// Keep alive successful, continue
392+
case <-ctx.Done():
393+
// Context cancelled
394+
*errMsg = C.CString("keep alive context cancelled")
395+
return -2
396+
}
397+
}
398+
}
399+
400+
//export EtcdStoreCancelKeepAliveWrapper
401+
func EtcdStoreCancelKeepAliveWrapper(leaseId int64, errMsg **C.char) int {
402+
if cancelAndDeleteKeepAlive(leaseId) == -1 {
403+
*errMsg = C.CString("no keep alive context found for the given lease ID")
404+
return -1
405+
}
406+
return 0
407+
}
408+
118409
func main() {}

mooncake-store/CMakeLists.txt

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
project(MooncakeStore)
22

3+
set(ETCD_WRAPPER_INCLUDE ${CMAKE_CURRENT_BINARY_DIR}/../mooncake-common/etcd/)
4+
set(ETCD_WRAPPER_LIB ${CMAKE_CURRENT_BINARY_DIR}/../mooncake-common/etcd/libetcd_wrapper.so)
35

46
# Add include directories
57
include_directories(
@@ -9,8 +11,9 @@ include_directories(
911
${CMAKE_CURRENT_SOURCE_DIR}/include/mooncake-store/proto/
1012
${CMAKE_CURRENT_SOURCE_DIR}/include/
1113
${CMAKE_CURRENT_SOURCE_DIR}/../mooncake-transfer-engine/include
14+
${ETCD_WRAPPER_INCLUDE}
1215
)
1316

1417
# Add subdirectories
1518
add_subdirectory(src)
16-
add_subdirectory(tests)
19+
add_subdirectory(tests)

0 commit comments

Comments
 (0)