summaryrefslogtreecommitdiff
path: root/mqtt.go
diff options
context:
space:
mode:
Diffstat (limited to 'mqtt.go')
-rw-r--r--mqtt.go27
1 files changed, 20 insertions, 7 deletions
diff --git a/mqtt.go b/mqtt.go
index d9f0632..7ad47ba 100644
--- a/mqtt.go
+++ b/mqtt.go
@@ -4,6 +4,7 @@ import (
"fmt"
"log"
"time"
+ "strings"
mqtt "github.com/eclipse/paho.mqtt.golang"
)
@@ -12,6 +13,7 @@ const (
QOS = byte(1)
RETAINED = true
MQTT_CONNECT_TIMEOUT = 1 * time.Second
+ MQTT_SUBSCRIBE_TIMEOUT = 1 * time.Second
MQTT_DISCONNECT_TIMEOUT_US = 500
MQTT_KEEPALIVE_PERIOD = 2 * time.Second
)
@@ -29,14 +31,30 @@ func (m MQTTMessage) String() string {
return fmt.Sprintf("topic='%s' message='%s'", m.Topic, string(m.Payload))
}
-func MQTTRun(config MQTTConfig, tx chan MQTTMessage) {
+func MQTTRun(config MQTTConfig, rx chan MQTTMessage, tx chan MQTTMessage) {
mqttServerHealthTopic = fmt.Sprintf("%s/server/health", config.TopicPrefix)
opts := mqtt.NewClientOptions()
opts.AddBroker(config.Broker)
opts.SetClientID(config.ClientID)
opts.SetCleanSession(true)
- opts.SetOnConnectHandler(MQTTOnConnectHandler)
+ opts.SetOnConnectHandler(func(c mqtt.Client) {
+ log.Printf("Connected to MQTT broker.")
+ c.Publish(mqttServerHealthTopic, QOS, true, []byte(`good`))
+
+ topic := fmt.Sprintf("%s/cover/+/movement", config.TopicPrefix)
+ token := c.Subscribe(topic, byte(2), func(c mqtt.Client, msg mqtt.Message) {
+ message := MQTTMessage{
+ Topic: strings.TrimPrefix(msg.Topic(), config.TopicPrefix + "/"),
+ Payload: msg.Payload(),
+ }
+ rx <- message
+ })
+ success := token.WaitTimeout(MQTT_SUBSCRIBE_TIMEOUT)
+ if !success {
+ log.Fatal("Initial topic subscription failed.")
+ }
+ })
opts.SetConnectionLostHandler(MQTTConnectionLostHandler)
opts.SetAutoReconnect(true)
opts.SetConnectRetry(true)
@@ -60,11 +78,6 @@ func MQTTRun(config MQTTConfig, tx chan MQTTMessage) {
}
}
-func MQTTOnConnectHandler(c mqtt.Client) {
- log.Printf("Connected to MQTT broker.")
- c.Publish(mqttServerHealthTopic, QOS, true, []byte(`good`))
-}
-
func MQTTConnectionLostHandler(c mqtt.Client, err error) {
log.Printf("Connection to MQTT broker lost: %v", err)
}