diff options
| -rw-r--r-- | README.md | 14 | ||||
| -rw-r--r-- | config.go | 15 | ||||
| -rw-r--r-- | configs/meson.build | 2 | ||||
| -rw-r--r-- | configs/valid/mqtt-topic-prefix-max-characters.json | 11 | ||||
| -rw-r--r-- | configs/valid/shelly.json | 21 | ||||
| -rw-r--r-- | main.go | 4 | ||||
| -rw-r--r-- | meson.build | 1 | ||||
| -rw-r--r-- | mqtt.go | 31 | ||||
| -rw-r--r-- | shelly.go | 53 |
9 files changed, 143 insertions, 9 deletions
@@ -112,6 +112,20 @@ For all terms not explained here see the [MQTT version 3.1.1 documentation][9]. - `open`: contact is open - `closed`: contact is closed +### `/cover/<id>/movement` + +- description: Allows control of Shelly 2PM Gen3 covers +- direction: client to Sia server +- Quality of Service: QoS 2 (exactly once) +- retained: no +- receives will message: no +- topic parameters: + - `id`: ID of the Shelly 2PM Gen3 cover +- payloads: + - `extend`: cover increases the covering surface + - `retract`: cover decreases the covering surface + - `stop`: cover stops current motion if given + [1]: https://homematic-ip.com/ [2]: https://openccu.de/ [3]: https://homematic-ip.com/en/product/window-and-door-contact-optical @@ -45,9 +45,17 @@ type HomematicConfig struct { PollingPeriod string `json:"polling-period"` } +type ShellyConfig struct { + ID string `json:"id"` + IP string `json:"ip"` +} + +type ShellyConfigs []ShellyConfig + type StartupConfig struct { MQTT MQTTConfig `json:"mqtt"` Homematic HomematicConfig `json:"homematic"` + Shelly ShellyConfigs `json:"shelly"` } func (sc StartupConfig) String() string { @@ -117,6 +125,13 @@ func (sc StartupConfig) Validate() error { return fmt.Errorf("homematic/polling-period configuration '%s' could not be parsed to duration: %v", sc.Homematic.PollingPeriod, err) } + for _, shelly := range sc.Shelly { + ip := net.ParseIP(shelly.IP) + if ip == nil { + return fmt.Errorf("Failed to parse IP address '%s'.", shelly.IP) + } + } + return nil } diff --git a/configs/meson.build b/configs/meson.build index 98110da..aa16a47 100644 --- a/configs/meson.build +++ b/configs/meson.build @@ -1,3 +1,3 @@ fs = import('fs') -default_config = fs.copyfile(meson.current_source_dir() / 'default.json') +default_config = fs.copyfile(meson.current_source_dir() / 'valid' / 'default.json') diff --git a/configs/valid/mqtt-topic-prefix-max-characters.json b/configs/valid/mqtt-topic-prefix-max-characters.json new file mode 100644 index 0000000..99f3cf1 --- /dev/null +++ b/configs/valid/mqtt-topic-prefix-max-characters.json @@ -0,0 +1,11 @@ +{ + "mqtt": { + "broker": "tcp://127.0.0.1:1883", + "client-id": "siaserver", + "topic-prefix": "aaaaaaaaaaaaaaaaaaaa" + }, + "homematic": { + "ccu": "http://127.0.0.1:8080", + "polling-period": "50ms" + } +} diff --git a/configs/valid/shelly.json b/configs/valid/shelly.json new file mode 100644 index 0000000..578f6dc --- /dev/null +++ b/configs/valid/shelly.json @@ -0,0 +1,21 @@ +{ + "mqtt": { + "broker": "tcp://127.0.0.1:1883", + "client-id": "siaserver", + "topic-prefix": "sia" + }, + "homematic": { + "ccu": "http://127.0.0.1:8080", + "polling-period": "50ms" + }, + "shelly": [ + { + "id": "shelly1", + "ip": "192.168.1.20" + }, + { + "id": "shelly2", + "ip": "2001:db8::68" + } + ] +} @@ -18,10 +18,12 @@ func main() { config := GetStartupConfig(flags.ConfigPath) + rx := make(chan MQTTMessage) tx := make(chan MQTTMessage) - go MQTTRun(config.MQTT, tx) + go MQTTRun(config.MQTT, rx, tx) go HomematicRun(config.Homematic, tx) + go ShellyRun(config.Shelly, rx) Await(syscall.SIGTERM, syscall.SIGINT) } diff --git a/meson.build b/meson.build index 4bc72bd..e8878ce 100644 --- a/meson.build +++ b/meson.build @@ -23,6 +23,7 @@ sia_server_linux_amd64 = custom_target( meson.current_source_dir() / 'homematic.go', meson.current_source_dir() / 'config.go', meson.current_source_dir() / 'flags.go', + meson.current_source_dir() / 'shelly.go', ], output : 'sia-server-linux-amd64', env : {'GOOS': 'linux', 'GOARCH': 'amd64'}, @@ -4,6 +4,7 @@ import ( "fmt" "log" "time" + "strings" mqtt "github.com/eclipse/paho.mqtt.golang" ) @@ -12,6 +13,7 @@ const ( QOS = byte(1) RETAINED = true MQTT_CONNECT_TIMEOUT = 1 * time.Second + MQTT_SUBSCRIBE_TIMEOUT = 1 * time.Second MQTT_DISCONNECT_TIMEOUT_US = 500 MQTT_KEEPALIVE_PERIOD = 2 * time.Second ) @@ -25,14 +27,34 @@ type MQTTMessage struct { Payload []byte } -func MQTTRun(config MQTTConfig, tx chan MQTTMessage) { +func (m MQTTMessage) String() string { + return fmt.Sprintf("topic='%s' message='%s'", m.Topic, string(m.Payload)) +} + +func MQTTRun(config MQTTConfig, rx chan MQTTMessage, tx chan MQTTMessage) { mqttServerHealthTopic = fmt.Sprintf("%s/server/health", config.TopicPrefix) opts := mqtt.NewClientOptions() opts.AddBroker(config.Broker) opts.SetClientID(config.ClientID) opts.SetCleanSession(true) - opts.SetOnConnectHandler(MQTTOnConnectHandler) + opts.SetOnConnectHandler(func(c mqtt.Client) { + log.Printf("Connected to MQTT broker.") + c.Publish(mqttServerHealthTopic, QOS, true, []byte(`good`)) + + topic := fmt.Sprintf("%s/cover/+/movement", config.TopicPrefix) + token := c.Subscribe(topic, byte(2), func(c mqtt.Client, msg mqtt.Message) { + message := MQTTMessage{ + Topic: strings.TrimPrefix(msg.Topic(), config.TopicPrefix + "/"), + Payload: msg.Payload(), + } + rx <- message + }) + success := token.WaitTimeout(MQTT_SUBSCRIBE_TIMEOUT) + if !success { + log.Fatal("Initial topic subscription failed.") + } + }) opts.SetConnectionLostHandler(MQTTConnectionLostHandler) opts.SetAutoReconnect(true) opts.SetConnectRetry(true) @@ -56,11 +78,6 @@ func MQTTRun(config MQTTConfig, tx chan MQTTMessage) { } } -func MQTTOnConnectHandler(c mqtt.Client) { - log.Printf("Connected to MQTT broker.") - c.Publish(mqttServerHealthTopic, QOS, true, []byte(`good`)) -} - func MQTTConnectionLostHandler(c mqtt.Client, err error) { log.Printf("Connection to MQTT broker lost: %v", err) } diff --git a/shelly.go b/shelly.go new file mode 100644 index 0000000..0f133bc --- /dev/null +++ b/shelly.go @@ -0,0 +1,53 @@ +package main + +import ( + "fmt" + "log" + "net" + "strings" +) + +func ShellyRun(config ShellyConfigs, rx chan MQTTMessage) { + for message := range rx { + ip, command, err := parseMessage(config, message) + if err != nil { + log.Println(err) + continue + } + + log.Printf("Send '%s' to '%s'.", command, ip) + } +} + +func parseMessage(config ShellyConfigs, m MQTTMessage) (ip *net.IP, command string, err error) { + elements := strings.Split(m.Topic, "/") + + if len(elements) != 3 { + return nil, "", fmt.Errorf( + "Expected three topic levels but got %d in '%s'.", + len(elements), m.Topic, + ) + } + + switch string(m.Payload) { + case "extend": + command = "Cover.Close" + case "retract": + command = "Cover.Open" + case "stop": + command = "Cover.Stop" + default: + return nil, "", fmt.Errorf("Invalid payload '%s'.", m.Payload) + } + + id := elements[1] + + for _, c := range config { + if c.ID == id { + ip := net.ParseIP(c.IP) + return &ip, command, nil + } + } + + return nil, "", fmt.Errorf("Got message for unknown cover '%s'", id) +} |
