Skip to content

Commit f0ffe67

Browse files
Add heartbeat (#17)
* Add heartbeat initial commit. * FIxing a few loose ends. * Add max retry time. * Add changing heartbeat value with response. * Stop old ticker. * Remove max retry time check. * Generate heartbeat id on heartbeat start. * Remove "indirect". * Rename consts. * Rename heartbeatFrequency to frequency. * Change header. * Move heartbeat interface to heartbeat file. * Move error logging to beat(). * A bit more cleanup. * Move userAgent to common. * Move retry transport to common. * Update heartbeat to use RetryTransport. * Nitpicks. * Small refactor of the heartbeat call. (#18) * Make retry transport configurable. * Configure transport without constructor. Co-authored-by: Charlie Crawford <ccrawford@pagerduty.com>
1 parent 439a92f commit f0ffe67

9 files changed

Lines changed: 176 additions & 59 deletions

File tree

cmd/init.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ func NewInitCmd() *cobra.Command {
3131
Use: "init",
3232
Short: "Generate a new initial configuration file.",
3333
Long: `Generate a new initial configuration file
34-
34+
3535
Can be run without options to automatically generate defaults, or will use
3636
configuration options or an existing config as its basis.`,
3737
Run: func(cmd *cobra.Command, args []string) {

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ require (
99
github.com/boltdb/bolt v1.3.1 // indirect
1010
github.com/fsnotify/fsnotify v1.4.9 // indirect
1111
github.com/golang/snappy v0.0.1 // indirect
12+
github.com/google/uuid v1.2.0
1213
github.com/gorilla/mux v1.7.4
1314
github.com/mitchellh/go-homedir v1.1.0
1415
github.com/pelletier/go-toml v1.6.0 // indirect

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEW
4949
github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
5050
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
5151
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
52+
github.com/google/uuid v1.2.0 h1:qJYtXnJRWmpe7m/3XlyhrsLrEURqHRM2kxzoxXqyUDs=
53+
github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
5254
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8=
5355
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
5456
github.com/gorilla/mux v1.7.4 h1:VuZ8uybHlWmqV03+zRzdwKL4tUnIp1MAQtp1mIFE1bc=

pkg/common/common.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
package common
22

33
import (
4+
"fmt"
45
"os"
6+
"runtime"
57
"time"
68
)
79

@@ -27,3 +29,12 @@ func init() {
2729
func IsProduction() bool {
2830
return os.Getenv("APP_ENV") == "production"
2931
}
32+
33+
func UserAgent() string {
34+
version := Version
35+
system := runtime.GOOS
36+
commit := Commit
37+
date := Date
38+
39+
return fmt.Sprintf("go-pdagent/%v (%v, commit: %v, date: %v)", version, system, commit, date)
40+
}
Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,11 @@
1-
package eventsapi
1+
package common
22

33
import (
44
"math"
55
"net"
66
"net/http"
77
"time"
88

9-
"github.com/PagerDuty/go-pdagent/pkg/common"
109
"go.uber.org/zap"
1110
"golang.org/x/net/http2"
1211
)
@@ -30,8 +29,9 @@ const defaultMaxRetries = 10
3029
//
3130
type RetryTransport struct {
3231
MaxRetries int
32+
MaxInterval time.Duration
3333
Transport http.RoundTripper
34-
Backoff func(int) time.Duration
34+
Backoff func(int, time.Duration) time.Duration
3535
IsRetryable func(*http.Response, error) bool
3636
IsSuccess func(*http.Response, error) bool
3737

@@ -40,14 +40,14 @@ type RetryTransport struct {
4040

4141
func NewRetryTransport() RetryTransport {
4242
return RetryTransport{
43-
MaxRetries: defaultMaxRetries,
44-
Transport: http.DefaultTransport,
43+
MaxRetries: defaultMaxRetries,
44+
MaxInterval: defaultMaxInterval,
45+
Transport: http.DefaultTransport,
4546

4647
Backoff: calculateBackoff,
4748
IsRetryable: isRetryable,
48-
IsSuccess: isSuccess,
49-
50-
log: common.Logger.Named("RetryTransport"),
49+
IsSuccess: IsSuccessResponse,
50+
log: Logger.Named("RetryTransport"),
5151
}
5252
}
5353

@@ -73,7 +73,7 @@ func (r RetryTransport) RoundTrip(req *http.Request) (*http.Response, error) {
7373
return nil, err
7474
}
7575

76-
backoff := r.Backoff(tries)
76+
backoff := r.Backoff(tries, r.MaxInterval)
7777
sleep := time.After(backoff)
7878
r.log.Infof("Retrying job, attempt %v, delay %v", tries+1, backoff)
7979

@@ -102,10 +102,10 @@ func (r RetryTransport) RoundTrip(req *http.Request) (*http.Response, error) {
102102
//
103103
// Currently back-off looks like: 1s, 2s, 4s, 8s, 16s, then capping at
104104
// MaxRetryTimeout.
105-
func calculateBackoff(try int) time.Duration {
105+
func calculateBackoff(try int, maxInterval time.Duration) time.Duration {
106106
duration := time.Duration(math.Pow(2, float64(try))) * time.Second
107-
if duration > defaultMaxInterval {
108-
duration = defaultMaxInterval
107+
if duration > maxInterval {
108+
duration = maxInterval
109109
}
110110
return duration
111111
}
@@ -137,11 +137,11 @@ func isRetryable(resp *http.Response, err error) bool {
137137
return resp.StatusCode == 429 || resp.StatusCode/100 == 5
138138
}
139139

140-
// isSuccess returns true if the corresponding request was successful.
140+
// IsSuccessResponse returns true if the corresponding request was successful.
141141
//
142142
// Per documentation this is when the server responds with a 202, but we treat
143143
// any 2XX as a success.
144-
func isSuccess(resp *http.Response, err error) bool {
144+
func IsSuccessResponse(resp *http.Response, err error) bool {
145145
if err != nil || resp == nil {
146146
return false
147147
}
Lines changed: 11 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,54 +1,39 @@
1-
package eventsapi
1+
package common
22

33
import (
44
"bytes"
5-
"gopkg.in/h2non/gock.v1"
65
"net/http"
76
"testing"
87
"time"
9-
)
108

11-
func mockEventV2(key string) EventV2 {
12-
return EventV2{
13-
RoutingKey: key,
14-
EventAction: "trigger",
15-
Payload: PayloadV2{
16-
Summary: "Test summary",
17-
Source: "Test source",
18-
Severity: "Error",
19-
},
20-
}
21-
}
9+
"gopkg.in/h2non/gock.v1"
10+
)
2211

2312
func TestRetryTransportSuccess(t *testing.T) {
2413
defer gock.Off()
2514

2615
// Respond twice with 429s, which should be retryable.
2716
gock.New("https://events.pagerduty.com").
2817
Times(2).
29-
Post("/v2/enqueue").
18+
Post("/test").
3019
Reply(429)
3120

3221
// Then response with a 200, which should trigger success.
3322
gock.New("https://events.pagerduty.com").
34-
Post("/v2/enqueue").
23+
Post("/test").
3524
Reply(200).
36-
JSON(ResponseV2{
37-
Status: "success",
38-
Message: "Event processed",
39-
DedupKey: "12345",
40-
})
25+
JSON("reply")
4126

4227
transport := NewRetryTransport()
4328
transport.Transport = gock.NewTransport()
44-
transport.Backoff = func(_ int) time.Duration { return time.Millisecond }
29+
transport.Backoff = func(_ int, _ time.Duration) time.Duration { return time.Millisecond }
4530

4631
client := &http.Client{
4732
Transport: transport,
4833
Timeout: 10 * time.Second,
4934
}
5035

51-
resp, err := client.Post("https://events.pagerduty.com/v2/enqueue", "application/json", bytes.NewBuffer([]byte("Hello")))
36+
resp, err := client.Post("https://events.pagerduty.com/test", "application/json", bytes.NewBuffer([]byte("Hello")))
5237
if err != nil {
5338
t.Errorf("Unexpected error %v", err)
5439
}
@@ -64,19 +49,19 @@ func TestRetryTransportLimited(t *testing.T) {
6449
// Respond twice with 429s, which should be retryable.
6550
gock.New("https://events.pagerduty.com").
6651
Times(defaultMaxRetries).
67-
Post("/v2/enqueue").
52+
Post("/test").
6853
Reply(429)
6954

7055
transport := NewRetryTransport()
7156
transport.Transport = gock.NewTransport()
72-
transport.Backoff = func(_ int) time.Duration { return time.Millisecond }
57+
transport.Backoff = func(_ int, _ time.Duration) time.Duration { return time.Millisecond }
7358

7459
client := &http.Client{
7560
Transport: transport,
7661
Timeout: 10 * time.Second,
7762
}
7863

79-
resp, err := client.Post("https://events.pagerduty.com/v2/enqueue", "application/json", bytes.NewBuffer([]byte("Hello")))
64+
resp, err := client.Post("https://events.pagerduty.com/test", "application/json", bytes.NewBuffer([]byte("Hello")))
8065
if err != nil {
8166
t.Errorf("Unexpected error %v", err)
8267
}

pkg/eventsapi/eventsapi.go

Lines changed: 3 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,8 @@ import (
55
"context"
66
"encoding/json"
77
"errors"
8-
"fmt"
98
"io/ioutil"
109
"net/http"
11-
"runtime"
1210
"time"
1311

1412
"github.com/PagerDuty/go-pdagent/pkg/common"
@@ -58,15 +56,15 @@ var defaultUserAgent string
5856

5957
func init() {
6058
DefaultHTTPClient = &http.Client{
61-
Transport: NewRetryTransport(),
59+
Transport: common.NewRetryTransport(),
6260
Timeout: 5 * time.Minute,
6361
}
6462

6563
defaultEnqueueConfig = enqueueConfig{
6664
HTTPClient: DefaultHTTPClient,
6765
}
6866

69-
defaultUserAgent = userAgent()
67+
defaultUserAgent = common.UserAgent()
7068
}
7169

7270
type EnqueueOption func(*enqueueConfig)
@@ -125,22 +123,13 @@ func enqueueEvent(context context.Context, client *http.Client, url string, even
125123
}
126124

127125
_ = json.Unmarshal(respBody, &response)
128-
if isSuccess(httpResp, err) {
126+
if common.IsSuccessResponse(httpResp, err) {
129127
return nil
130128
}
131129

132130
return ErrAPIError
133131
}
134132

135-
func userAgent() string {
136-
version := common.Version
137-
system := runtime.GOOS
138-
commit := common.Commit
139-
date := common.Date
140-
141-
return fmt.Sprintf("go-pdagent/%v (%v, commit: %v, date: %v)", version, system, commit, date)
142-
}
143-
144133
func validateRoutingKey(routingKey string) error {
145134
if len(routingKey) < 32 {
146135
return ErrInvalidRoutingKey

pkg/server/heartbeat.go

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
package server
2+
3+
import (
4+
"encoding/json"
5+
"errors"
6+
"io/ioutil"
7+
"net/http"
8+
"time"
9+
10+
"github.com/PagerDuty/go-pdagent/pkg/common"
11+
"go.uber.org/zap"
12+
)
13+
14+
const url = "https://api.pagerduty.com/agent/2014-03-14/heartbeat/go-pdagent"
15+
const frequencySeconds = 60 * 60 // Send heartbeat every hour
16+
const maxRetries = 3
17+
const maxRetryInterval = 15 * time.Second
18+
19+
var ErrHeartbeatError = errors.New("an error was encountered while sending the heartbeat")
20+
21+
type Heartbeat interface {
22+
Start()
23+
Shutdown()
24+
}
25+
26+
type heartbeat struct {
27+
ticker *time.Ticker
28+
shutdown chan bool
29+
logger *zap.SugaredLogger
30+
client *http.Client
31+
frequency int
32+
}
33+
34+
type heartbeatResponseBody struct {
35+
HeartBeatIntervalSeconds int `json:"heartbeat_interval_secs"`
36+
}
37+
38+
func NewHeartbeat() Heartbeat {
39+
transport := common.NewRetryTransport()
40+
transport.MaxRetries = maxRetries
41+
transport.MaxInterval = maxRetryInterval
42+
43+
hb := heartbeat{
44+
ticker: nil,
45+
shutdown: make(chan bool),
46+
logger: common.Logger.Named("Heartbeat"),
47+
client: &http.Client{
48+
Transport: transport,
49+
Timeout: 20 * time.Second,
50+
},
51+
frequency: frequencySeconds,
52+
}
53+
54+
return &hb
55+
}
56+
57+
func (hb *heartbeat) Start() {
58+
hb.logger.Info("Starting heartbeat.")
59+
hb.ticker = time.NewTicker(time.Duration(hb.frequency) * time.Second)
60+
61+
go func() {
62+
for {
63+
select {
64+
case <-hb.shutdown:
65+
return
66+
case <-hb.ticker.C:
67+
go hb.beat()
68+
}
69+
}
70+
}()
71+
}
72+
73+
func (hb *heartbeat) Shutdown() {
74+
hb.ticker.Stop()
75+
hb.shutdown <- true
76+
hb.logger.Info("Heartbeat stopped.")
77+
}
78+
79+
func (hb *heartbeat) beat() {
80+
hb.logger.Info("Sending heartbeat")
81+
82+
heartbeatResponse, err := hb.doHeartbeatRequest()
83+
if err != nil {
84+
hb.logger.Warnf("An error occurred while sending heartbeat: ", err)
85+
return
86+
}
87+
88+
hb.logger.Info("Heartbeat successful")
89+
90+
hb.logger.Info("Updating heartbeat frequency to ", heartbeatResponse.HeartBeatIntervalSeconds)
91+
hb.ticker.Stop()
92+
hb.ticker = time.NewTicker(time.Duration(heartbeatResponse.HeartBeatIntervalSeconds) * time.Second)
93+
}
94+
95+
func (hb *heartbeat) doHeartbeatRequest() (*heartbeatResponseBody, error) {
96+
req, err := http.NewRequest("GET", url, nil)
97+
if err != nil {
98+
return nil, err
99+
}
100+
101+
req.Header.Add("User-Agent", common.UserAgent())
102+
req.Header.Add("Accept", "application/json")
103+
104+
httpResp, err := hb.client.Do(req)
105+
if !common.IsSuccessResponse(httpResp, err) {
106+
return nil, ErrHeartbeatError
107+
}
108+
109+
defer httpResp.Body.Close()
110+
respBody, err := ioutil.ReadAll(httpResp.Body)
111+
if err != nil {
112+
return nil, err
113+
}
114+
115+
var responseBody heartbeatResponseBody
116+
err = json.Unmarshal(respBody, &responseBody)
117+
if err != nil {
118+
return nil, err
119+
}
120+
121+
return &responseBody, nil
122+
}

0 commit comments

Comments
 (0)