diff options
Diffstat (limited to 'mqtt.go')
| -rw-r--r-- | mqtt.go | 51 |
1 files changed, 41 insertions, 10 deletions
@@ -4,14 +4,18 @@ import ( "fmt" "log" "time" + "strings" mqtt "github.com/eclipse/paho.mqtt.golang" ) const ( - QOS = byte(1) + QoS0 = byte(0) + QoS1 = byte(1) + QoS2 = byte(2) RETAINED = true MQTT_CONNECT_TIMEOUT = 1 * time.Second + MQTT_SUBSCRIBE_TIMEOUT = 1 * time.Second MQTT_DISCONNECT_TIMEOUT_US = 500 MQTT_KEEPALIVE_PERIOD = 2 * time.Second ) @@ -25,20 +29,52 @@ type MQTTMessage struct { Payload []byte } -func MQTTRun(config MQTTConfig, tx chan MQTTMessage) { +type Route struct { + Topic string + QoS byte + Destination chan MQTTMessage +} + +func NewRoute(topic string, qos byte) Route { + return Route{topic, qos, make(chan MQTTMessage)} +} + +func (m MQTTMessage) String() string { + return fmt.Sprintf("topic='%s' message='%s'", m.Topic, string(m.Payload)) +} + +func MQTTRun(config MQTTConfig, tx chan MQTTMessage, routes ...Route) { mqttServerHealthTopic = fmt.Sprintf("%s/server/health", config.TopicPrefix) opts := mqtt.NewClientOptions() opts.AddBroker(config.Broker) opts.SetClientID(config.ClientID) opts.SetCleanSession(true) - opts.SetOnConnectHandler(MQTTOnConnectHandler) + opts.SetOnConnectHandler(func(c mqtt.Client) { + log.Printf("Connected to MQTT broker.") + c.Publish(mqttServerHealthTopic, QoS1, true, []byte(`good`)) + + for _, route := range routes { + topic := config.TopicPrefix + "/" + route.Topic + token := c.Subscribe(topic, route.QoS, func(c mqtt.Client, msg mqtt.Message) { + message := MQTTMessage{ + Topic: strings.TrimPrefix(msg.Topic(), config.TopicPrefix + "/"), + Payload: msg.Payload(), + } + route.Destination <- message + }) + success := token.WaitTimeout(MQTT_SUBSCRIBE_TIMEOUT) + if !success { + log.Fatalf("Topic subscription failed for topic '%s'", topic) + } + } + }) opts.SetConnectionLostHandler(MQTTConnectionLostHandler) opts.SetAutoReconnect(true) opts.SetConnectRetry(true) opts.SetConnectTimeout(MQTT_CONNECT_TIMEOUT) opts.SetKeepAlive(MQTT_KEEPALIVE_PERIOD) - opts.SetWill(mqttServerHealthTopic, `bad`, QOS, true) + opts.SetWill(mqttServerHealthTopic, `bad`, QoS1, true) client := mqtt.NewClient(opts) @@ -52,15 +88,10 @@ func MQTTRun(config MQTTConfig, tx chan MQTTMessage) { for message := range tx { topic := fmt.Sprintf("%s/%s", config.TopicPrefix, message.Topic) - client.Publish(topic, QOS, RETAINED, message.Payload) + client.Publish(topic, QoS1, RETAINED, message.Payload) } } -func MQTTOnConnectHandler(c mqtt.Client) { - log.Printf("Connected to MQTT broker.") - c.Publish(mqttServerHealthTopic, QOS, true, []byte(`good`)) -} - func MQTTConnectionLostHandler(c mqtt.Client, err error) { log.Printf("Connection to MQTT broker lost: %v", err) } |
