summaryrefslogtreecommitdiff
path: root/fw/src
diff options
context:
space:
mode:
Diffstat (limited to 'fw/src')
-rw-r--r--fw/src/http.c15
-rw-r--r--fw/src/ws.c251
-rw-r--r--fw/src/ws.h17
3 files changed, 283 insertions, 0 deletions
diff --git a/fw/src/http.c b/fw/src/http.c
index 96a70fe..b9a76ba 100644
--- a/fw/src/http.c
+++ b/fw/src/http.c
@@ -13,6 +13,8 @@
#include <zephyr/net/http/service.h>
#include <zephyr/net/http/status.h>
+#include "ws.h"
+
LOG_MODULE_REGISTER(http);
@@ -59,11 +61,24 @@ static struct http_resource_detail_dynamic favicon_resource_detail = {
.user_data = NULL,
};
+static uint8_t websocket_read_buffer[1024];
+
+struct http_resource_detail_websocket websocket_resource_detail = {
+ .common = {
+ .type = HTTP_RESOURCE_TYPE_WEBSOCKET,
+ .bitmask_of_supported_http_methods = BIT(HTTP_GET),
+ },
+ .cb = ws_upgrade_handler,
+ .data_buffer = websocket_read_buffer,
+ .data_buffer_len = sizeof(websocket_read_buffer),
+};
+
static uint16_t http_port = 80;
HTTP_SERVICE_DEFINE(http_service, NULL, &http_port, 1, 10, NULL, NULL);
HTTP_RESOURCE_DEFINE(index_resource, http_service, "/", &index_resource_detail);
+HTTP_RESOURCE_DEFINE(websocket_resource, http_service, "/", &websocket_resource_detail);
HTTP_RESOURCE_DEFINE(favicon_resource, http_service, "/favicon.ico", &favicon_resource_detail);
int init_http_server(void) {
diff --git a/fw/src/ws.c b/fw/src/ws.c
new file mode 100644
index 0000000..6f68538
--- /dev/null
+++ b/fw/src/ws.c
@@ -0,0 +1,251 @@
+/*
+ * This Source Code Form is subject to the terms of the Mozilla Public License,
+ * v. 2.0. If a copy of the MPL was not distributed with this file, You can
+ * obtain one at https://mozilla.org/MPL/2.0/.
+ */
+
+
+#include <errno.h>
+#include <stdbool.h>
+#include <stdio.h>
+#include <string.h>
+
+#include <zephyr/kernel.h>
+#include <zephyr/logging/log.h>
+#include <zephyr/net/http/server.h>
+#include <zephyr/net/websocket.h>
+#include <zephyr/sys_clock.h>
+#include <zephyr/zbus/zbus.h>
+
+#include "heart.h"
+
+
+LOG_MODULE_REGISTER(ws);
+
+
+#define WS_NUM_HANDLERS 1
+#define WS_HANDLER_STACK_SIZE 2048
+#define WS_HANDLER_PRIO K_PRIO_PREEMPT(8)
+#define WS_CONNECTION_BACKLOG_LEN 5
+#define WS_TX_BUFFER_LEN 150
+#define WS_CLIENT_HEARTBEAT_GRACE_TIME_MS 10000
+
+K_MSGQ_DEFINE(ws_connection_backlog, sizeof(int), WS_CONNECTION_BACKLOG_LEN, 1);
+ZBUS_MSG_SUBSCRIBER_DEFINE(ws_tx_messages);
+ZBUS_CHAN_ADD_OBS(heartbeat_channel, ws_tx_messages, 3);
+
+static void ws_tx_thread_function(void *ptr1, void *ptr2, void *ptr3) {
+ int fd = -1;
+ struct k_msgq *tx_fds = ptr1;
+ uint8_t buffer[WS_TX_BUFFER_LEN];
+ const struct zbus_channel *chan;
+ struct heartbeat heartbeat;
+
+ while (true) {
+ int ret = zbus_obs_set_enable(&ws_tx_messages, false);
+ if (ret < 0) {
+ LOG_ERR("Could not disable zbus observer (%d)", ret);
+ continue;
+ }
+
+ ret = k_msgq_get(tx_fds, &fd, K_FOREVER);
+ if (ret < 0) {
+ LOG_ERR("Error getting new file descriptor (%d)", ret);
+ continue;
+ }
+ LOG_INF("Handling TX of connection with file descriptor %d", fd);
+
+ ret = zbus_obs_set_enable(&ws_tx_messages, true);
+ if (ret < 0) {
+ LOG_ERR("Could not enable zbus observer (%d)", ret);
+ continue;
+ }
+
+ while (true) {
+ ret = zbus_sub_wait_msg(&ws_tx_messages, &chan, &heartbeat, K_FOREVER);
+ if (ret < 0) {
+ LOG_ERR("Could not wait for zbus messages (%d)", ret);
+ break;
+ }
+ if (chan != &heartbeat_channel) {
+ LOG_ERR("Unsupported channel");
+ continue;
+ }
+
+ ret = snprintf((char *)buffer, sizeof(buffer),
+ "{\"type\":0,\"ttl_ms\":%d}\n", heartbeat.ttl_ms);
+ if (ret < 0) {
+ LOG_ERR("Could not serialize heartbeat message");
+ continue;
+ }
+ size_t len = ret;
+
+ int ret = websocket_send_msg(
+ fd, (const uint8_t *)buffer, len,
+ WEBSOCKET_OPCODE_DATA_TEXT, false, true, 100
+ );
+ if (ret < 0) {
+ if (ret == -EBADF) {
+ LOG_DBG("Connection closed, waiting for new one");
+ } else {
+ LOG_ERR("Error on websocket_send_msg (%d)", ret);
+ }
+ break;
+ }
+ }
+ }
+}
+
+static int ws_recv_full_message(int fd, void *buf, const size_t len,
+ uint32_t *message_type, int64_t deadline) {
+ uint64_t remaining = 0;
+ size_t read = 0;
+ int ret;
+
+ while (true) {
+ int64_t timeout = MIN(50, deadline - k_uptime_get());
+ if (timeout <= 0) {
+ ret = -ETIMEDOUT;
+ LOG_ERR("No time left to read message from %d (%d)", fd, ret);
+ return ret;
+ }
+
+ if (read >= len) {
+ ret = -ENOSPC;
+ LOG_ERR("No buffer space left to store message from %d (%d)", fd, ret);
+ return ret;
+ }
+
+ ret = websocket_recv_msg(fd, (unsigned char *)buf+read, len-read,
+ message_type, &remaining, timeout);
+ if (ret < 0) {
+ if (ret == -EAGAIN || ret == -EINTR) {
+ LOG_DBG("Trying again to read full message");
+ continue;
+ } else {
+ LOG_ERR("Unhandled error on message reading (%d)", ret);
+ return ret;
+ }
+ }
+
+ read += ret;
+
+ if (remaining <= 0) {
+ return read;
+ }
+ }
+}
+
+static int ws_handle_data_text(const uint8_t *buf, size_t len, int64_t *deadline) {
+ LOG_DBG("Got text message");
+
+ static const char heartbeat[] = "{\"type\":0,\"ttl_ms\":1100}\n";
+
+ if (len + 1 == sizeof(heartbeat)) {
+ if (memcmp((const void *)buf, heartbeat, len) == 0) {
+ LOG_DBG("Received heartbeat from client");
+ *deadline = k_uptime_get() + WS_CLIENT_HEARTBEAT_GRACE_TIME_MS;
+ return 0;
+ }
+ }
+
+ LOG_HEXDUMP_DBG(buf, len, "Could not handle message");
+ return -EIO;
+}
+
+static int ws_rx_handle_connection(int fd) {
+ LOG_INF("Handling RX of connection with file descriptor %d", fd);
+
+ uint8_t rx_buf[100];
+ uint32_t message_type = 0;
+ int64_t rx_heartbeat_deadline = k_uptime_get() + WS_CLIENT_HEARTBEAT_GRACE_TIME_MS;
+
+ while (true) {
+ memset(rx_buf, 0, sizeof(rx_buf));
+
+ int ret = ws_recv_full_message(fd, rx_buf, sizeof(rx_buf),
+ &message_type, rx_heartbeat_deadline);
+ if (ret < 0) {
+ LOG_ERR("Could not receive full message from %d (%d)", fd, ret);
+ return ret;
+ }
+ size_t len = ret;
+ LOG_DBG("Received message with opcode %d of length %d", message_type, len);
+
+ if (message_type == WEBSOCKET_OPCODE_DATA_TEXT ||
+ message_type == 3) { // FIXME for some reason opcode 1 parses to 3
+ ret = ws_handle_data_text(rx_buf, len, &rx_heartbeat_deadline);
+ if (ret < 0) {
+ LOG_ERR("Failed to handle text data (%d)", ret);
+ }
+ } else if (message_type == WEBSOCKET_OPCODE_CLOSE) {
+ LOG_INF("Client closed connection");
+ return 0;
+ } else {
+ LOG_WRN("Received unhandled message opcode %d", message_type);
+ LOG_HEXDUMP_WRN(rx_buf, len, "Message content:");
+ }
+
+ if (k_uptime_get() > rx_heartbeat_deadline) {
+ LOG_INF("Client heartbeat timeout expired on %d - closing", fd);
+ return -ETIMEDOUT;
+ }
+ }
+}
+
+static void ws_rx_thread_function(void *ptr1, void *ptr2, void *ptr3) {
+ struct k_msgq *tx_fds = ptr1;
+
+ while (true) {
+ int fd;
+ int ret = k_msgq_get(&ws_connection_backlog, &fd, K_FOREVER);
+ if (ret < 0) {
+ LOG_ERR("Error getting new file descriptor (%d)", ret);
+ continue;
+ }
+
+ ret = k_msgq_put(tx_fds, (const void *)&fd, K_FOREVER);
+ if (ret < 0) {
+ LOG_ERR("Could not pass file descriptor %d to TX thread (%d)", fd, ret);
+ goto unregister;
+ }
+
+ ret = ws_rx_handle_connection(fd);
+ if (ret < 0) {
+ LOG_ERR("Failed to handle connection %d (%d)", fd, ret);
+ goto unregister;
+ }
+
+unregister:
+ ret = websocket_unregister(fd);
+ if (ret < 0) {
+ LOG_ERR("Failed to unregister connection %d (%d)", fd, ret);
+ }
+ }
+}
+
+K_MSGQ_DEFINE(ws_tx_fd_1, sizeof(int), 1, 1);
+K_THREAD_DEFINE(ws_rx_thread_1, WS_HANDLER_STACK_SIZE,
+ ws_rx_thread_function, &ws_tx_fd_1, NULL, NULL,
+ WS_HANDLER_PRIO, 0, 0);
+K_THREAD_DEFINE(ws_tx_thread_1, WS_HANDLER_STACK_SIZE,
+ ws_tx_thread_function, &ws_tx_fd_1, NULL, NULL,
+ WS_HANDLER_PRIO, 0, 0);
+
+int ws_upgrade_handler(
+ int ws_socket,
+ struct http_request_ctx *request_ctx,
+ void *user_data
+) {
+ LOG_DBG("Handling WebSocket upgrade for file descriptor %d", ws_socket);
+
+ int ret = k_msgq_put(&ws_connection_backlog, (const void *)&ws_socket,
+ K_NO_WAIT);
+ if (ret < 0) {
+ LOG_ERR("Connection backlog full, dropping file descriptor %d", ws_socket);
+ return -ENOENT;
+ }
+
+ LOG_INF("Added file descriptor %d to connection backlog", ws_socket);
+ return 0;
+}
diff --git a/fw/src/ws.h b/fw/src/ws.h
new file mode 100644
index 0000000..0c13039
--- /dev/null
+++ b/fw/src/ws.h
@@ -0,0 +1,17 @@
+/*
+ * This Source Code Form is subject to the terms of the Mozilla Public License,
+ * v. 2.0. If a copy of the MPL was not distributed with this file, You can
+ * obtain one at https://mozilla.org/MPL/2.0/.
+ */
+
+#ifndef SRC_WS_H
+#define SRC_WS_H
+
+
+#include <zephyr/net/http/server.h>
+
+
+int ws_upgrade_handler(int ws_socket, struct http_request_ctx *request_ctx,
+ void *user_data);
+
+#endif // !SRC_WS_H