@@ -2,29 +2,44 @@ package main
22
33/*
44#include <stdint.h>
5+ #include <stdlib.h>
6+ #include <string.h>
57*/
68import "C"
79
810import (
911 "context"
12+ "strings"
1013 "sync"
1114 "time"
1215
1316 clientv3 "go.etcd.io/etcd/client/v3"
1417)
1518
19+ // Use different etcd client so they are not affected by each other,
20+ // and can be configured separately.
1621var (
22+ // etcd client for transform engine
1723 globalClient * clientv3.Client
18- mutex sync.Mutex
19- refCount int
24+ globalMutex sync.Mutex
25+ globalRefCount int
26+ // etcd client for store
27+ storeClient * clientv3.Client
28+ storeMutex sync.Mutex
29+ // keep alive contexts for store
30+ storeKeepAliveCtx = make (map [int64 ]context.CancelFunc )
31+ storeKeepAliveMutex sync.Mutex
32+ // watch contexts for store
33+ storeWatchCtx = make (map [string ]context.CancelFunc )
34+ storeWatchMutex sync.Mutex
2035)
2136
2237//export NewEtcdClient
2338func NewEtcdClient (endpoints * C.char , errMsg * * C.char ) int {
24- mutex .Lock ()
25- defer mutex .Unlock ()
39+ globalMutex .Lock ()
40+ defer globalMutex .Unlock ()
2641 if globalClient != nil {
27- refCount ++
42+ globalRefCount ++
2843 return 0
2944 }
3045
@@ -40,7 +55,7 @@ func NewEtcdClient(endpoints *C.char, errMsg **C.char) int {
4055 }
4156
4257 globalClient = cli
43- refCount ++
58+ globalRefCount ++
4459 return 0
4560}
4661
@@ -104,15 +119,291 @@ func EtcdDeleteWrapper(key *C.char, errMsg **C.char) int {
104119
105120//export EtcdCloseWrapper
106121func EtcdCloseWrapper () {
107- mutex .Lock ()
108- defer mutex .Unlock ()
122+ globalMutex .Lock ()
123+ defer globalMutex .Unlock ()
109124 if globalClient != nil {
110- refCount --
111- if refCount == 0 {
125+ globalRefCount --
126+ if globalRefCount == 0 {
112127 globalClient .Close ()
113128 globalClient = nil
114129 }
115130 }
116131}
117132
133+ //export NewStoreEtcdClient
134+ func NewStoreEtcdClient (endpoints * C.char , errMsg * * C.char ) int {
135+ storeMutex .Lock ()
136+ defer storeMutex .Unlock ()
137+ if storeClient != nil {
138+ * errMsg = C .CString ("etcd client can be initialized only once" )
139+ return - 2
140+ }
141+
142+ endpointStr := C .GoString (endpoints )
143+ endpointList := strings .Split (endpointStr , ";" )
144+
145+ // Filter out any empty strings that might result from splitting
146+ var validEndpoints []string
147+ for _ , ep := range endpointList {
148+ if ep != "" {
149+ validEndpoints = append (validEndpoints , ep )
150+ }
151+ }
152+
153+ if len (validEndpoints ) == 0 {
154+ * errMsg = C .CString ("no valid endpoints provided" )
155+ return - 1
156+ }
157+
158+ cli , err := clientv3 .New (clientv3.Config {
159+ Endpoints : validEndpoints ,
160+ DialTimeout : 5 * time .Second ,
161+ })
162+
163+ if err != nil {
164+ * errMsg = C .CString (err .Error ())
165+ return - 1
166+ }
167+
168+ storeClient = cli
169+ return 0
170+ }
171+
172+ //export EtcdStoreGetWrapper
173+ func EtcdStoreGetWrapper (key * C.char , keySize C.int , value * * C.char ,
174+ valueSize * C.int , revisionId * int64 , errMsg * * C.char ) int {
175+ if storeClient == nil {
176+ * errMsg = C .CString ("etcd client not initialized" )
177+ return - 1
178+ }
179+ k := C .GoStringN (key , keySize )
180+ ctx , cancel := context .WithTimeout (context .Background (), 5 * time .Second )
181+ defer cancel ()
182+ resp , err := storeClient .Get (ctx , k )
183+ if err != nil {
184+ * errMsg = C .CString (err .Error ())
185+ return - 1
186+ }
187+ if len (resp .Kvs ) == 0 {
188+ * errMsg = C .CString ("key not found in etcd" )
189+ return - 2
190+ } else {
191+ kv := resp .Kvs [0 ]
192+ * value = C .CString (string (kv .Value ))
193+ * valueSize = C .int (len (kv .Value ))
194+ * revisionId = kv .CreateRevision
195+ return 0
196+ }
197+ }
198+
199+ //export EtcdStoreGrantLeaseWrapper
200+ func EtcdStoreGrantLeaseWrapper (ttl int64 , leaseId * int64 , errMsg * * C.char ) int {
201+ if storeClient == nil {
202+ * errMsg = C .CString ("etcd client not initialized" )
203+ return - 1
204+ }
205+ ctx , cancel := context .WithTimeout (context .Background (), 5 * time .Second )
206+ defer cancel ()
207+ resp , err := storeClient .Grant (ctx , ttl )
208+ if err != nil {
209+ * errMsg = C .CString (err .Error ())
210+ return - 1
211+ }
212+ * leaseId = int64 (resp .ID )
213+ return 0
214+ }
215+
216+ //export EtcdStoreCreateWithLeaseWrapper
217+ func EtcdStoreCreateWithLeaseWrapper (key * C.char , keySize C.int , value * C.char , valueSize C.int ,
218+ leaseId int64 , revisionId * int64 , errMsg * * C.char ) int {
219+ if storeClient == nil {
220+ * errMsg = C .CString ("etcd client not initialized" )
221+ return - 1
222+ }
223+ k := C .GoStringN (key , keySize )
224+ v := C .GoStringN (value , valueSize )
225+ ctx , cancel := context .WithTimeout (context .Background (), 5 * time .Second )
226+ defer cancel ()
227+
228+ // Create a transaction
229+ txn := storeClient .Txn (ctx )
230+
231+ // Only put the key if it does not exist
232+ resp , err := txn .If (clientv3 .Compare (clientv3 .CreateRevision (k ), "=" , 0 )).
233+ Then (clientv3 .OpPut (k , v , clientv3 .WithLease (clientv3 .LeaseID (leaseId )))).
234+ Commit ()
235+
236+ if err != nil {
237+ * errMsg = C .CString (err .Error ())
238+ return - 1
239+ }
240+
241+ // If the key already existed, resp.Succeeded will be false
242+ // If we created the key, resp.Succeeded will be true
243+ if resp .Succeeded {
244+ * revisionId = resp .Header .Revision
245+ return 0 ;
246+ } else {
247+ * errMsg = C .CString ("etcd transaction failed" )
248+ return - 2
249+ }
250+ }
251+
252+ /*
253+ * @brief First cancel the watch context, then delete it from the map.
254+ * Cancel must be called before delete in case this is a new context
255+ * other than the one we want to delete. In that case, that context will
256+ * be deleted before being cancelled and will not be able to be cancelled
257+ * anymore.
258+ */
259+ func cancelAndDeleteWatch (k string ) int {
260+ storeWatchMutex .Lock ()
261+ defer storeWatchMutex .Unlock ()
262+
263+ if cancel , exists := storeWatchCtx [k ]; exists {
264+ cancel ()
265+ delete (storeWatchCtx , k )
266+ return 0
267+ }
268+ return - 1
269+ }
270+
271+ //export EtcdStoreWatchUntilDeletedWrapper
272+ func EtcdStoreWatchUntilDeletedWrapper (key * C.char , keySize C.int , errMsg * * C.char ) int {
273+ if storeClient == nil {
274+ * errMsg = C .CString ("etcd client not initialized" )
275+ return - 1
276+ }
277+ k := C .GoStringN (key , keySize )
278+
279+ // Create a context with cancel function
280+ ctx , cancel := context .WithCancel (context .Background ())
281+
282+ // Store the cancel function
283+ storeWatchMutex .Lock ()
284+ if _ , exists := storeWatchCtx [k ]; exists {
285+ storeWatchMutex .Unlock ()
286+ * errMsg = C .CString ("This key is already being watched" )
287+ return - 1
288+ }
289+ storeWatchCtx [k ] = cancel
290+ storeWatchMutex .Unlock ()
291+
292+ // Make sure to delete from the map before returning
293+ defer cancelAndDeleteWatch (k )
294+
295+ // Start watching the key
296+ watchChan := storeClient .Watch (ctx , k )
297+
298+ // Wait for the key to be deleted
299+ for {
300+ select {
301+ case watchResp , ok := <- watchChan :
302+ if ! ok {
303+ // Channel closed unexpectedly
304+ * errMsg = C .CString ("watch channel closed unexpectedly" )
305+ return - 1
306+ }
307+ for _ , event := range watchResp .Events {
308+ if event .Type == clientv3 .EventTypeDelete {
309+ // Clean up the context when done
310+ return 0
311+ }
312+ }
313+ case <- ctx .Done ():
314+ // Context was cancelled
315+ * errMsg = C .CString ("watch context cancelled" )
316+ return - 2
317+ }
318+ }
319+ }
320+
321+ //export EtcdStoreCancelWatchWrapper
322+ func EtcdStoreCancelWatchWrapper (key * C.char , keySize C.int , errMsg * * C.char ) int {
323+ k := C .GoStringN (key , keySize )
324+ if cancelAndDeleteWatch (k ) == - 1 {
325+ * errMsg = C .CString ("no watch context found for the given key" )
326+ return - 1
327+ }
328+ return 0
329+ }
330+
331+ /*
332+ * @brief First cancel the keep alive context, then delete it from the map.
333+ * Cancel must be called before deleting in case this is a new context
334+ * other than the one we want to delete. In that case, that context will
335+ * be deleted before being cancelled and will not be able to be cancelled
336+ * anymore.
337+ */
338+ func cancelAndDeleteKeepAlive (leaseId int64 ) int {
339+ storeKeepAliveMutex .Lock ()
340+ defer storeKeepAliveMutex .Unlock ()
341+
342+ if cancel , exists := storeKeepAliveCtx [leaseId ]; exists {
343+ cancel ()
344+ delete (storeKeepAliveCtx , leaseId )
345+ return 0
346+ }
347+ return - 1
348+ }
349+
350+ //export EtcdStoreKeepAliveWrapper
351+ func EtcdStoreKeepAliveWrapper (leaseId int64 , errMsg * * C.char ) int {
352+ if storeClient == nil {
353+ * errMsg = C .CString ("etcd client not initialized" )
354+ return - 1
355+ }
356+
357+ // Create a context with cancel function
358+ ctx , cancel := context .WithCancel (context .Background ())
359+
360+ // Store the cancel function
361+ storeKeepAliveMutex .Lock ()
362+ if _ , exists := storeKeepAliveCtx [leaseId ]; exists {
363+ storeKeepAliveMutex .Unlock ()
364+ * errMsg = C .CString ("This lease id is already being kept alive" )
365+ return - 1
366+ }
367+ storeKeepAliveCtx [leaseId ] = cancel
368+ storeKeepAliveMutex .Unlock ()
369+ // Make sure to delete from the map before returning
370+ defer cancelAndDeleteKeepAlive (leaseId )
371+
372+ // Start keep alive
373+ keepAliveChan , err := storeClient .KeepAlive (ctx , clientv3 .LeaseID (leaseId ))
374+ if err != nil {
375+ * errMsg = C .CString (err .Error ())
376+ return - 1
377+ }
378+
379+ // Wait for keep alive responses
380+ for {
381+ select {
382+ case resp , ok := <- keepAliveChan :
383+ if ! ok {
384+ * errMsg = C .CString ("keep alive channel closed" )
385+ return - 1
386+ }
387+ if resp == nil {
388+ * errMsg = C .CString ("keep alive response is nil" )
389+ return - 1
390+ }
391+ // Keep alive successful, continue
392+ case <- ctx .Done ():
393+ // Context cancelled
394+ * errMsg = C .CString ("keep alive context cancelled" )
395+ return - 2
396+ }
397+ }
398+ }
399+
400+ //export EtcdStoreCancelKeepAliveWrapper
401+ func EtcdStoreCancelKeepAliveWrapper (leaseId int64 , errMsg * * C.char ) int {
402+ if cancelAndDeleteKeepAlive (leaseId ) == - 1 {
403+ * errMsg = C .CString ("no keep alive context found for the given lease ID" )
404+ return - 1
405+ }
406+ return 0
407+ }
408+
118409func main () {}
0 commit comments