From 1bd2833f81379f25b29ab5d929f14e51700fa471 Mon Sep 17 00:00:00 2001 From: xengineering Date: Mon, 23 Mar 2026 20:53:40 +0100 Subject: Add MQTT subscription for /cover//movement This let's the Sia server receive cover movement commands. For now they are simply logged. --- shelly.go | 11 +++++++++++ 1 file changed, 11 insertions(+) create mode 100644 shelly.go (limited to 'shelly.go') diff --git a/shelly.go b/shelly.go new file mode 100644 index 0000000..0d33182 --- /dev/null +++ b/shelly.go @@ -0,0 +1,11 @@ +package main + +import ( + "log" +) + +func ShellyRun(config ShellyConfigs, rx chan MQTTMessage) { + for message := range rx { + log.Printf("Got MQTT message: %v", message) + } +} -- cgit v1.3 From bfd840bfd843f95183568f7ef6a9880a810ce049 Mon Sep 17 00:00:00 2001 From: xengineering Date: Mon, 23 Mar 2026 21:34:58 +0100 Subject: Add Shelly cover message parsing This results in the information of which command is to issue and which IP address the command has to be sent to. This is what is needed to deliver the message with Websockets. This delivery is the last step to implement basic Shelly cover support. --- shelly.go | 48 +++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 47 insertions(+), 1 deletion(-) (limited to 'shelly.go') diff --git a/shelly.go b/shelly.go index 0d33182..7396bbb 100644 --- a/shelly.go +++ b/shelly.go @@ -1,11 +1,57 @@ package main import ( + "fmt" "log" + "net" + "strings" ) func ShellyRun(config ShellyConfigs, rx chan MQTTMessage) { for message := range rx { - log.Printf("Got MQTT message: %v", message) + ip, command, err := parseMessage(config, message) + if err != nil { + log.Println(err) + continue + } + + log.Printf("Send '%s' to '%s'.", command, ip) + } +} + +func parseMessage(config ShellyConfigs, m MQTTMessage) (ip *net.IP, command string, err error) { + elements := strings.Split(m.Topic, "/") + + if len(elements) != 3 { + return nil, "", fmt.Errorf( + "Expected three topic levels but got %d in '%s'.", + len(elements), m.Topic, + ) + } + + if elements[0] != "cover" || elements[2] != "movement" { + return nil, "", fmt.Errorf("Expected cover//movement but got: %s", m.Topic) } + + switch string(m.Payload) { + case "extend": + command = "Cover.Close" + case "retract": + command = "Cover.Open" + case "stop": + command = "Cover.Stop" + default: + return nil, "", fmt.Errorf("Invalid payload '%s'.", m.Payload) + } + + id := elements[1] + + for _, c := range config { + if c.ID == id { + ip := net.ParseIP(c.IP) + return &ip, command, nil + } + } + + return nil, "", fmt.Errorf("Got message for unknown cover '%s'", id) } -- cgit v1.3 From 476db7047a9c650057c034c647ea66f3c38e8a53 Mon Sep 17 00:00:00 2001 From: xengineering Date: Wed, 25 Mar 2026 20:33:22 +0100 Subject: Refactor and add routing concept This scales better when additional receiving routes will be added. --- main.go | 6 +++--- mqtt.go | 44 +++++++++++++++++++++++++++++--------------- shelly.go | 4 ++-- 3 files changed, 34 insertions(+), 20 deletions(-) (limited to 'shelly.go') diff --git a/main.go b/main.go index e123c83..1678eac 100644 --- a/main.go +++ b/main.go @@ -18,12 +18,12 @@ func main() { config := GetStartupConfig(flags.ConfigPath) - rx := make(chan MQTTMessage) tx := make(chan MQTTMessage) + coverMovement := NewRoute("cover/+/movement", QoS2) - go MQTTRun(config.MQTT, rx, tx) + go MQTTRun(config.MQTT, tx, coverMovement) go HomematicRun(config.Homematic, tx) - go ShellyRun(config.Shelly, rx) + go ShellyRun(config.Shelly, coverMovement) Await(syscall.SIGTERM, syscall.SIGINT) } diff --git a/mqtt.go b/mqtt.go index 7ad47ba..fd1cba1 100644 --- a/mqtt.go +++ b/mqtt.go @@ -10,7 +10,9 @@ import ( ) const ( - QOS = byte(1) + QoS0 = byte(0) + QoS1 = byte(1) + QoS2 = byte(2) RETAINED = true MQTT_CONNECT_TIMEOUT = 1 * time.Second MQTT_SUBSCRIBE_TIMEOUT = 1 * time.Second @@ -27,11 +29,21 @@ type MQTTMessage struct { Payload []byte } +type Route struct { + Topic string + QoS byte + Destination chan MQTTMessage +} + +func NewRoute(topic string, qos byte) Route { + return Route{topic, qos, make(chan MQTTMessage)} +} + func (m MQTTMessage) String() string { return fmt.Sprintf("topic='%s' message='%s'", m.Topic, string(m.Payload)) } -func MQTTRun(config MQTTConfig, rx chan MQTTMessage, tx chan MQTTMessage) { +func MQTTRun(config MQTTConfig, tx chan MQTTMessage, routes ...Route) { mqttServerHealthTopic = fmt.Sprintf("%s/server/health", config.TopicPrefix) opts := mqtt.NewClientOptions() @@ -40,19 +52,21 @@ func MQTTRun(config MQTTConfig, rx chan MQTTMessage, tx chan MQTTMessage) { opts.SetCleanSession(true) opts.SetOnConnectHandler(func(c mqtt.Client) { log.Printf("Connected to MQTT broker.") - c.Publish(mqttServerHealthTopic, QOS, true, []byte(`good`)) + c.Publish(mqttServerHealthTopic, QoS1, true, []byte(`good`)) - topic := fmt.Sprintf("%s/cover/+/movement", config.TopicPrefix) - token := c.Subscribe(topic, byte(2), func(c mqtt.Client, msg mqtt.Message) { - message := MQTTMessage{ - Topic: strings.TrimPrefix(msg.Topic(), config.TopicPrefix + "/"), - Payload: msg.Payload(), + for _, route := range routes { + topic := config.TopicPrefix + "/" + route.Topic + token := c.Subscribe(topic, route.QoS, func(c mqtt.Client, msg mqtt.Message) { + message := MQTTMessage{ + Topic: strings.TrimPrefix(msg.Topic(), config.TopicPrefix + "/"), + Payload: msg.Payload(), + } + route.Destination <- message + }) + success := token.WaitTimeout(MQTT_SUBSCRIBE_TIMEOUT) + if !success { + log.Fatalf("Topic subscription failed for topic '%s'", topic) } - rx <- message - }) - success := token.WaitTimeout(MQTT_SUBSCRIBE_TIMEOUT) - if !success { - log.Fatal("Initial topic subscription failed.") } }) opts.SetConnectionLostHandler(MQTTConnectionLostHandler) @@ -60,7 +74,7 @@ func MQTTRun(config MQTTConfig, rx chan MQTTMessage, tx chan MQTTMessage) { opts.SetConnectRetry(true) opts.SetConnectTimeout(MQTT_CONNECT_TIMEOUT) opts.SetKeepAlive(MQTT_KEEPALIVE_PERIOD) - opts.SetWill(mqttServerHealthTopic, `bad`, QOS, true) + opts.SetWill(mqttServerHealthTopic, `bad`, QoS1, true) client := mqtt.NewClient(opts) @@ -74,7 +88,7 @@ func MQTTRun(config MQTTConfig, rx chan MQTTMessage, tx chan MQTTMessage) { for message := range tx { topic := fmt.Sprintf("%s/%s", config.TopicPrefix, message.Topic) - client.Publish(topic, QOS, RETAINED, message.Payload) + client.Publish(topic, QoS1, RETAINED, message.Payload) } } diff --git a/shelly.go b/shelly.go index 7396bbb..5f6f2ad 100644 --- a/shelly.go +++ b/shelly.go @@ -7,8 +7,8 @@ import ( "strings" ) -func ShellyRun(config ShellyConfigs, rx chan MQTTMessage) { - for message := range rx { +func ShellyRun(config ShellyConfigs, route Route) { + for message := range route.Destination { ip, command, err := parseMessage(config, message) if err != nil { log.Println(err) -- cgit v1.3 From 6001997a66c4c4b12e9d8b0853fef0fc0ff14768 Mon Sep 17 00:00:00 2001 From: xengineering Date: Wed, 25 Mar 2026 21:00:13 +0100 Subject: Add Shelly command sending This allows basic control of Covers connected to Shelly devices. --- shelly.go | 35 ++++++++++++++++++++++++++++++++++- 1 file changed, 34 insertions(+), 1 deletion(-) (limited to 'shelly.go') diff --git a/shelly.go b/shelly.go index 5f6f2ad..508b393 100644 --- a/shelly.go +++ b/shelly.go @@ -5,6 +5,8 @@ import ( "log" "net" "strings" + + "github.com/gorilla/websocket" ) func ShellyRun(config ShellyConfigs, route Route) { @@ -15,7 +17,10 @@ func ShellyRun(config ShellyConfigs, route Route) { continue } - log.Printf("Send '%s' to '%s'.", command, ip) + err = shellySendCommand(ip, command) + if err != nil { + log.Printf("Could not send command '%s' to %v: %v", command, ip, err) + } } } @@ -55,3 +60,31 @@ func parseMessage(config ShellyConfigs, m MQTTMessage) (ip *net.IP, command stri return nil, "", fmt.Errorf("Got message for unknown cover '%s'", id) } + +func shellySendCommand(ip *net.IP, command string) error { + template := ` +{ + "jsonrpc":"2.0", + "id": 1, + "src":"user_1", + "method":"%s", + "params": { + "id":0 + } +} +` + message := fmt.Appendf([]byte{}, template, command) + + c, _, err := websocket.DefaultDialer.Dial("ws://" + ip.String() + "/rpc", nil) + if err != nil { + return fmt.Errorf("Could not connect to Shelly: %w", err) + } + defer c.Close() + + err = c.WriteMessage(websocket.TextMessage, message) + if err != nil { + return fmt.Errorf("Failed writing websocket message to Shelly: %w", err) + } + + return nil +} -- cgit v1.3