diff options
author | xengineering <me@xengineering.eu> | 2025-03-23 17:53:24 +0100 |
---|---|---|
committer | xengineering <me@xengineering.eu> | 2025-03-26 21:18:17 +0100 |
commit | b073db4017008ceb638a9c23c8cc93e60a3a7fdb (patch) | |
tree | 3359897450d1a0d187eb1fc2c47f36a6475691fd /fw/app/src/ws.c | |
parent | 6ebbc6cd9876744c09547bcb4ce2c39a89ce0f6c (diff) | |
download | iot-contact-b073db4017008ceb638a9c23c8cc93e60a3a7fdb.tar iot-contact-b073db4017008ceb638a9c23c8cc93e60a3a7fdb.tar.zst iot-contact-b073db4017008ceb638a9c23c8cc93e60a3a7fdb.zip |
fw: app: Move application firmware code here
This makes the structure of the `fw` folder more clear and separates
application-related code from bootloader- or rtos-related code.
Diffstat (limited to 'fw/app/src/ws.c')
-rw-r--r-- | fw/app/src/ws.c | 251 |
1 files changed, 251 insertions, 0 deletions
diff --git a/fw/app/src/ws.c b/fw/app/src/ws.c new file mode 100644 index 0000000..6f68538 --- /dev/null +++ b/fw/app/src/ws.c @@ -0,0 +1,251 @@ +/* + * This Source Code Form is subject to the terms of the Mozilla Public License, + * v. 2.0. If a copy of the MPL was not distributed with this file, You can + * obtain one at https://mozilla.org/MPL/2.0/. + */ + + +#include <errno.h> +#include <stdbool.h> +#include <stdio.h> +#include <string.h> + +#include <zephyr/kernel.h> +#include <zephyr/logging/log.h> +#include <zephyr/net/http/server.h> +#include <zephyr/net/websocket.h> +#include <zephyr/sys_clock.h> +#include <zephyr/zbus/zbus.h> + +#include "heart.h" + + +LOG_MODULE_REGISTER(ws); + + +#define WS_NUM_HANDLERS 1 +#define WS_HANDLER_STACK_SIZE 2048 +#define WS_HANDLER_PRIO K_PRIO_PREEMPT(8) +#define WS_CONNECTION_BACKLOG_LEN 5 +#define WS_TX_BUFFER_LEN 150 +#define WS_CLIENT_HEARTBEAT_GRACE_TIME_MS 10000 + +K_MSGQ_DEFINE(ws_connection_backlog, sizeof(int), WS_CONNECTION_BACKLOG_LEN, 1); +ZBUS_MSG_SUBSCRIBER_DEFINE(ws_tx_messages); +ZBUS_CHAN_ADD_OBS(heartbeat_channel, ws_tx_messages, 3); + +static void ws_tx_thread_function(void *ptr1, void *ptr2, void *ptr3) { + int fd = -1; + struct k_msgq *tx_fds = ptr1; + uint8_t buffer[WS_TX_BUFFER_LEN]; + const struct zbus_channel *chan; + struct heartbeat heartbeat; + + while (true) { + int ret = zbus_obs_set_enable(&ws_tx_messages, false); + if (ret < 0) { + LOG_ERR("Could not disable zbus observer (%d)", ret); + continue; + } + + ret = k_msgq_get(tx_fds, &fd, K_FOREVER); + if (ret < 0) { + LOG_ERR("Error getting new file descriptor (%d)", ret); + continue; + } + LOG_INF("Handling TX of connection with file descriptor %d", fd); + + ret = zbus_obs_set_enable(&ws_tx_messages, true); + if (ret < 0) { + LOG_ERR("Could not enable zbus observer (%d)", ret); + continue; + } + + while (true) { + ret = zbus_sub_wait_msg(&ws_tx_messages, &chan, &heartbeat, K_FOREVER); + if (ret < 0) { + LOG_ERR("Could not wait for zbus messages (%d)", ret); + break; + } + if (chan != &heartbeat_channel) { + LOG_ERR("Unsupported channel"); + continue; + } + + ret = snprintf((char *)buffer, sizeof(buffer), + "{\"type\":0,\"ttl_ms\":%d}\n", heartbeat.ttl_ms); + if (ret < 0) { + LOG_ERR("Could not serialize heartbeat message"); + continue; + } + size_t len = ret; + + int ret = websocket_send_msg( + fd, (const uint8_t *)buffer, len, + WEBSOCKET_OPCODE_DATA_TEXT, false, true, 100 + ); + if (ret < 0) { + if (ret == -EBADF) { + LOG_DBG("Connection closed, waiting for new one"); + } else { + LOG_ERR("Error on websocket_send_msg (%d)", ret); + } + break; + } + } + } +} + +static int ws_recv_full_message(int fd, void *buf, const size_t len, + uint32_t *message_type, int64_t deadline) { + uint64_t remaining = 0; + size_t read = 0; + int ret; + + while (true) { + int64_t timeout = MIN(50, deadline - k_uptime_get()); + if (timeout <= 0) { + ret = -ETIMEDOUT; + LOG_ERR("No time left to read message from %d (%d)", fd, ret); + return ret; + } + + if (read >= len) { + ret = -ENOSPC; + LOG_ERR("No buffer space left to store message from %d (%d)", fd, ret); + return ret; + } + + ret = websocket_recv_msg(fd, (unsigned char *)buf+read, len-read, + message_type, &remaining, timeout); + if (ret < 0) { + if (ret == -EAGAIN || ret == -EINTR) { + LOG_DBG("Trying again to read full message"); + continue; + } else { + LOG_ERR("Unhandled error on message reading (%d)", ret); + return ret; + } + } + + read += ret; + + if (remaining <= 0) { + return read; + } + } +} + +static int ws_handle_data_text(const uint8_t *buf, size_t len, int64_t *deadline) { + LOG_DBG("Got text message"); + + static const char heartbeat[] = "{\"type\":0,\"ttl_ms\":1100}\n"; + + if (len + 1 == sizeof(heartbeat)) { + if (memcmp((const void *)buf, heartbeat, len) == 0) { + LOG_DBG("Received heartbeat from client"); + *deadline = k_uptime_get() + WS_CLIENT_HEARTBEAT_GRACE_TIME_MS; + return 0; + } + } + + LOG_HEXDUMP_DBG(buf, len, "Could not handle message"); + return -EIO; +} + +static int ws_rx_handle_connection(int fd) { + LOG_INF("Handling RX of connection with file descriptor %d", fd); + + uint8_t rx_buf[100]; + uint32_t message_type = 0; + int64_t rx_heartbeat_deadline = k_uptime_get() + WS_CLIENT_HEARTBEAT_GRACE_TIME_MS; + + while (true) { + memset(rx_buf, 0, sizeof(rx_buf)); + + int ret = ws_recv_full_message(fd, rx_buf, sizeof(rx_buf), + &message_type, rx_heartbeat_deadline); + if (ret < 0) { + LOG_ERR("Could not receive full message from %d (%d)", fd, ret); + return ret; + } + size_t len = ret; + LOG_DBG("Received message with opcode %d of length %d", message_type, len); + + if (message_type == WEBSOCKET_OPCODE_DATA_TEXT || + message_type == 3) { // FIXME for some reason opcode 1 parses to 3 + ret = ws_handle_data_text(rx_buf, len, &rx_heartbeat_deadline); + if (ret < 0) { + LOG_ERR("Failed to handle text data (%d)", ret); + } + } else if (message_type == WEBSOCKET_OPCODE_CLOSE) { + LOG_INF("Client closed connection"); + return 0; + } else { + LOG_WRN("Received unhandled message opcode %d", message_type); + LOG_HEXDUMP_WRN(rx_buf, len, "Message content:"); + } + + if (k_uptime_get() > rx_heartbeat_deadline) { + LOG_INF("Client heartbeat timeout expired on %d - closing", fd); + return -ETIMEDOUT; + } + } +} + +static void ws_rx_thread_function(void *ptr1, void *ptr2, void *ptr3) { + struct k_msgq *tx_fds = ptr1; + + while (true) { + int fd; + int ret = k_msgq_get(&ws_connection_backlog, &fd, K_FOREVER); + if (ret < 0) { + LOG_ERR("Error getting new file descriptor (%d)", ret); + continue; + } + + ret = k_msgq_put(tx_fds, (const void *)&fd, K_FOREVER); + if (ret < 0) { + LOG_ERR("Could not pass file descriptor %d to TX thread (%d)", fd, ret); + goto unregister; + } + + ret = ws_rx_handle_connection(fd); + if (ret < 0) { + LOG_ERR("Failed to handle connection %d (%d)", fd, ret); + goto unregister; + } + +unregister: + ret = websocket_unregister(fd); + if (ret < 0) { + LOG_ERR("Failed to unregister connection %d (%d)", fd, ret); + } + } +} + +K_MSGQ_DEFINE(ws_tx_fd_1, sizeof(int), 1, 1); +K_THREAD_DEFINE(ws_rx_thread_1, WS_HANDLER_STACK_SIZE, + ws_rx_thread_function, &ws_tx_fd_1, NULL, NULL, + WS_HANDLER_PRIO, 0, 0); +K_THREAD_DEFINE(ws_tx_thread_1, WS_HANDLER_STACK_SIZE, + ws_tx_thread_function, &ws_tx_fd_1, NULL, NULL, + WS_HANDLER_PRIO, 0, 0); + +int ws_upgrade_handler( + int ws_socket, + struct http_request_ctx *request_ctx, + void *user_data +) { + LOG_DBG("Handling WebSocket upgrade for file descriptor %d", ws_socket); + + int ret = k_msgq_put(&ws_connection_backlog, (const void *)&ws_socket, + K_NO_WAIT); + if (ret < 0) { + LOG_ERR("Connection backlog full, dropping file descriptor %d", ws_socket); + return -ENOENT; + } + + LOG_INF("Added file descriptor %d to connection backlog", ws_socket); + return 0; +} |