Skip to content
This repository was archived by the owner on Oct 28, 2022. It is now read-only.

Commit e1fc7c4

Browse files
authored
Merge pull request #582 from karimra/internal-metrics-reg
add more internal metrics to prometheus registry
2 parents abb09ac + 3e1d03b commit e1fc7c4

28 files changed

Lines changed: 1480 additions & 869 deletions

app/api.go

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -31,6 +31,8 @@ func (a *App) newAPIServer() (*http.Server, error) {
3131
a.router.Handle("/metrics", promhttp.HandlerFor(a.reg, promhttp.HandlerOpts{}))
3232
a.reg.MustRegister(collectors.NewGoCollector())
3333
a.reg.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}))
34+
a.reg.MustRegister(subscribeResponseReceivedCounter)
35+
go a.startClusterMetrics()
3436
}
3537
s := &http.Server{
3638
Addr: a.Config.APIServer.Address,
@@ -103,7 +105,7 @@ func (a *App) handleConfigTargetsPost(w http.ResponseWriter, r *http.Request) {
103105
func (a *App) handleConfigTargetsDelete(w http.ResponseWriter, r *http.Request) {
104106
vars := mux.Vars(r)
105107
id := vars["id"]
106-
err := a.DeleteTarget(a.ctx, id)
108+
err := a.DeleteTarget(r.Context(), id)
107109
if err != nil {
108110
w.WriteHeader(http.StatusNotFound)
109111
json.NewEncoder(w).Encode(APIErrors{Errors: []string{err.Error()}})

app/app.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/fsnotify/fsnotify"
1616
"github.com/fullstorydev/grpcurl"
1717
"github.com/gorilla/mux"
18+
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
1819
"github.com/jhump/protoreflect/desc"
1920
"github.com/karimra/gnmic/config"
2021
"github.com/karimra/gnmic/formatters"
@@ -110,6 +111,7 @@ func New() *App {
110111
sem: semaphore.NewWeighted(1),
111112
configLock: new(sync.RWMutex),
112113
Config: config.New(),
114+
reg: prometheus.NewRegistry(),
113115
//
114116
operLock: new(sync.RWMutex),
115117
Targets: make(map[string]*target.Target),
@@ -323,6 +325,14 @@ func (a *App) createCollectorDialOpts() []grpc.DialOption {
323325
if a.Config.Gzip {
324326
opts = append(opts, grpc.WithDefaultCallOptions(grpc.UseCompressor(gzip.Name)))
325327
}
328+
if a.Config.APIServer != nil && a.Config.APIServer.EnableMetrics && a.reg != nil {
329+
grpcClientMetrics := grpc_prometheus.NewClientMetrics()
330+
opts = append(opts,
331+
grpc.WithUnaryInterceptor(grpcClientMetrics.UnaryClientInterceptor()),
332+
grpc.WithStreamInterceptor(grpcClientMetrics.StreamClientInterceptor()),
333+
)
334+
a.reg.MustRegister(grpcClientMetrics)
335+
}
326336
a.dialOpts = opts
327337
return opts
328338
}

app/clustering.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,7 @@ START:
187187
}
188188
}
189189
}()
190-
err := a.locker.WatchServices(a.ctx, serviceName, []string{"cluster-name=" + a.Config.Clustering.ClusterName}, membersChan, a.Config.Clustering.ServicesWatchTimer)
190+
err := a.locker.WatchServices(ctx, serviceName, []string{"cluster-name=" + a.Config.Clustering.ClusterName}, membersChan, a.Config.Clustering.ServicesWatchTimer)
191191
if err != nil {
192192
a.Logger.Printf("failed getting services: %v", err)
193193
time.Sleep(retryTimer)
@@ -549,8 +549,10 @@ func (a *App) deleteTarget(name string) error {
549549
},
550550
}
551551
}
552+
ctx, cancel := context.WithCancel(a.ctx)
553+
defer cancel()
552554
url := fmt.Sprintf("%s://%s/api/v1/config/targets/%s", scheme, s.Address, name)
553-
req, err := http.NewRequestWithContext(a.ctx, http.MethodDelete, url, nil)
555+
req, err := http.NewRequestWithContext(ctx, http.MethodDelete, url, nil)
554556
if err != nil {
555557
a.Logger.Printf("failed to create a delete request: %v", err)
556558
errs = append(errs, err)
@@ -559,10 +561,12 @@ func (a *App) deleteTarget(name string) error {
559561

560562
rsp, err := client.Do(req)
561563
if err != nil {
564+
rsp.Body.Close()
562565
a.Logger.Printf("failed deleting target %q: %v", name, err)
563566
errs = append(errs, err)
564567
continue
565568
}
569+
rsp.Body.Close()
566570
a.Logger.Printf("received response code=%d, for DELETE %s", rsp.StatusCode, url)
567571
}
568572
if len(errs) == 0 {
@@ -604,6 +608,7 @@ func (a *App) assignTarget(ctx context.Context, tc *types.TargetConfig, service
604608
if err != nil {
605609
return err
606610
}
611+
defer resp.Body.Close()
607612
a.Logger.Printf("got response code=%d for target %q config add from %q", resp.StatusCode, tc.Name, service.Address)
608613
if resp.StatusCode > 200 {
609614
return fmt.Errorf("status code=%d", resp.StatusCode)
@@ -617,6 +622,7 @@ func (a *App) assignTarget(ctx context.Context, tc *types.TargetConfig, service
617622
if err != nil {
618623
return err
619624
}
625+
defer resp.Body.Close()
620626
a.Logger.Printf("got response code=%d for target %q assignment from %q", resp.StatusCode, tc.Name, service.Address)
621627
if resp.StatusCode > 200 {
622628
return fmt.Errorf("status code=%d", resp.StatusCode)
@@ -655,8 +661,10 @@ func (a *App) unassignTarget(name string, serviceID string) error {
655661
}
656662
rsp, err := client.Do(req)
657663
if err != nil {
664+
rsp.Body.Close()
658665
continue
659666
}
667+
rsp.Body.Close()
660668
a.Logger.Printf("received response code=%d, for DELETE %s", rsp.StatusCode, url)
661669
break
662670
}

app/collector.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -52,12 +52,13 @@ func (a *App) StartCollector(ctx context.Context) {
5252
for {
5353
select {
5454
case rsp := <-rspChan:
55+
subscribeResponseReceivedCounter.WithLabelValues(t.Config.Name, rsp.SubscriptionConfig.Name).Add(1)
5556
if a.Config.Debug {
56-
a.Logger.Printf("received gNMI Subscribe Response: %+v", rsp)
57+
a.Logger.Printf("target %q: gNMI Subscribe Response: %+v", t.Config.Name, rsp)
5758
}
5859
err := t.DecodeProtoBytes(rsp.Response)
5960
if err != nil {
60-
a.Logger.Printf("target %q, failed to decode proto bytes: %v", t.Config.Name, err)
61+
a.Logger.Printf("target %q: failed to decode proto bytes: %v", t.Config.Name, err)
6162
continue
6263
}
6364
m := outputs.Meta{
@@ -89,9 +90,9 @@ func (a *App) StartCollector(ctx context.Context) {
8990
}
9091
case tErr := <-errChan:
9192
if errors.Is(tErr.Err, io.EOF) {
92-
a.Logger.Printf("target %q, subscription %s closed stream(EOF)", t.Config.Name, tErr.SubscriptionName)
93+
a.Logger.Printf("target %q: subscription %s closed stream(EOF)", t.Config.Name, tErr.SubscriptionName)
9394
} else {
94-
a.Logger.Printf("target %q, subscription %s rcv error: %v", t.Config.Name, tErr.SubscriptionName, tErr.Err)
95+
a.Logger.Printf("target %q: subscription %s rcv error: %v", t.Config.Name, tErr.SubscriptionName, tErr.Err)
9596
}
9697
if remainingOnceSubscriptions > 0 {
9798
if a.subscriptionMode(tErr.SubscriptionName) == "ONCE" {
@@ -105,10 +106,10 @@ func (a *App) StartCollector(ctx context.Context) {
105106
return
106107
}
107108
case <-t.StopChan:
108-
a.Logger.Printf("stopping target %q listener", t.Config.Name)
109109
a.operLock.Lock()
110110
delete(a.activeTargets, t.Config.Name)
111111
a.operLock.Unlock()
112+
a.Logger.Printf("target %q: listener stopped", t.Config.Name)
112113
return
113114
case <-ctx.Done():
114115
a.operLock.Lock()

app/gnmi_client_subscribe.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,11 +168,15 @@ func (a *App) clientSubscribe(ctx context.Context, tc *types.TargetConfig) error
168168
}
169169
subRequests = append(subRequests, subscriptionRequest{name: sc.Name, req: req})
170170
}
171+
if t.Cfn != nil {
172+
t.Cfn()
173+
}
171174
gnmiCtx, cancel := context.WithCancel(ctx)
172175
t.Cfn = cancel
173176
CRCLIENT:
174177
select {
175178
case <-gnmiCtx.Done():
179+
return gnmiCtx.Err()
176180
default:
177181
targetDialOpts := a.dialOpts
178182
if a.Config.UseTunnelServer {

app/loaders.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ START:
6969
// clustered, dispatch
7070
a.configLock.Lock()
7171
a.Config.Targets[add.Name] = add
72-
err = a.dispatchTarget(a.ctx, add)
72+
err = a.dispatchTarget(ctx, add)
7373
if err != nil {
7474
a.Logger.Printf("failed dispatching target %q: %v", add.Name, err)
7575
}

app/metrics.go

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
package app
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"time"
7+
8+
"github.com/prometheus/client_golang/prometheus"
9+
)
10+
11+
// clusterMetricsUpdatePeriod is the interval at which startClusterMetrics
// refreshes the cluster gauges. Each locker query issued during a refresh is
// given a context that times out after half of this period.
const clusterMetricsUpdatePeriod = 10 * time.Second
14+
15+
// subscribe
16+
var subscribeResponseReceivedCounter = prometheus.NewCounterVec(prometheus.CounterOpts{
17+
Namespace: "gnmic",
18+
Subsystem: "subscribe",
19+
Name: "number_of_received_subscribe_response_messages_total",
20+
Help: "Total number of received subscribe response messages",
21+
}, []string{"source", "subscription"})
22+
23+
// cluster
24+
var clusterNumberOfLockedTargets = prometheus.NewGauge(prometheus.GaugeOpts{
25+
Namespace: "gnmic",
26+
Subsystem: "cluster",
27+
Name: "number_of_locked_targets",
28+
Help: "number of locked targets",
29+
})
30+
var clusterIsLeader = prometheus.NewGauge(prometheus.GaugeOpts{
31+
Namespace: "gnmic",
32+
Subsystem: "cluster",
33+
Name: "is_leader",
34+
Help: "Has value 1 if this gnmic instance is the cluster leader, 0 otherwise",
35+
})
36+
37+
func (a *App) startClusterMetrics() {
38+
if a.Config.APIServer == nil || !a.Config.APIServer.EnableMetrics || a.Config.Clustering == nil {
39+
return
40+
}
41+
var err error
42+
err = a.reg.Register(clusterNumberOfLockedTargets)
43+
if err != nil {
44+
a.Logger.Printf("failed to register metric: %v", err)
45+
}
46+
err = a.reg.Register(clusterIsLeader)
47+
if err != nil {
48+
a.Logger.Printf("failed to register metric: %v", err)
49+
}
50+
ticker := time.NewTicker(clusterMetricsUpdatePeriod)
51+
defer ticker.Stop()
52+
for {
53+
select {
54+
case <-a.ctx.Done():
55+
return
56+
case <-ticker.C:
57+
ctx, cancel := context.WithTimeout(a.ctx, clusterMetricsUpdatePeriod/2)
58+
leaderKey := fmt.Sprintf("gnmic/%s/leader", a.Config.ClusterName)
59+
leader, err := a.locker.List(ctx, leaderKey)
60+
cancel()
61+
if err != nil {
62+
a.Logger.Printf("failed to get leader key: %v", err)
63+
}
64+
if leader[leaderKey] == a.Config.InstanceName {
65+
clusterIsLeader.Set(1)
66+
} else {
67+
clusterIsLeader.Set(0)
68+
}
69+
70+
lockedNodesPrefix := fmt.Sprintf("gnmic/%s/targets", a.Config.ClusterName)
71+
ctx, cancel = context.WithTimeout(a.ctx, clusterMetricsUpdatePeriod/2)
72+
lockedNodes, err := a.locker.List(ctx, lockedNodesPrefix)
73+
cancel()
74+
if err != nil {
75+
a.Logger.Printf("failed to get locked nodes key: %v", err)
76+
}
77+
numLockedNodes := 0
78+
for _, v := range lockedNodes {
79+
if v == a.Config.InstanceName {
80+
numLockedNodes++
81+
}
82+
}
83+
clusterNumberOfLockedTargets.Set(float64(numLockedNodes))
84+
}
85+
}
86+
}

app/outputs.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,11 @@ func (a *App) InitOutput(ctx context.Context, name string, tcs map[string]*types
2424
outputs.WithLogger(a.Logger),
2525
outputs.WithEventProcessors(
2626
a.Config.Processors,
27-
a.Logger,
27+
a.Logger,
2828
a.Config.Targets,
2929
a.Config.Actions,
3030
),
31-
outputs.WithRegister(a.reg),
31+
outputs.WithRegistry(a.reg),
3232
outputs.WithName(a.Config.InstanceName),
3333
outputs.WithClusterName(a.Config.ClusterName),
3434
outputs.WithTargetsConfig(tcs),

app/subscribe.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ import (
1515
"github.com/manifoldco/promptui"
1616
"github.com/openconfig/gnmi/proto/gnmi"
1717
"github.com/openconfig/grpctunnel/tunnel"
18-
"github.com/prometheus/client_golang/prometheus"
1918
"github.com/spf13/cobra"
2019
"github.com/spf13/pflag"
2120
)
@@ -77,9 +76,6 @@ func (a *App) SubscribeRunE(cmd *cobra.Command, args []string) error {
7776
if err != nil {
7877
return err
7978
}
80-
if a.Config.APIServer != nil && a.Config.APIServer.EnableMetrics {
81-
a.reg = prometheus.NewRegistry()
82-
}
8379
err = a.Config.GetClustering()
8480
if err != nil {
8581
return err

app/target.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,9 @@ func (a *App) DeleteTarget(ctx context.Context, name string) error {
7474
if cfn, ok := a.targetsLockFn[name]; ok {
7575
cfn()
7676
}
77+
if a.c != nil && a.c.HasTarget(name) {
78+
a.c.Remove(name)
79+
}
7780
if t, ok := a.Targets[name]; ok {
7881
delete(a.Targets, name)
7982
t.Close()

0 commit comments

Comments
 (0)