diff options
author | xengineering <me@xengineering.eu> | 2025-03-21 22:33:41 +0100 |
---|---|---|
committer | xengineering <me@xengineering.eu> | 2025-03-21 22:33:41 +0100 |
commit | e042129eddeb06c9272a6544c67f9222c347ab10 (patch) | |
tree | 6fdd4e337485ef08961d3145479621ffd4a53184 | |
parent | 58e8dd56e233f482dc0dbe0cb8f56ef3bab7b998 (diff) | |
download | iot-contact-e042129eddeb06c9272a6544c67f9222c347ab10.tar iot-contact-e042129eddeb06c9272a6544c67f9222c347ab10.tar.zst iot-contact-e042129eddeb06c9272a6544c67f9222c347ab10.zip |
fw: ws: Add WebSocket interface
The primary interface for this firmware was so far HTTP. This protocol
is not suitable for small and bidirectional messages which are
time-critical.
If something like this needs to be implemented with HTTP the best
approach is likely long-polling which at least makes it possible for the
server / the firmware to send data to the client / user as reaction to
an event like a closed door sensor.
TCP would fix this issue and is a good choice. Nevertheless web clients
are not allowed to open TCP connections for security purposes.
Thus the WebSocket protocol was created to fill this gap.
To not duplicate the any effort the WebSocket API should be used for
small, time-critical messages for all clients (one with TCP support like
CLI tools as well as web clients).
HTTP is still kept to provide a web page but also for functionality
where HTTP is more suitable like firmware uploads.
-rw-r--r-- | fw/CMakeLists.txt | 1 | ||||
-rw-r--r-- | fw/prj.conf | 4 | ||||
-rw-r--r-- | fw/src/http.c | 15 | ||||
-rw-r--r-- | fw/src/ws.c | 251 | ||||
-rw-r--r-- | fw/src/ws.h | 17 |
5 files changed, 288 insertions, 0 deletions
diff --git a/fw/CMakeLists.txt b/fw/CMakeLists.txt index c0b9484..5aaf319 100644 --- a/fw/CMakeLists.txt +++ b/fw/CMakeLists.txt @@ -24,6 +24,7 @@ target_sources(app "${CMAKE_CURRENT_SOURCE_DIR}/src/syslog.c" "${CMAKE_CURRENT_SOURCE_DIR}/src/mac.c" "${CMAKE_CURRENT_SOURCE_DIR}/src/http.c" + "${CMAKE_CURRENT_SOURCE_DIR}/src/ws.c" "${CMAKE_CURRENT_SOURCE_DIR}/src/heart.c" ) diff --git a/fw/prj.conf b/fw/prj.conf index 7984caa..ac120b9 100644 --- a/fw/prj.conf +++ b/fw/prj.conf @@ -24,11 +24,15 @@ CONFIG_HTTP_PARSER=y CONFIG_HTTP_PARSER_URL=y CONFIG_HTTP_SERVER=y +CONFIG_HTTP_SERVER_WEBSOCKET=y CONFIG_FILE_SYSTEM=y CONFIG_EVENTFD=y +CONFIG_ZVFS_OPEN_MAX=32 +CONFIG_ZVFS_POLL_MAX=32 + CONFIG_ZBUS=y CONFIG_ZBUS_MSG_SUBSCRIBER=y CONFIG_HEAP_MEM_POOL_SIZE=2048 diff --git a/fw/src/http.c b/fw/src/http.c index 96a70fe..b9a76ba 100644 --- a/fw/src/http.c +++ b/fw/src/http.c @@ -13,6 +13,8 @@ #include <zephyr/net/http/service.h> #include <zephyr/net/http/status.h> +#include "ws.h" + LOG_MODULE_REGISTER(http); @@ -59,11 +61,24 @@ static struct http_resource_detail_dynamic favicon_resource_detail = { .user_data = NULL, }; +static uint8_t websocket_read_buffer[1024]; + +struct http_resource_detail_websocket websocket_resource_detail = { + .common = { + .type = HTTP_RESOURCE_TYPE_WEBSOCKET, + .bitmask_of_supported_http_methods = BIT(HTTP_GET), + }, + .cb = ws_upgrade_handler, + .data_buffer = websocket_read_buffer, + .data_buffer_len = sizeof(websocket_read_buffer), +}; + static uint16_t http_port = 80; HTTP_SERVICE_DEFINE(http_service, NULL, &http_port, 1, 10, NULL, NULL); HTTP_RESOURCE_DEFINE(index_resource, http_service, "/", &index_resource_detail); +HTTP_RESOURCE_DEFINE(websocket_resource, http_service, "/", &websocket_resource_detail); HTTP_RESOURCE_DEFINE(favicon_resource, http_service, "/favicon.ico", &favicon_resource_detail); int init_http_server(void) { diff --git a/fw/src/ws.c b/fw/src/ws.c new file mode 100644 index 0000000..6f68538 --- /dev/null +++ b/fw/src/ws.c @@ -0,0 +1,251 @@ +/* + * This Source Code Form is subject to the terms of the Mozilla Public License, + * v. 2.0. If a copy of the MPL was not distributed with this file, You can + * obtain one at https://mozilla.org/MPL/2.0/. + */ + + +#include <errno.h> +#include <stdbool.h> +#include <stdio.h> +#include <string.h> + +#include <zephyr/kernel.h> +#include <zephyr/logging/log.h> +#include <zephyr/net/http/server.h> +#include <zephyr/net/websocket.h> +#include <zephyr/sys_clock.h> +#include <zephyr/zbus/zbus.h> + +#include "heart.h" + + +LOG_MODULE_REGISTER(ws); + + +#define WS_NUM_HANDLERS 1 +#define WS_HANDLER_STACK_SIZE 2048 +#define WS_HANDLER_PRIO K_PRIO_PREEMPT(8) +#define WS_CONNECTION_BACKLOG_LEN 5 +#define WS_TX_BUFFER_LEN 150 +#define WS_CLIENT_HEARTBEAT_GRACE_TIME_MS 10000 + +K_MSGQ_DEFINE(ws_connection_backlog, sizeof(int), WS_CONNECTION_BACKLOG_LEN, 1); +ZBUS_MSG_SUBSCRIBER_DEFINE(ws_tx_messages); +ZBUS_CHAN_ADD_OBS(heartbeat_channel, ws_tx_messages, 3); + +static void ws_tx_thread_function(void *ptr1, void *ptr2, void *ptr3) { + int fd = -1; + struct k_msgq *tx_fds = ptr1; + uint8_t buffer[WS_TX_BUFFER_LEN]; + const struct zbus_channel *chan; + struct heartbeat heartbeat; + + while (true) { + int ret = zbus_obs_set_enable(&ws_tx_messages, false); + if (ret < 0) { + LOG_ERR("Could not disable zbus observer (%d)", ret); + continue; + } + + ret = k_msgq_get(tx_fds, &fd, K_FOREVER); + if (ret < 0) { + LOG_ERR("Error getting new file descriptor (%d)", ret); + continue; + } + LOG_INF("Handling TX of connection with file descriptor %d", fd); + + ret = zbus_obs_set_enable(&ws_tx_messages, true); + if (ret < 0) { + LOG_ERR("Could not enable zbus observer (%d)", ret); + continue; + } + + while (true) { + ret = zbus_sub_wait_msg(&ws_tx_messages, &chan, &heartbeat, K_FOREVER); + if (ret < 0) { + LOG_ERR("Could not wait for zbus messages (%d)", ret); + break; + } + if (chan != &heartbeat_channel) { + LOG_ERR("Unsupported channel"); + continue; + } + + ret = snprintf((char *)buffer, sizeof(buffer), + "{\"type\":0,\"ttl_ms\":%d}\n", heartbeat.ttl_ms); + if (ret < 0) { + LOG_ERR("Could not serialize heartbeat message"); + continue; + } + size_t len = ret; + + int ret = websocket_send_msg( + fd, (const uint8_t *)buffer, len, + WEBSOCKET_OPCODE_DATA_TEXT, false, true, 100 + ); + if (ret < 0) { + if (ret == -EBADF) { + LOG_DBG("Connection closed, waiting for new one"); + } else { + LOG_ERR("Error on websocket_send_msg (%d)", ret); + } + break; + } + } + } +} + +static int ws_recv_full_message(int fd, void *buf, const size_t len, + uint32_t *message_type, int64_t deadline) { + uint64_t remaining = 0; + size_t read = 0; + int ret; + + while (true) { + int64_t timeout = MIN(50, deadline - k_uptime_get()); + if (timeout <= 0) { + ret = -ETIMEDOUT; + LOG_ERR("No time left to read message from %d (%d)", fd, ret); + return ret; + } + + if (read >= len) { + ret = -ENOSPC; + LOG_ERR("No buffer space left to store message from %d (%d)", fd, ret); + return ret; + } + + ret = websocket_recv_msg(fd, (unsigned char *)buf+read, len-read, + message_type, &remaining, timeout); + if (ret < 0) { + if (ret == -EAGAIN || ret == -EINTR) { + LOG_DBG("Trying again to read full message"); + continue; + } else { + LOG_ERR("Unhandled error on message reading (%d)", ret); + return ret; + } + } + + read += ret; + + if (remaining <= 0) { + return read; + } + } +} + +static int ws_handle_data_text(const uint8_t *buf, size_t len, int64_t *deadline) { + LOG_DBG("Got text message"); + + static const char heartbeat[] = "{\"type\":0,\"ttl_ms\":1100}\n"; + + if (len + 1 == sizeof(heartbeat)) { + if (memcmp((const void *)buf, heartbeat, len) == 0) { + LOG_DBG("Received heartbeat from client"); + *deadline = k_uptime_get() + WS_CLIENT_HEARTBEAT_GRACE_TIME_MS; + return 0; + } + } + + LOG_HEXDUMP_DBG(buf, len, "Could not handle message"); + return -EIO; +} + +static int ws_rx_handle_connection(int fd) { + LOG_INF("Handling RX of connection with file descriptor %d", fd); + + uint8_t rx_buf[100]; + uint32_t message_type = 0; + int64_t rx_heartbeat_deadline = k_uptime_get() + WS_CLIENT_HEARTBEAT_GRACE_TIME_MS; + + while (true) { + memset(rx_buf, 0, sizeof(rx_buf)); + + int ret = ws_recv_full_message(fd, rx_buf, sizeof(rx_buf), + &message_type, rx_heartbeat_deadline); + if (ret < 0) { + LOG_ERR("Could not receive full message from %d (%d)", fd, ret); + return ret; + } + size_t len = ret; + LOG_DBG("Received message with opcode %d of length %d", message_type, len); + + if (message_type == WEBSOCKET_OPCODE_DATA_TEXT || + message_type == 3) { // FIXME for some reason opcode 1 parses to 3 + ret = ws_handle_data_text(rx_buf, len, &rx_heartbeat_deadline); + if (ret < 0) { + LOG_ERR("Failed to handle text data (%d)", ret); + } + } else if (message_type == WEBSOCKET_OPCODE_CLOSE) { + LOG_INF("Client closed connection"); + return 0; + } else { + LOG_WRN("Received unhandled message opcode %d", message_type); + LOG_HEXDUMP_WRN(rx_buf, len, "Message content:"); + } + + if (k_uptime_get() > rx_heartbeat_deadline) { + LOG_INF("Client heartbeat timeout expired on %d - closing", fd); + return -ETIMEDOUT; + } + } +} + +static void ws_rx_thread_function(void *ptr1, void *ptr2, void *ptr3) { + struct k_msgq *tx_fds = ptr1; + + while (true) { + int fd; + int ret = k_msgq_get(&ws_connection_backlog, &fd, K_FOREVER); + if (ret < 0) { + LOG_ERR("Error getting new file descriptor (%d)", ret); + continue; + } + + ret = k_msgq_put(tx_fds, (const void *)&fd, K_FOREVER); + if (ret < 0) { + LOG_ERR("Could not pass file descriptor %d to TX thread (%d)", fd, ret); + goto unregister; + } + + ret = ws_rx_handle_connection(fd); + if (ret < 0) { + LOG_ERR("Failed to handle connection %d (%d)", fd, ret); + goto unregister; + } + +unregister: + ret = websocket_unregister(fd); + if (ret < 0) { + LOG_ERR("Failed to unregister connection %d (%d)", fd, ret); + } + } +} + +K_MSGQ_DEFINE(ws_tx_fd_1, sizeof(int), 1, 1); +K_THREAD_DEFINE(ws_rx_thread_1, WS_HANDLER_STACK_SIZE, + ws_rx_thread_function, &ws_tx_fd_1, NULL, NULL, + WS_HANDLER_PRIO, 0, 0); +K_THREAD_DEFINE(ws_tx_thread_1, WS_HANDLER_STACK_SIZE, + ws_tx_thread_function, &ws_tx_fd_1, NULL, NULL, + WS_HANDLER_PRIO, 0, 0); + +int ws_upgrade_handler( + int ws_socket, + struct http_request_ctx *request_ctx, + void *user_data +) { + LOG_DBG("Handling WebSocket upgrade for file descriptor %d", ws_socket); + + int ret = k_msgq_put(&ws_connection_backlog, (const void *)&ws_socket, + K_NO_WAIT); + if (ret < 0) { + LOG_ERR("Connection backlog full, dropping file descriptor %d", ws_socket); + return -ENOENT; + } + + LOG_INF("Added file descriptor %d to connection backlog", ws_socket); + return 0; +} diff --git a/fw/src/ws.h b/fw/src/ws.h new file mode 100644 index 0000000..0c13039 --- /dev/null +++ b/fw/src/ws.h @@ -0,0 +1,17 @@ +/* + * This Source Code Form is subject to the terms of the Mozilla Public License, + * v. 2.0. If a copy of the MPL was not distributed with this file, You can + * obtain one at https://mozilla.org/MPL/2.0/. + */ + +#ifndef SRC_WS_H +#define SRC_WS_H + + +#include <zephyr/net/http/server.h> + + +int ws_upgrade_handler(int ws_socket, struct http_request_ctx *request_ctx, + void *user_data); + +#endif // !SRC_WS_H |