summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorxengineering <me@xengineering.eu>2026-03-25 20:33:22 +0100
committerxengineering <me@xengineering.eu>2026-03-25 21:09:11 +0100
commit476db7047a9c650057c034c647ea66f3c38e8a53 (patch)
tree72fd518f6549943ba62ec724b8b51421a537f0ca
parentbfd840bfd843f95183568f7ef6a9880a810ce049 (diff)
downloadsia-server-476db7047a9c650057c034c647ea66f3c38e8a53.tar
sia-server-476db7047a9c650057c034c647ea66f3c38e8a53.tar.zst
sia-server-476db7047a9c650057c034c647ea66f3c38e8a53.zip
Refactor and add routing concept
This scales better when additional receiving routes will be added.
-rw-r--r--main.go6
-rw-r--r--mqtt.go44
-rw-r--r--shelly.go4
3 files changed, 34 insertions, 20 deletions
diff --git a/main.go b/main.go
index e123c83..1678eac 100644
--- a/main.go
+++ b/main.go
@@ -18,12 +18,12 @@ func main() {
config := GetStartupConfig(flags.ConfigPath)
- rx := make(chan MQTTMessage)
tx := make(chan MQTTMessage)
+ coverMovement := NewRoute("cover/+/movement", QoS2)
- go MQTTRun(config.MQTT, rx, tx)
+ go MQTTRun(config.MQTT, tx, coverMovement)
go HomematicRun(config.Homematic, tx)
- go ShellyRun(config.Shelly, rx)
+ go ShellyRun(config.Shelly, coverMovement)
Await(syscall.SIGTERM, syscall.SIGINT)
}
diff --git a/mqtt.go b/mqtt.go
index 7ad47ba..fd1cba1 100644
--- a/mqtt.go
+++ b/mqtt.go
@@ -10,7 +10,9 @@ import (
)
const (
- QOS = byte(1)
+ QoS0 = byte(0)
+ QoS1 = byte(1)
+ QoS2 = byte(2)
RETAINED = true
MQTT_CONNECT_TIMEOUT = 1 * time.Second
MQTT_SUBSCRIBE_TIMEOUT = 1 * time.Second
@@ -27,11 +29,21 @@ type MQTTMessage struct {
Payload []byte
}
+type Route struct {
+ Topic string
+ QoS byte
+ Destination chan MQTTMessage
+}
+
+func NewRoute(topic string, qos byte) Route {
+ return Route{topic, qos, make(chan MQTTMessage)}
+}
+
func (m MQTTMessage) String() string {
return fmt.Sprintf("topic='%s' message='%s'", m.Topic, string(m.Payload))
}
-func MQTTRun(config MQTTConfig, rx chan MQTTMessage, tx chan MQTTMessage) {
+func MQTTRun(config MQTTConfig, tx chan MQTTMessage, routes ...Route) {
mqttServerHealthTopic = fmt.Sprintf("%s/server/health", config.TopicPrefix)
opts := mqtt.NewClientOptions()
@@ -40,19 +52,21 @@ func MQTTRun(config MQTTConfig, rx chan MQTTMessage, tx chan MQTTMessage) {
opts.SetCleanSession(true)
opts.SetOnConnectHandler(func(c mqtt.Client) {
log.Printf("Connected to MQTT broker.")
- c.Publish(mqttServerHealthTopic, QOS, true, []byte(`good`))
+ c.Publish(mqttServerHealthTopic, QoS1, true, []byte(`good`))
- topic := fmt.Sprintf("%s/cover/+/movement", config.TopicPrefix)
- token := c.Subscribe(topic, byte(2), func(c mqtt.Client, msg mqtt.Message) {
- message := MQTTMessage{
- Topic: strings.TrimPrefix(msg.Topic(), config.TopicPrefix + "/"),
- Payload: msg.Payload(),
+ for _, route := range routes {
+ topic := config.TopicPrefix + "/" + route.Topic
+ token := c.Subscribe(topic, route.QoS, func(c mqtt.Client, msg mqtt.Message) {
+ message := MQTTMessage{
+ Topic: strings.TrimPrefix(msg.Topic(), config.TopicPrefix + "/"),
+ Payload: msg.Payload(),
+ }
+ route.Destination <- message
+ })
+ success := token.WaitTimeout(MQTT_SUBSCRIBE_TIMEOUT)
+ if !success {
+ log.Fatalf("Topic subscription failed for topic '%s'", topic)
}
- rx <- message
- })
- success := token.WaitTimeout(MQTT_SUBSCRIBE_TIMEOUT)
- if !success {
- log.Fatal("Initial topic subscription failed.")
}
})
opts.SetConnectionLostHandler(MQTTConnectionLostHandler)
@@ -60,7 +74,7 @@ func MQTTRun(config MQTTConfig, rx chan MQTTMessage, tx chan MQTTMessage) {
opts.SetConnectRetry(true)
opts.SetConnectTimeout(MQTT_CONNECT_TIMEOUT)
opts.SetKeepAlive(MQTT_KEEPALIVE_PERIOD)
- opts.SetWill(mqttServerHealthTopic, `bad`, QOS, true)
+ opts.SetWill(mqttServerHealthTopic, `bad`, QoS1, true)
client := mqtt.NewClient(opts)
@@ -74,7 +88,7 @@ func MQTTRun(config MQTTConfig, rx chan MQTTMessage, tx chan MQTTMessage) {
for message := range tx {
topic := fmt.Sprintf("%s/%s", config.TopicPrefix, message.Topic)
- client.Publish(topic, QOS, RETAINED, message.Payload)
+ client.Publish(topic, QoS1, RETAINED, message.Payload)
}
}
diff --git a/shelly.go b/shelly.go
index 7396bbb..5f6f2ad 100644
--- a/shelly.go
+++ b/shelly.go
@@ -7,8 +7,8 @@ import (
"strings"
)
-func ShellyRun(config ShellyConfigs, rx chan MQTTMessage) {
- for message := range rx {
+func ShellyRun(config ShellyConfigs, route Route) {
+ for message := range route.Destination {
ip, command, err := parseMessage(config, message)
if err != nil {
log.Println(err)