diff options
| -rw-r--r-- | cache.go | 13 | ||||
| -rw-r--r-- | main.go | 57 | ||||
| -rw-r--r-- | meson.build | 1 | ||||
| -rw-r--r-- | mqtt.go | 51 |
4 files changed, 67 insertions, 55 deletions
@@ -3,21 +3,19 @@ package main import ( "fmt" "log" - - mqtt "github.com/eclipse/paho.mqtt.golang" ) type States map[string]bool type Cache struct { - Client mqtt.Client + Tx chan MQTTMessage States map[string]bool } -func NewCache(client mqtt.Client) Cache { +func NewCache(tx chan MQTTMessage) Cache { var cache Cache - cache.Client = client + cache.Tx = tx cache.States = make(States) return cache @@ -42,7 +40,10 @@ func (c *Cache) Update(states States) { } else { payload = []byte("closed") } - _ = c.Client.Publish(topic, QOS, RETAINED, payload) + c.Tx <- MQTTMessage{ + Topic: topic, + Payload: payload, + } } c.States[id] = state } @@ -8,19 +8,11 @@ import ( "syscall" "time" - mqtt "github.com/eclipse/paho.mqtt.golang" "xengineering.eu/homematic-go/homematic" ) const ( OPENCCU = `http://127.0.0.1:8080` - BROKER = `tcp://127.0.0.1:1883` - CLIENT_ID = `siaserver` - TOPIC_PREFIX = `sia` - QOS = byte(1) - RETAINED = true - MQTT_CONNECT_TIMEOUT = time.Second * 5 - MQTT_DISCONNECT_TIMEOUT_US = 500 POLLING_PERIOD = 50 * time.Millisecond ) @@ -29,13 +21,15 @@ func main() { defer log.Println("--- Stopped Sia server ---") go func() { - req, inventory, client, err := Start() - defer Stop(client) + tx := make(chan MQTTMessage) + go MQTTRun(tx) + + req, inventory, err := Start() if err != nil { log.Fatalf("Failed startup process: %v", err) } - cache := NewCache(client) + cache := NewCache(tx) for { start := time.Now() @@ -54,33 +48,9 @@ func main() { Await(syscall.SIGTERM, syscall.SIGINT) } -func ConnectMQTT(broker string, id string) (mqtt.Client, error) { - opts := mqtt.NewClientOptions() - opts.AddBroker(broker) - opts.SetClientID(id) - opts.SetCleanSession(true) - - client := mqtt.NewClient(opts) - - token := client.Connect() - - success := token.WaitTimeout(MQTT_CONNECT_TIMEOUT) - if !success { - return client, fmt.Errorf("Timed out after %v.", MQTT_CONNECT_TIMEOUT) - } - - err := token.Error() - if err != nil { - return client, err - } - - return client, nil -} - -func Start() (homematic.Requester, homematic.Devices, mqtt.Client, error) { +func Start() (homematic.Requester, homematic.Devices, error) { var req homematic.Requester var inventory homematic.Devices - var client mqtt.Client var err error req = homematic.NewRequester(OPENCCU) @@ -88,22 +58,11 @@ func Start() (homematic.Requester, homematic.Devices, mqtt.Client, error) { inventory, err = req.ListDevices() if err != nil { - return req, inventory, client, fmt.Errorf("Failed getting initial device list: %w", err) + return req, inventory, fmt.Errorf("Failed getting initial device list: %w", err) } log.Printf("Retrieved Homematic inventory with %d devices.", len(inventory)) - client, err = ConnectMQTT(BROKER, CLIENT_ID) - if err != nil { - return req, inventory, client, fmt.Errorf("Failed connecting to MQTT broker: %w", err) - } - log.Printf("Connected to MQTT broker (%s).", BROKER) - - return req, inventory, client, nil -} - -func Stop(c mqtt.Client) { - c.Disconnect(MQTT_DISCONNECT_TIMEOUT_US) - log.Println("Disconnected from MQTT broker.") + return req, inventory, nil } func Poll(req homematic.Requester, inventory homematic.Devices) (States, error) { diff --git a/meson.build b/meson.build index e2ce711..6200e2f 100644 --- a/meson.build +++ b/meson.build @@ -6,6 +6,7 @@ sia_server = custom_target( input : [ meson.current_source_dir() / 'main.go', meson.current_source_dir() / 'cache.go', + meson.current_source_dir() / 'mqtt.go', ], output : 'sia-server', command : [ @@ -0,0 +1,51 @@ +package main + +import ( + "log" + "time" + + mqtt "github.com/eclipse/paho.mqtt.golang" +) + +const ( + BROKER = `tcp://127.0.0.1:1883` + CLIENT_ID = `siaserver` + TOPIC_PREFIX = `sia` + QOS = byte(1) + RETAINED = true + MQTT_CONNECT_TIMEOUT = 1 * time.Second + MQTT_DISCONNECT_TIMEOUT_US = 500 + MQTT_RECONNECT_PERIOD = 2 * time.Second +) + +type MQTTMessage struct { + Topic string + Payload []byte +} + +func MQTTRun(tx chan MQTTMessage) { + opts := mqtt.NewClientOptions() + opts.AddBroker(BROKER) + opts.SetClientID(CLIENT_ID) + opts.SetCleanSession(true) + opts.SetOnConnectHandler(MQTTOnConnectHandler) + + client := mqtt.NewClient(opts) + + token := client.Connect() + + success := token.WaitTimeout(MQTT_CONNECT_TIMEOUT) + if !success { + log.Printf("Timed out after %v.", MQTT_CONNECT_TIMEOUT) + return + } + defer client.Disconnect(MQTT_DISCONNECT_TIMEOUT_US) + + for message := range tx { + client.Publish(message.Topic, QOS, RETAINED, message.Payload) + } +} + +func MQTTOnConnectHandler(c mqtt.Client) { + log.Printf("Connected to MQTT broker (%s)", BROKER) +} |
