From 64f8cf0630ce51349b94aca2f91617d373ee800d Mon Sep 17 00:00:00 2001 From: xengineering Date: Mon, 23 Mar 2026 20:46:34 +0100 Subject: Add MQTTMessage.String() This simplifies debugging. --- mqtt.go | 4 ++++ 1 file changed, 4 insertions(+) (limited to 'mqtt.go') diff --git a/mqtt.go b/mqtt.go index a7b374d..d9f0632 100644 --- a/mqtt.go +++ b/mqtt.go @@ -25,6 +25,10 @@ type MQTTMessage struct { Payload []byte } +func (m MQTTMessage) String() string { + return fmt.Sprintf("topic='%s' message='%s'", m.Topic, string(m.Payload)) +} + func MQTTRun(config MQTTConfig, tx chan MQTTMessage) { mqttServerHealthTopic = fmt.Sprintf("%s/server/health", config.TopicPrefix) -- cgit v1.3 From 1bd2833f81379f25b29ab5d929f14e51700fa471 Mon Sep 17 00:00:00 2001 From: xengineering Date: Mon, 23 Mar 2026 20:53:40 +0100 Subject: Add MQTT subscription for /cover//movement This let's the Sia server receive cover movement commands. For now they are simply logged. --- main.go | 4 +++- meson.build | 1 + mqtt.go | 27 ++++++++++++++++++++------- shelly.go | 11 +++++++++++ 4 files changed, 35 insertions(+), 8 deletions(-) create mode 100644 shelly.go (limited to 'mqtt.go') diff --git a/main.go b/main.go index 6a59d64..e123c83 100644 --- a/main.go +++ b/main.go @@ -18,10 +18,12 @@ func main() { config := GetStartupConfig(flags.ConfigPath) + rx := make(chan MQTTMessage) tx := make(chan MQTTMessage) - go MQTTRun(config.MQTT, tx) + go MQTTRun(config.MQTT, rx, tx) go HomematicRun(config.Homematic, tx) + go ShellyRun(config.Shelly, rx) Await(syscall.SIGTERM, syscall.SIGINT) } diff --git a/meson.build b/meson.build index 4bc72bd..e8878ce 100644 --- a/meson.build +++ b/meson.build @@ -23,6 +23,7 @@ sia_server_linux_amd64 = custom_target( meson.current_source_dir() / 'homematic.go', meson.current_source_dir() / 'config.go', meson.current_source_dir() / 'flags.go', + meson.current_source_dir() / 'shelly.go', ], output : 'sia-server-linux-amd64', env : {'GOOS': 'linux', 'GOARCH': 'amd64'}, diff --git a/mqtt.go b/mqtt.go index d9f0632..7ad47ba 100644 --- a/mqtt.go +++ b/mqtt.go @@ -4,6 +4,7 @@ import ( "fmt" "log" "time" + "strings" mqtt "github.com/eclipse/paho.mqtt.golang" ) @@ -12,6 +13,7 @@ const ( QOS = byte(1) RETAINED = true MQTT_CONNECT_TIMEOUT = 1 * time.Second + MQTT_SUBSCRIBE_TIMEOUT = 1 * time.Second MQTT_DISCONNECT_TIMEOUT_US = 500 MQTT_KEEPALIVE_PERIOD = 2 * time.Second ) @@ -29,14 +31,30 @@ func (m MQTTMessage) String() string { return fmt.Sprintf("topic='%s' message='%s'", m.Topic, string(m.Payload)) } -func MQTTRun(config MQTTConfig, tx chan MQTTMessage) { +func MQTTRun(config MQTTConfig, rx chan MQTTMessage, tx chan MQTTMessage) { mqttServerHealthTopic = fmt.Sprintf("%s/server/health", config.TopicPrefix) opts := mqtt.NewClientOptions() opts.AddBroker(config.Broker) opts.SetClientID(config.ClientID) opts.SetCleanSession(true) - opts.SetOnConnectHandler(MQTTOnConnectHandler) + opts.SetOnConnectHandler(func(c mqtt.Client) { + log.Printf("Connected to MQTT broker.") + c.Publish(mqttServerHealthTopic, QOS, true, []byte(`good`)) + + topic := fmt.Sprintf("%s/cover/+/movement", config.TopicPrefix) + token := c.Subscribe(topic, byte(2), func(c mqtt.Client, msg mqtt.Message) { + message := MQTTMessage{ + Topic: strings.TrimPrefix(msg.Topic(), config.TopicPrefix + "/"), + Payload: msg.Payload(), + } + rx <- message + }) + success := token.WaitTimeout(MQTT_SUBSCRIBE_TIMEOUT) + if !success { + log.Fatal("Initial topic subscription failed.") + } + }) opts.SetConnectionLostHandler(MQTTConnectionLostHandler) opts.SetAutoReconnect(true) opts.SetConnectRetry(true) @@ -60,11 +78,6 @@ func MQTTRun(config MQTTConfig, tx chan MQTTMessage) { } } -func MQTTOnConnectHandler(c mqtt.Client) { - log.Printf("Connected to MQTT broker.") - c.Publish(mqttServerHealthTopic, QOS, true, []byte(`good`)) -} - func MQTTConnectionLostHandler(c mqtt.Client, err error) { log.Printf("Connection to MQTT broker lost: %v", err) } diff --git a/shelly.go b/shelly.go new file mode 100644 index 0000000..0d33182 --- /dev/null +++ b/shelly.go @@ -0,0 +1,11 @@ +package main + +import ( + "log" +) + +func ShellyRun(config ShellyConfigs, rx chan MQTTMessage) { + for message := range rx { + log.Printf("Got MQTT message: %v", message) + } +} -- cgit v1.3 From 476db7047a9c650057c034c647ea66f3c38e8a53 Mon Sep 17 00:00:00 2001 From: xengineering Date: Wed, 25 Mar 2026 20:33:22 +0100 Subject: Refactor and add routing concept This scales better when additional receiving routes will be added. --- main.go | 6 +++--- mqtt.go | 44 +++++++++++++++++++++++++++++--------------- shelly.go | 4 ++-- 3 files changed, 34 insertions(+), 20 deletions(-) (limited to 'mqtt.go') diff --git a/main.go b/main.go index e123c83..1678eac 100644 --- a/main.go +++ b/main.go @@ -18,12 +18,12 @@ func main() { config := GetStartupConfig(flags.ConfigPath) - rx := make(chan MQTTMessage) tx := make(chan MQTTMessage) + coverMovement := NewRoute("cover/+/movement", QoS2) - go MQTTRun(config.MQTT, rx, tx) + go MQTTRun(config.MQTT, tx, coverMovement) go HomematicRun(config.Homematic, tx) - go ShellyRun(config.Shelly, rx) + go ShellyRun(config.Shelly, coverMovement) Await(syscall.SIGTERM, syscall.SIGINT) } diff --git a/mqtt.go b/mqtt.go index 7ad47ba..fd1cba1 100644 --- a/mqtt.go +++ b/mqtt.go @@ -10,7 +10,9 @@ import ( ) const ( - QOS = byte(1) + QoS0 = byte(0) + QoS1 = byte(1) + QoS2 = byte(2) RETAINED = true MQTT_CONNECT_TIMEOUT = 1 * time.Second MQTT_SUBSCRIBE_TIMEOUT = 1 * time.Second @@ -27,11 +29,21 @@ type MQTTMessage struct { Payload []byte } +type Route struct { + Topic string + QoS byte + Destination chan MQTTMessage +} + +func NewRoute(topic string, qos byte) Route { + return Route{topic, qos, make(chan MQTTMessage)} +} + func (m MQTTMessage) String() string { return fmt.Sprintf("topic='%s' message='%s'", m.Topic, string(m.Payload)) } -func MQTTRun(config MQTTConfig, rx chan MQTTMessage, tx chan MQTTMessage) { +func MQTTRun(config MQTTConfig, tx chan MQTTMessage, routes ...Route) { mqttServerHealthTopic = fmt.Sprintf("%s/server/health", config.TopicPrefix) opts := mqtt.NewClientOptions() @@ -40,19 +52,21 @@ func MQTTRun(config MQTTConfig, rx chan MQTTMessage, tx chan MQTTMessage) { opts.SetCleanSession(true) opts.SetOnConnectHandler(func(c mqtt.Client) { log.Printf("Connected to MQTT broker.") - c.Publish(mqttServerHealthTopic, QOS, true, []byte(`good`)) + c.Publish(mqttServerHealthTopic, QoS1, true, []byte(`good`)) - topic := fmt.Sprintf("%s/cover/+/movement", config.TopicPrefix) - token := c.Subscribe(topic, byte(2), func(c mqtt.Client, msg mqtt.Message) { - message := MQTTMessage{ - Topic: strings.TrimPrefix(msg.Topic(), config.TopicPrefix + "/"), - Payload: msg.Payload(), + for _, route := range routes { + topic := config.TopicPrefix + "/" + route.Topic + token := c.Subscribe(topic, route.QoS, func(c mqtt.Client, msg mqtt.Message) { + message := MQTTMessage{ + Topic: strings.TrimPrefix(msg.Topic(), config.TopicPrefix + "/"), + Payload: msg.Payload(), + } + route.Destination <- message + }) + success := token.WaitTimeout(MQTT_SUBSCRIBE_TIMEOUT) + if !success { + log.Fatalf("Topic subscription failed for topic '%s'", topic) } - rx <- message - }) - success := token.WaitTimeout(MQTT_SUBSCRIBE_TIMEOUT) - if !success { - log.Fatal("Initial topic subscription failed.") } }) opts.SetConnectionLostHandler(MQTTConnectionLostHandler) @@ -60,7 +74,7 @@ func MQTTRun(config MQTTConfig, rx chan MQTTMessage, tx chan MQTTMessage) { opts.SetConnectRetry(true) opts.SetConnectTimeout(MQTT_CONNECT_TIMEOUT) opts.SetKeepAlive(MQTT_KEEPALIVE_PERIOD) - opts.SetWill(mqttServerHealthTopic, `bad`, QOS, true) + opts.SetWill(mqttServerHealthTopic, `bad`, QoS1, true) client := mqtt.NewClient(opts) @@ -74,7 +88,7 @@ func MQTTRun(config MQTTConfig, rx chan MQTTMessage, tx chan MQTTMessage) { for message := range tx { topic := fmt.Sprintf("%s/%s", config.TopicPrefix, message.Topic) - client.Publish(topic, QOS, RETAINED, message.Payload) + client.Publish(topic, QoS1, RETAINED, message.Payload) } } diff --git a/shelly.go b/shelly.go index 7396bbb..5f6f2ad 100644 --- a/shelly.go +++ b/shelly.go @@ -7,8 +7,8 @@ import ( "strings" ) -func ShellyRun(config ShellyConfigs, rx chan MQTTMessage) { - for message := range rx { +func ShellyRun(config ShellyConfigs, route Route) { + for message := range route.Destination { ip, command, err := parseMessage(config, message) if err != nil { log.Println(err) -- cgit v1.3