Skip to content

Commit d858d82

Browse files
authored
Fix known mysql type conversion issues (#6647)
1 parent eb93dab commit d858d82

3 files changed

Lines changed: 215 additions & 2 deletions

File tree

plugins/inputs/mysql/mysql.go

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"github.com/influxdata/telegraf/internal/tls"
1515
"github.com/influxdata/telegraf/plugins/inputs"
1616
"github.com/influxdata/telegraf/plugins/inputs/mysql/v1"
17+
"github.com/influxdata/telegraf/plugins/inputs/mysql/v2"
1718
)
1819

1920
type Mysql struct {
@@ -37,6 +38,8 @@ type Mysql struct {
3738
GatherPerfEventsStatements bool `toml:"gather_perf_events_statements"`
3839
IntervalSlow string `toml:"interval_slow"`
3940
MetricVersion int `toml:"metric_version"`
41+
42+
Log telegraf.Logger `toml:"-"`
4043
tls.ClientConfig
4144
lastT time.Time
4245
initDone bool
@@ -554,14 +557,20 @@ func (m *Mysql) gatherGlobalVariables(db *sql.DB, serv string, acc telegraf.Accu
554557
return err
555558
}
556559
key = strings.ToLower(key)
560+
557561
// parse mysql version and put into field and tag
558562
if strings.Contains(key, "version") {
559563
fields[key] = string(val)
560564
tags[key] = string(val)
561565
}
562-
if value, ok := m.parseValue(val); ok {
566+
567+
value, err := m.parseGlobalVariables(key, val)
568+
if err != nil {
569+
m.Log.Debugf("Error parsing global variable %q: %v", key, err)
570+
} else {
563571
fields[key] = value
564572
}
573+
565574
// Send 20 fields at a time
566575
if len(fields) >= 20 {
567576
acc.AddFields("mysql_variables", fields, tags)
@@ -575,6 +584,18 @@ func (m *Mysql) gatherGlobalVariables(db *sql.DB, serv string, acc telegraf.Accu
575584
return nil
576585
}
577586

587+
func (m *Mysql) parseGlobalVariables(key string, value sql.RawBytes) (interface{}, error) {
588+
if m.MetricVersion < 2 {
589+
v, ok := v1.ParseValue(value)
590+
if ok {
591+
return v, nil
592+
}
593+
return v, fmt.Errorf("could not parse value: %q", string(value))
594+
} else {
595+
return v2.ConvertGlobalVariables(key, value)
596+
}
597+
}
598+
578599
// gatherSlaveStatuses can be used to get replication analytics
579600
// When the server is slave, then it returns only one row.
580601
// If the multi-source replication is set, then everything works differently
@@ -748,7 +769,10 @@ func (m *Mysql) gatherGlobalStatuses(db *sql.DB, serv string, acc telegraf.Accum
748769
}
749770
} else {
750771
key = strings.ToLower(key)
751-
if value, ok := m.parseValue(val); ok {
772+
value, err := v2.ConvertGlobalStatus(key, val)
773+
if err != nil {
774+
m.Log.Debugf("Error parsing global status: %v", err)
775+
} else {
752776
fields[key] = value
753777
}
754778
}

plugins/inputs/mysql/v2/convert.go

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
package v2
2+
3+
import (
4+
"bytes"
5+
"database/sql"
6+
"fmt"
7+
"strconv"
8+
)
9+
10+
type ConversionFunc func(value sql.RawBytes) (interface{}, error)
11+
12+
func ParseInt(value sql.RawBytes) (interface{}, error) {
13+
v, err := strconv.ParseInt(string(value), 10, 64)
14+
15+
// Ignore ErrRange. When this error is set the returned value is "the
16+
// maximum magnitude integer of the appropriate bitSize and sign."
17+
if err, ok := err.(*strconv.NumError); ok && err.Err == strconv.ErrRange {
18+
return v, nil
19+
}
20+
21+
return v, err
22+
}
23+
24+
func ParseBoolAsInteger(value sql.RawBytes) (interface{}, error) {
25+
if bytes.EqualFold(value, []byte("YES")) || bytes.EqualFold(value, []byte("ON")) {
26+
return int64(1), nil
27+
}
28+
29+
return int64(0), nil
30+
}
31+
32+
func ParseGTIDMode(value sql.RawBytes) (interface{}, error) {
33+
// https://dev.mysql.com/doc/refman/8.0/en/replication-mode-change-online-concepts.html
34+
v := string(value)
35+
switch v {
36+
case "OFF":
37+
return int64(0), nil
38+
case "ON":
39+
return int64(1), nil
40+
case "OFF_PERMISSIVE":
41+
return int64(0), nil
42+
case "ON_PERMISSIVE":
43+
return int64(1), nil
44+
default:
45+
return nil, fmt.Errorf("unrecognized gtid_mode: %q", v)
46+
}
47+
}
48+
49+
func ParseValue(value sql.RawBytes) (interface{}, error) {
50+
if bytes.EqualFold(value, []byte("YES")) || bytes.Compare(value, []byte("ON")) == 0 {
51+
return 1, nil
52+
}
53+
54+
if bytes.EqualFold(value, []byte("NO")) || bytes.Compare(value, []byte("OFF")) == 0 {
55+
return 0, nil
56+
}
57+
58+
if val, err := strconv.ParseInt(string(value), 10, 64); err == nil {
59+
return val, nil
60+
}
61+
if val, err := strconv.ParseFloat(string(value), 64); err == nil {
62+
return val, nil
63+
}
64+
65+
if len(string(value)) > 0 {
66+
return string(value), nil
67+
}
68+
69+
return nil, fmt.Errorf("unconvertible value: %q", string(value))
70+
}
71+
72+
var GlobalStatusConversions = map[string]ConversionFunc{
73+
"ssl_ctx_verify_depth": ParseInt,
74+
"ssl_verify_depth": ParseInt,
75+
}
76+
77+
var GlobalVariableConversions = map[string]ConversionFunc{
78+
"gtid_mode": ParseGTIDMode,
79+
}
80+
81+
func ConvertGlobalStatus(key string, value sql.RawBytes) (interface{}, error) {
82+
if bytes.Equal(value, []byte("")) {
83+
return nil, nil
84+
}
85+
86+
if conv, ok := GlobalStatusConversions[key]; ok {
87+
return conv(value)
88+
}
89+
90+
return ParseValue(value)
91+
}
92+
93+
func ConvertGlobalVariables(key string, value sql.RawBytes) (interface{}, error) {
94+
if bytes.Equal(value, []byte("")) {
95+
return nil, nil
96+
}
97+
98+
if conv, ok := GlobalVariableConversions[key]; ok {
99+
return conv(value)
100+
}
101+
102+
return ParseValue(value)
103+
}
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
package v2
2+
3+
import (
4+
"database/sql"
5+
"testing"
6+
7+
"github.com/stretchr/testify/require"
8+
)
9+
10+
func TestConvertGlobalStatus(t *testing.T) {
11+
tests := []struct {
12+
name string
13+
key string
14+
value sql.RawBytes
15+
expected interface{}
16+
expectedErr error
17+
}{
18+
{
19+
name: "default",
20+
key: "ssl_ctx_verify_depth",
21+
value: []byte("0"),
22+
expected: int64(0),
23+
expectedErr: nil,
24+
},
25+
{
26+
name: "overflow int64",
27+
key: "ssl_ctx_verify_depth",
28+
value: []byte("18446744073709551615"),
29+
expected: int64(9223372036854775807),
30+
expectedErr: nil,
31+
},
32+
{
33+
name: "defined variable but unset",
34+
key: "ssl_ctx_verify_depth",
35+
value: []byte(""),
36+
expected: nil,
37+
expectedErr: nil,
38+
},
39+
}
40+
for _, tt := range tests {
41+
t.Run(tt.name, func(t *testing.T) {
42+
actual, err := ConvertGlobalStatus(tt.key, tt.value)
43+
require.Equal(t, tt.expectedErr, err)
44+
require.Equal(t, tt.expected, actual)
45+
})
46+
}
47+
}
48+
49+
func TestCovertGlobalVariables(t *testing.T) {
50+
tests := []struct {
51+
name string
52+
key string
53+
value sql.RawBytes
54+
expected interface{}
55+
expectedErr error
56+
}{
57+
{
58+
name: "boolean type mysql<=5.6",
59+
key: "gtid_mode",
60+
value: []byte("ON"),
61+
expected: int64(1),
62+
expectedErr: nil,
63+
},
64+
{
65+
name: "enum type mysql>=5.7",
66+
key: "gtid_mode",
67+
value: []byte("ON_PERMISSIVE"),
68+
expected: int64(1),
69+
expectedErr: nil,
70+
},
71+
{
72+
name: "defined variable but unset",
73+
key: "ssl_ctx_verify_depth",
74+
value: []byte(""),
75+
expected: nil,
76+
expectedErr: nil,
77+
},
78+
}
79+
for _, tt := range tests {
80+
t.Run(tt.name, func(t *testing.T) {
81+
actual, err := ConvertGlobalVariables(tt.key, tt.value)
82+
require.Equal(t, tt.expectedErr, err)
83+
require.Equal(t, tt.expected, actual)
84+
})
85+
}
86+
}

0 commit comments

Comments
 (0)