@@ -26,78 +26,47 @@ import (
2626// Version is a kafka version
2727type Version string
2828
29- // TODO: remove me.
30- // Compat version overwrite for missing versions in sarama
31- // Public API is compatible between these versions.
3229var (
33- v0_10_2_1 = parseKafkaVersion ("0.10.2.1" )
34- v0_11_0_1 = parseKafkaVersion ("0.11.0.1" )
35- v0_11_0_2 = parseKafkaVersion ("0.11.0.2" )
36- v1_0_1 = parseKafkaVersion ("1.0.1" )
37- v1_0_2 = parseKafkaVersion ("1.0.2" )
38- v1_1_1 = parseKafkaVersion ("1.1.1" )
39-
40- kafkaVersions = map [string ]sarama.KafkaVersion {
41- "0.8.2.0" : sarama .V0_8_2_0 ,
42- "0.8.2.1" : sarama .V0_8_2_1 ,
43- "0.8.2.2" : sarama .V0_8_2_2 ,
44- "0.8.2" : sarama .V0_8_2_2 ,
45- "0.8" : sarama .V0_8_2_2 ,
46-
47- "0.9.0.0" : sarama .V0_9_0_0 ,
48- "0.9.0.1" : sarama .V0_9_0_1 ,
49- "0.9.0" : sarama .V0_9_0_1 ,
50- "0.9" : sarama .V0_9_0_1 ,
51-
52- "0.10.0.0" : sarama .V0_10_0_0 ,
53- "0.10.0.1" : sarama .V0_10_0_1 ,
54- "0.10.0" : sarama .V0_10_0_1 ,
55- "0.10.1.0" : sarama .V0_10_1_0 ,
56- "0.10.1" : sarama .V0_10_1_0 ,
57- "0.10.2.0" : sarama .V0_10_2_0 ,
58- "0.10.2.1" : v0_10_2_1 ,
59- "0.10.2" : v0_10_2_1 ,
60- "0.10" : v0_10_2_1 ,
61-
62- "0.11.0.0" : sarama .V0_11_0_0 ,
63- "0.11.0.1" : v0_11_0_1 ,
64- "0.11.0.2" : v0_11_0_2 ,
65- "0.11.0" : v0_11_0_2 ,
66- "0.11" : v0_11_0_2 ,
67-
68- "1.0.0" : sarama .V1_0_0_0 ,
69- "1.0.1" : v1_0_1 ,
70- "1.0.2" : v1_0_2 ,
71- "1.0" : v1_0_2 ,
72- "1.1.0" : sarama .V1_1_0_0 ,
73- "1.1.1" : v1_1_1 ,
74- "1.1" : v1_1_1 ,
75- "1" : v1_1_1 ,
76-
77- "2.0.0" : sarama .V2_0_0_0 ,
78- "2.0.1" : sarama .V2_0_1_0 ,
79- "2.0" : sarama .V2_0_1_0 ,
80- "2.1" : sarama .V2_1_0_0 ,
81- "2.2" : sarama .V2_2_0_0 ,
82- "2" : sarama .V2_1_0_0 ,
30+ // Sarama expects version strings to be fully expanded, e.g. "1.1.1".
31+ // We also allow versions to be specified as a prefix, e.g. "1",
32+ // understood as referencing the most recent version starting with "1".
33+ // truncatedKafkaVersions stores a lookup of the abbreviations we accept.
34+ truncatedKafkaVersions = map [string ]sarama.KafkaVersion {
35+ "0.8.2" : sarama .V0_8_2_2 ,
36+ "0.8" : sarama .V0_8_2_2 ,
37+
38+ "0.9.0" : sarama .V0_9_0_1 ,
39+ "0.9" : sarama .V0_9_0_1 ,
40+
41+ "0.10.0" : sarama .V0_10_0_1 ,
42+ "0.10.1" : sarama .V0_10_1_0 ,
43+ "0.10.2" : sarama .V0_10_2_1 ,
44+ "0.10" : sarama .V0_10_2_1 ,
45+
46+ "0.11.0" : sarama .V0_11_0_2 ,
47+ "0.11" : sarama .V0_11_0_2 ,
48+
49+ "1.0" : sarama .V1_0_0_0 ,
50+ "1.1" : sarama .V1_1_1_0 ,
51+ "1" : sarama .V1_1_1_0 ,
52+
53+ "2.0" : sarama .V2_0_1_0 ,
54+ "2.1" : sarama .V2_1_0_0 ,
55+ "2.2" : sarama .V2_2_0_0 ,
56+ "2.3" : sarama .V2_3_0_0 ,
57+ "2.4" : sarama .V2_4_0_0 ,
58+ "2.5" : sarama .V2_5_0_0 ,
59+ "2.6" : sarama .V2_6_0_0 ,
60+ "2" : sarama .V2_6_0_0 ,
8361 }
8462)
8563
86- func parseKafkaVersion (s string ) sarama.KafkaVersion {
87- v , err := sarama .ParseKafkaVersion (s )
88- if err != nil {
89- panic (err )
90- }
91- return v
92- }
93-
9464// Validate that a kafka version is among the possible options
9565func (v * Version ) Validate () error {
96- if _ , ok := kafkaVersions [ string ( * v )]; ! ok {
97- return fmt . Errorf ( "unknown/unsupported kafka vesion '%v'" , * v )
66+ if _ , ok := v . Get (); ok {
67+ return nil
9868 }
99-
100- return nil
69+ return fmt .Errorf ("unknown/unsupported kafka version '%v'" , * v )
10170}
10271
10372// Unpack a kafka version
@@ -113,6 +82,20 @@ func (v *Version) Unpack(s string) error {
11382
11483// Get a sarama kafka version
11584func (v Version ) Get () (sarama.KafkaVersion , bool ) {
116- kv , ok := kafkaVersions [string (v )]
117- return kv , ok
85+ // First check if it's one of the abbreviations we accept.
86+ // If not, let sarama parse it.
87+ s := string (v )
88+ if version , ok := truncatedKafkaVersions [s ]; ok {
89+ return version , true
90+ }
91+ version , err := sarama .ParseKafkaVersion (s )
92+ if err != nil {
93+ return sarama.KafkaVersion {}, false
94+ }
95+ for _ , supp := range sarama .SupportedVersions {
96+ if version == supp {
97+ return version , true
98+ }
99+ }
100+ return sarama.KafkaVersion {}, false
118101}
0 commit comments