Skip to content

Prevent the mqtt establish the second connection (#4594)#4814

Closed
trankennykhang wants to merge 2 commits intoinfluxdata:masterfrom
g3i:bugfix/4594
Closed

Prevent the mqtt establish the second connection (#4594)#4814
trankennykhang wants to merge 2 commits intoinfluxdata:masterfrom
g3i:bugfix/4594

Conversation

@trankennykhang
Copy link
Copy Markdown

Required for all PRs:

  • [x ] Signed CLA.

@glinton glinton added fix pr to fix corresponding bug area/mqtt labels Oct 5, 2018
@danielnelson
Copy link
Copy Markdown
Contributor

@trankennykhang Thanks this looks right but I'm still suspect about our handling of the connected boolean in onConnect. It seems like there is a race condition with the connect function. Do you think we could merge onConnect and connect like:

diff --git a/plugins/inputs/mqtt_consumer/mqtt_consumer.go b/plugins/inputs/mqtt_consumer/mqtt_consumer.go
index 5853ad93..85b7074a 100644
--- a/plugins/inputs/mqtt_consumer/mqtt_consumer.go
+++ b/plugins/inputs/mqtt_consumer/mqtt_consumer.go
@@ -150,27 +150,25 @@ func (m *MQTTConsumer) connect() error {
                return err
        }

-       go m.receiver()
-
-       return nil
-}
-
-func (m *MQTTConsumer) onConnect(c mqtt.Client) {
        log.Printf("I! MQTT Client Connected")
-       if !m.PersistentSession || !m.connected {
+       m.connected = true
+
+       if !m.PersistentSession {
                topics := make(map[string]byte)
                for _, topic := range m.Topics {
                        topics[topic] = byte(m.QoS)
                }
-               subscribeToken := c.SubscribeMultiple(topics, m.recvMessage)
+               subscribeToken := m.client.SubscribeMultiple(topics, m.recvMessage)
                subscribeToken.Wait()
                if subscribeToken.Error() != nil {
                        m.acc.AddError(fmt.Errorf("E! MQTT Subscribe Error\ntopics: %s\nerror: %s",
                                strings.Join(m.Topics[:], ","), subscribeToken.Error()))
                }
-               m.connected = true
        }
-       return
+
+       go m.receiver()
+
+       return nil
 }
 
 func (m *MQTTConsumer) onConnectionLost(c mqtt.Client, err error) {
@@ -271,10 +269,9 @@ func (m *MQTTConsumer) createOpts() (*mqtt.ClientOptions, error) {
                
                opts.AddBroker(server)
        }
-       opts.SetAutoReconnect(true)
+       opts.SetAutoReconnect(false)
        opts.SetKeepAlive(time.Second * 60)
        opts.SetCleanSession(!m.PersistentSession)
-       opts.SetOnConnectHandler(m.onConnect)
        opts.SetConnectionLostHandler(m.onConnectionLost)

        return opts, nil

@danielnelson
Copy link
Copy Markdown
Contributor

@trankennykhang Based on your change here I reorganized how the mqtt_consumer plugin makes connections. Could you review #4846?

@danielnelson
Copy link
Copy Markdown
Contributor

This change was included into #4846

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area/mqtt fix pr to fix corresponding bug

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants