diff options
| author | xengineering <me@xengineering.eu> | 2026-03-23 20:53:40 +0100 |
|---|---|---|
| committer | xengineering <me@xengineering.eu> | 2026-03-23 21:36:30 +0100 |
| commit | 2463ce39f4aeb99e38b5d7f83d0179e9547aa3eb (patch) | |
| tree | 130facd6bf6f328a1b5f716082524c0b9a7b1fe2 /mqtt.go | |
| parent | 74ddd135ebc63399d9f71b585f23ae8b97d46866 (diff) | |
| download | sia-server-2463ce39f4aeb99e38b5d7f83d0179e9547aa3eb.tar sia-server-2463ce39f4aeb99e38b5d7f83d0179e9547aa3eb.tar.zst sia-server-2463ce39f4aeb99e38b5d7f83d0179e9547aa3eb.zip | |
Add MQTT subscription for /cover/<id>/movement
This let's the Sia server receive cover movement commands. For now they
are simply logged.
Diffstat (limited to 'mqtt.go')
| -rw-r--r-- | mqtt.go | 27 |
1 files changed, 20 insertions, 7 deletions
@@ -4,6 +4,7 @@ import ( "fmt" "log" "time" + "strings" mqtt "github.com/eclipse/paho.mqtt.golang" ) @@ -12,6 +13,7 @@ const ( QOS = byte(1) RETAINED = true MQTT_CONNECT_TIMEOUT = 1 * time.Second + MQTT_SUBSCRIBE_TIMEOUT = 1 * time.Second MQTT_DISCONNECT_TIMEOUT_US = 500 MQTT_KEEPALIVE_PERIOD = 2 * time.Second ) @@ -29,14 +31,30 @@ func (m MQTTMessage) String() string { return fmt.Sprintf("topic='%s' message='%s'", m.Topic, string(m.Payload)) } -func MQTTRun(config MQTTConfig, tx chan MQTTMessage) { +func MQTTRun(config MQTTConfig, rx chan MQTTMessage, tx chan MQTTMessage) { mqttServerHealthTopic = fmt.Sprintf("%s/server/health", config.TopicPrefix) opts := mqtt.NewClientOptions() opts.AddBroker(config.Broker) opts.SetClientID(config.ClientID) opts.SetCleanSession(true) - opts.SetOnConnectHandler(MQTTOnConnectHandler) + opts.SetOnConnectHandler(func(c mqtt.Client) { + log.Printf("Connected to MQTT broker.") + c.Publish(mqttServerHealthTopic, QOS, true, []byte(`good`)) + + topic := fmt.Sprintf("%s/cover/+/movement", config.TopicPrefix) + token := c.Subscribe(topic, byte(2), func(c mqtt.Client, msg mqtt.Message) { + message := MQTTMessage{ + Topic: strings.TrimPrefix(msg.Topic(), config.TopicPrefix + "/"), + Payload: msg.Payload(), + } + rx <- message + }) + success := token.WaitTimeout(MQTT_SUBSCRIBE_TIMEOUT) + if !success { + log.Fatal("Initial topic subscription failed.") + } + }) opts.SetConnectionLostHandler(MQTTConnectionLostHandler) opts.SetAutoReconnect(true) opts.SetConnectRetry(true) @@ -60,11 +78,6 @@ func MQTTRun(config MQTTConfig, tx chan MQTTMessage) { } } -func MQTTOnConnectHandler(c mqtt.Client) { - log.Printf("Connected to MQTT broker.") - c.Publish(mqttServerHealthTopic, QOS, true, []byte(`good`)) -} - func MQTTConnectionLostHandler(c mqtt.Client, err error) { log.Printf("Connection to MQTT broker lost: %v", err) } |
