summaryrefslogtreecommitdiff
path: root/mqtt.go
diff options
context:
space:
mode:
authorxengineering <me@xengineering.eu>2026-03-25 21:37:20 +0100
committerxengineering <me@xengineering.eu>2026-03-25 21:37:20 +0100
commit4bc67b734dc8c90dd4679877e8825da32e67b7eb (patch)
treefc4b97bdb6b91caff22b771bb9d8f5ca64791772 /mqtt.go
parent7afbc98e6d715eef8809beb9793ccf5096104e26 (diff)
parent6001997a66c4c4b12e9d8b0853fef0fc0ff14768 (diff)
downloadsia-server-4bc67b734dc8c90dd4679877e8825da32e67b7eb.tar
sia-server-4bc67b734dc8c90dd4679877e8825da32e67b7eb.tar.zst
sia-server-4bc67b734dc8c90dd4679877e8825da32e67b7eb.zip
Merge branch 'shelly'
This adds basic support for Shelly 2PM Gen3 devices.
Diffstat (limited to 'mqtt.go')
-rw-r--r--mqtt.go51
1 files changed, 41 insertions, 10 deletions
diff --git a/mqtt.go b/mqtt.go
index a7b374d..fd1cba1 100644
--- a/mqtt.go
+++ b/mqtt.go
@@ -4,14 +4,18 @@ import (
"fmt"
"log"
"time"
+ "strings"
mqtt "github.com/eclipse/paho.mqtt.golang"
)
const (
- QOS = byte(1)
+ QoS0 = byte(0)
+ QoS1 = byte(1)
+ QoS2 = byte(2)
RETAINED = true
MQTT_CONNECT_TIMEOUT = 1 * time.Second
+ MQTT_SUBSCRIBE_TIMEOUT = 1 * time.Second
MQTT_DISCONNECT_TIMEOUT_US = 500
MQTT_KEEPALIVE_PERIOD = 2 * time.Second
)
@@ -25,20 +29,52 @@ type MQTTMessage struct {
Payload []byte
}
-func MQTTRun(config MQTTConfig, tx chan MQTTMessage) {
+type Route struct {
+ Topic string
+ QoS byte
+ Destination chan MQTTMessage
+}
+
+func NewRoute(topic string, qos byte) Route {
+ return Route{topic, qos, make(chan MQTTMessage)}
+}
+
+func (m MQTTMessage) String() string {
+ return fmt.Sprintf("topic='%s' message='%s'", m.Topic, string(m.Payload))
+}
+
+func MQTTRun(config MQTTConfig, tx chan MQTTMessage, routes ...Route) {
mqttServerHealthTopic = fmt.Sprintf("%s/server/health", config.TopicPrefix)
opts := mqtt.NewClientOptions()
opts.AddBroker(config.Broker)
opts.SetClientID(config.ClientID)
opts.SetCleanSession(true)
- opts.SetOnConnectHandler(MQTTOnConnectHandler)
+ opts.SetOnConnectHandler(func(c mqtt.Client) {
+ log.Printf("Connected to MQTT broker.")
+ c.Publish(mqttServerHealthTopic, QoS1, true, []byte(`good`))
+
+ for _, route := range routes {
+ topic := config.TopicPrefix + "/" + route.Topic
+ token := c.Subscribe(topic, route.QoS, func(c mqtt.Client, msg mqtt.Message) {
+ message := MQTTMessage{
+ Topic: strings.TrimPrefix(msg.Topic(), config.TopicPrefix + "/"),
+ Payload: msg.Payload(),
+ }
+ route.Destination <- message
+ })
+ success := token.WaitTimeout(MQTT_SUBSCRIBE_TIMEOUT)
+ if !success {
+ log.Fatalf("Topic subscription failed for topic '%s'", topic)
+ }
+ }
+ })
opts.SetConnectionLostHandler(MQTTConnectionLostHandler)
opts.SetAutoReconnect(true)
opts.SetConnectRetry(true)
opts.SetConnectTimeout(MQTT_CONNECT_TIMEOUT)
opts.SetKeepAlive(MQTT_KEEPALIVE_PERIOD)
- opts.SetWill(mqttServerHealthTopic, `bad`, QOS, true)
+ opts.SetWill(mqttServerHealthTopic, `bad`, QoS1, true)
client := mqtt.NewClient(opts)
@@ -52,15 +88,10 @@ func MQTTRun(config MQTTConfig, tx chan MQTTMessage) {
for message := range tx {
topic := fmt.Sprintf("%s/%s", config.TopicPrefix, message.Topic)
- client.Publish(topic, QOS, RETAINED, message.Payload)
+ client.Publish(topic, QoS1, RETAINED, message.Payload)
}
}
-func MQTTOnConnectHandler(c mqtt.Client) {
- log.Printf("Connected to MQTT broker.")
- c.Publish(mqttServerHealthTopic, QOS, true, []byte(`good`))
-}
-
func MQTTConnectionLostHandler(c mqtt.Client, err error) {
log.Printf("Connection to MQTT broker lost: %v", err)
}