summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorxengineering <me@xengineering.eu>2026-03-23 20:53:40 +0100
committerxengineering <me@xengineering.eu>2026-03-23 21:36:30 +0100
commit2463ce39f4aeb99e38b5d7f83d0179e9547aa3eb (patch)
tree130facd6bf6f328a1b5f716082524c0b9a7b1fe2
parent74ddd135ebc63399d9f71b585f23ae8b97d46866 (diff)
downloadsia-server-2463ce39f4aeb99e38b5d7f83d0179e9547aa3eb.tar
sia-server-2463ce39f4aeb99e38b5d7f83d0179e9547aa3eb.tar.zst
sia-server-2463ce39f4aeb99e38b5d7f83d0179e9547aa3eb.zip
Add MQTT subscription for /cover/<id>/movement
This let's the Sia server receive cover movement commands. For now they are simply logged.
-rw-r--r--main.go4
-rw-r--r--meson.build1
-rw-r--r--mqtt.go27
-rw-r--r--shelly.go11
4 files changed, 35 insertions, 8 deletions
diff --git a/main.go b/main.go
index 6a59d64..e123c83 100644
--- a/main.go
+++ b/main.go
@@ -18,10 +18,12 @@ func main() {
config := GetStartupConfig(flags.ConfigPath)
+ rx := make(chan MQTTMessage)
tx := make(chan MQTTMessage)
- go MQTTRun(config.MQTT, tx)
+ go MQTTRun(config.MQTT, rx, tx)
go HomematicRun(config.Homematic, tx)
+ go ShellyRun(config.Shelly, rx)
Await(syscall.SIGTERM, syscall.SIGINT)
}
diff --git a/meson.build b/meson.build
index 4bc72bd..e8878ce 100644
--- a/meson.build
+++ b/meson.build
@@ -23,6 +23,7 @@ sia_server_linux_amd64 = custom_target(
meson.current_source_dir() / 'homematic.go',
meson.current_source_dir() / 'config.go',
meson.current_source_dir() / 'flags.go',
+ meson.current_source_dir() / 'shelly.go',
],
output : 'sia-server-linux-amd64',
env : {'GOOS': 'linux', 'GOARCH': 'amd64'},
diff --git a/mqtt.go b/mqtt.go
index d9f0632..7ad47ba 100644
--- a/mqtt.go
+++ b/mqtt.go
@@ -4,6 +4,7 @@ import (
"fmt"
"log"
"time"
+ "strings"
mqtt "github.com/eclipse/paho.mqtt.golang"
)
@@ -12,6 +13,7 @@ const (
QOS = byte(1)
RETAINED = true
MQTT_CONNECT_TIMEOUT = 1 * time.Second
+ MQTT_SUBSCRIBE_TIMEOUT = 1 * time.Second
MQTT_DISCONNECT_TIMEOUT_US = 500
MQTT_KEEPALIVE_PERIOD = 2 * time.Second
)
@@ -29,14 +31,30 @@ func (m MQTTMessage) String() string {
return fmt.Sprintf("topic='%s' message='%s'", m.Topic, string(m.Payload))
}
-func MQTTRun(config MQTTConfig, tx chan MQTTMessage) {
+func MQTTRun(config MQTTConfig, rx chan MQTTMessage, tx chan MQTTMessage) {
mqttServerHealthTopic = fmt.Sprintf("%s/server/health", config.TopicPrefix)
opts := mqtt.NewClientOptions()
opts.AddBroker(config.Broker)
opts.SetClientID(config.ClientID)
opts.SetCleanSession(true)
- opts.SetOnConnectHandler(MQTTOnConnectHandler)
+ opts.SetOnConnectHandler(func(c mqtt.Client) {
+ log.Printf("Connected to MQTT broker.")
+ c.Publish(mqttServerHealthTopic, QOS, true, []byte(`good`))
+
+ topic := fmt.Sprintf("%s/cover/+/movement", config.TopicPrefix)
+ token := c.Subscribe(topic, byte(2), func(c mqtt.Client, msg mqtt.Message) {
+ message := MQTTMessage{
+ Topic: strings.TrimPrefix(msg.Topic(), config.TopicPrefix + "/"),
+ Payload: msg.Payload(),
+ }
+ rx <- message
+ })
+ success := token.WaitTimeout(MQTT_SUBSCRIBE_TIMEOUT)
+ if !success {
+ log.Fatal("Initial topic subscription failed.")
+ }
+ })
opts.SetConnectionLostHandler(MQTTConnectionLostHandler)
opts.SetAutoReconnect(true)
opts.SetConnectRetry(true)
@@ -60,11 +78,6 @@ func MQTTRun(config MQTTConfig, tx chan MQTTMessage) {
}
}
-func MQTTOnConnectHandler(c mqtt.Client) {
- log.Printf("Connected to MQTT broker.")
- c.Publish(mqttServerHealthTopic, QOS, true, []byte(`good`))
-}
-
func MQTTConnectionLostHandler(c mqtt.Client, err error) {
log.Printf("Connection to MQTT broker lost: %v", err)
}
diff --git a/shelly.go b/shelly.go
new file mode 100644
index 0000000..0d33182
--- /dev/null
+++ b/shelly.go
@@ -0,0 +1,11 @@
+package main
+
+import (
+ "log"
+)
+
+func ShellyRun(config ShellyConfigs, rx chan MQTTMessage) {
+ for message := range rx {
+ log.Printf("Got MQTT message: %v", message)
+ }
+}