summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorxengineering <me@xengineering.eu>2025-12-20 13:27:42 +0100
committerxengineering <me@xengineering.eu>2025-12-20 13:27:42 +0100
commit96299467958aaffdbef5cb8ae780d3abeddfcaba (patch)
tree95e3ff2ccb6bdaffe0f1afedebb6794b72cd6865
parentfaf9fb7a8c5e8b31e9c1104b42d7e550e986ca61 (diff)
downloadsia-server-96299467958aaffdbef5cb8ae780d3abeddfcaba.tar
sia-server-96299467958aaffdbef5cb8ae780d3abeddfcaba.tar.zst
sia-server-96299467958aaffdbef5cb8ae780d3abeddfcaba.zip
Separate MQTT logic
This reduces the coupling between the MQTT-related code and everything else to a single `tx` channel of type `MQTTMessage`. This improves the code quality significantly.
-rw-r--r--cache.go13
-rw-r--r--main.go57
-rw-r--r--meson.build1
-rw-r--r--mqtt.go51
4 files changed, 67 insertions, 55 deletions
diff --git a/cache.go b/cache.go
index 31c8ba5..e5a4e4f 100644
--- a/cache.go
+++ b/cache.go
@@ -3,21 +3,19 @@ package main
import (
"fmt"
"log"
-
- mqtt "github.com/eclipse/paho.mqtt.golang"
)
type States map[string]bool
type Cache struct {
- Client mqtt.Client
+ Tx chan MQTTMessage
States map[string]bool
}
-func NewCache(client mqtt.Client) Cache {
+func NewCache(tx chan MQTTMessage) Cache {
var cache Cache
- cache.Client = client
+ cache.Tx = tx
cache.States = make(States)
return cache
@@ -42,7 +40,10 @@ func (c *Cache) Update(states States) {
} else {
payload = []byte("closed")
}
- _ = c.Client.Publish(topic, QOS, RETAINED, payload)
+ c.Tx <- MQTTMessage{
+ Topic: topic,
+ Payload: payload,
+ }
}
c.States[id] = state
}
diff --git a/main.go b/main.go
index 06ace4a..722bd40 100644
--- a/main.go
+++ b/main.go
@@ -8,19 +8,11 @@ import (
"syscall"
"time"
- mqtt "github.com/eclipse/paho.mqtt.golang"
"xengineering.eu/homematic-go/homematic"
)
const (
OPENCCU = `http://127.0.0.1:8080`
- BROKER = `tcp://127.0.0.1:1883`
- CLIENT_ID = `siaserver`
- TOPIC_PREFIX = `sia`
- QOS = byte(1)
- RETAINED = true
- MQTT_CONNECT_TIMEOUT = time.Second * 5
- MQTT_DISCONNECT_TIMEOUT_US = 500
POLLING_PERIOD = 50 * time.Millisecond
)
@@ -29,13 +21,15 @@ func main() {
defer log.Println("--- Stopped Sia server ---")
go func() {
- req, inventory, client, err := Start()
- defer Stop(client)
+ tx := make(chan MQTTMessage)
+ go MQTTRun(tx)
+
+ req, inventory, err := Start()
if err != nil {
log.Fatalf("Failed startup process: %v", err)
}
- cache := NewCache(client)
+ cache := NewCache(tx)
for {
start := time.Now()
@@ -54,33 +48,9 @@ func main() {
Await(syscall.SIGTERM, syscall.SIGINT)
}
-func ConnectMQTT(broker string, id string) (mqtt.Client, error) {
- opts := mqtt.NewClientOptions()
- opts.AddBroker(broker)
- opts.SetClientID(id)
- opts.SetCleanSession(true)
-
- client := mqtt.NewClient(opts)
-
- token := client.Connect()
-
- success := token.WaitTimeout(MQTT_CONNECT_TIMEOUT)
- if !success {
- return client, fmt.Errorf("Timed out after %v.", MQTT_CONNECT_TIMEOUT)
- }
-
- err := token.Error()
- if err != nil {
- return client, err
- }
-
- return client, nil
-}
-
-func Start() (homematic.Requester, homematic.Devices, mqtt.Client, error) {
+func Start() (homematic.Requester, homematic.Devices, error) {
var req homematic.Requester
var inventory homematic.Devices
- var client mqtt.Client
var err error
req = homematic.NewRequester(OPENCCU)
@@ -88,22 +58,11 @@ func Start() (homematic.Requester, homematic.Devices, mqtt.Client, error) {
inventory, err = req.ListDevices()
if err != nil {
- return req, inventory, client, fmt.Errorf("Failed getting initial device list: %w", err)
+ return req, inventory, fmt.Errorf("Failed getting initial device list: %w", err)
}
log.Printf("Retrieved Homematic inventory with %d devices.", len(inventory))
- client, err = ConnectMQTT(BROKER, CLIENT_ID)
- if err != nil {
- return req, inventory, client, fmt.Errorf("Failed connecting to MQTT broker: %w", err)
- }
- log.Printf("Connected to MQTT broker (%s).", BROKER)
-
- return req, inventory, client, nil
-}
-
-func Stop(c mqtt.Client) {
- c.Disconnect(MQTT_DISCONNECT_TIMEOUT_US)
- log.Println("Disconnected from MQTT broker.")
+ return req, inventory, nil
}
func Poll(req homematic.Requester, inventory homematic.Devices) (States, error) {
diff --git a/meson.build b/meson.build
index e2ce711..6200e2f 100644
--- a/meson.build
+++ b/meson.build
@@ -6,6 +6,7 @@ sia_server = custom_target(
input : [
meson.current_source_dir() / 'main.go',
meson.current_source_dir() / 'cache.go',
+ meson.current_source_dir() / 'mqtt.go',
],
output : 'sia-server',
command : [
diff --git a/mqtt.go b/mqtt.go
new file mode 100644
index 0000000..2fd4d13
--- /dev/null
+++ b/mqtt.go
@@ -0,0 +1,51 @@
+package main
+
+import (
+ "log"
+ "time"
+
+ mqtt "github.com/eclipse/paho.mqtt.golang"
+)
+
+const (
+ BROKER = `tcp://127.0.0.1:1883`
+ CLIENT_ID = `siaserver`
+ TOPIC_PREFIX = `sia`
+ QOS = byte(1)
+ RETAINED = true
+ MQTT_CONNECT_TIMEOUT = 1 * time.Second
+ MQTT_DISCONNECT_TIMEOUT_US = 500
+ MQTT_RECONNECT_PERIOD = 2 * time.Second
+)
+
+type MQTTMessage struct {
+ Topic string
+ Payload []byte
+}
+
+func MQTTRun(tx chan MQTTMessage) {
+ opts := mqtt.NewClientOptions()
+ opts.AddBroker(BROKER)
+ opts.SetClientID(CLIENT_ID)
+ opts.SetCleanSession(true)
+ opts.SetOnConnectHandler(MQTTOnConnectHandler)
+
+ client := mqtt.NewClient(opts)
+
+ token := client.Connect()
+
+ success := token.WaitTimeout(MQTT_CONNECT_TIMEOUT)
+ if !success {
+ log.Printf("Timed out after %v.", MQTT_CONNECT_TIMEOUT)
+ return
+ }
+ defer client.Disconnect(MQTT_DISCONNECT_TIMEOUT_US)
+
+ for message := range tx {
+ client.Publish(message.Topic, QOS, RETAINED, message.Payload)
+ }
+}
+
+func MQTTOnConnectHandler(c mqtt.Client) {
+ log.Printf("Connected to MQTT broker (%s)", BROKER)
+}