package main import ( "fmt" "log" "time" "strings" mqtt "github.com/eclipse/paho.mqtt.golang" ) const ( QoS0 = byte(0) QoS1 = byte(1) QoS2 = byte(2) RETAINED = true MQTT_CONNECT_TIMEOUT = 1 * time.Second MQTT_SUBSCRIBE_TIMEOUT = 1 * time.Second MQTT_DISCONNECT_TIMEOUT_US = 500 MQTT_KEEPALIVE_PERIOD = 2 * time.Second ) var ( mqttServerHealthTopic string ) type MQTTMessage struct { Topic string Payload []byte Retain bool } type Route struct { Topic string QoS byte Destination chan MQTTMessage } func NewRoute(topic string, qos byte) Route { return Route{topic, qos, make(chan MQTTMessage)} } func (m MQTTMessage) String() string { return fmt.Sprintf("topic='%s' message='%s'", m.Topic, string(m.Payload)) } func MQTTRun(config MQTTConfig, tx chan MQTTMessage, routes ...Route) { mqttServerHealthTopic = fmt.Sprintf("%s/server/health", config.TopicPrefix) mqttServerVersionTopic := fmt.Sprintf("%s/server/version", config.TopicPrefix) opts := mqtt.NewClientOptions() opts.AddBroker(config.Broker) opts.SetClientID(config.ClientID) opts.SetCleanSession(true) opts.SetOnConnectHandler(func(c mqtt.Client) { log.Printf("Connected to MQTT broker.") c.Publish(mqttServerHealthTopic, QoS1, true, []byte(`good`)) c.Publish(mqttServerVersionTopic, QoS1, true, []byte(Version())) for _, route := range routes { topic := config.TopicPrefix + "/" + route.Topic token := c.Subscribe(topic, route.QoS, func(c mqtt.Client, msg mqtt.Message) { message := MQTTMessage{ Topic: strings.TrimPrefix(msg.Topic(), config.TopicPrefix + "/"), Payload: msg.Payload(), Retain: msg.Retained(), } route.Destination <- message }) success := token.WaitTimeout(MQTT_SUBSCRIBE_TIMEOUT) if !success { log.Fatalf("Topic subscription failed for topic '%s'", topic) } } }) opts.SetConnectionLostHandler(MQTTConnectionLostHandler) opts.SetAutoReconnect(true) opts.SetConnectRetry(true) opts.SetConnectTimeout(MQTT_CONNECT_TIMEOUT) opts.SetKeepAlive(MQTT_KEEPALIVE_PERIOD) opts.SetWill(mqttServerHealthTopic, `bad`, QoS1, true) client := mqtt.NewClient(opts) token := client.Connect() success := token.WaitTimeout(MQTT_CONNECT_TIMEOUT) if !success { log.Fatal("Initial connection to MQTT broker failed.") } defer client.Disconnect(MQTT_DISCONNECT_TIMEOUT_US) for message := range tx { topic := fmt.Sprintf("%s/%s", config.TopicPrefix, message.Topic) client.Publish(topic, QoS1, message.Retain, message.Payload) } } func MQTTConnectionLostHandler(c mqtt.Client, err error) { log.Printf("Connection to MQTT broker lost: %v", err) }