diff options
author | xengineering <me@xengineering.eu> | 2025-03-23 17:53:24 +0100 |
---|---|---|
committer | xengineering <me@xengineering.eu> | 2025-03-26 21:18:17 +0100 |
commit | b073db4017008ceb638a9c23c8cc93e60a3a7fdb (patch) | |
tree | 3359897450d1a0d187eb1fc2c47f36a6475691fd /fw/src/ws.c | |
parent | 6ebbc6cd9876744c09547bcb4ce2c39a89ce0f6c (diff) | |
download | iot-contact-b073db4017008ceb638a9c23c8cc93e60a3a7fdb.tar iot-contact-b073db4017008ceb638a9c23c8cc93e60a3a7fdb.tar.zst iot-contact-b073db4017008ceb638a9c23c8cc93e60a3a7fdb.zip |
fw: app: Move application firmware code here
This makes the structure of the `fw` folder more clear and separates
application-related code from bootloader- or rtos-related code.
Diffstat (limited to 'fw/src/ws.c')
-rw-r--r-- | fw/src/ws.c | 251 |
1 files changed, 0 insertions, 251 deletions
diff --git a/fw/src/ws.c b/fw/src/ws.c deleted file mode 100644 index 6f68538..0000000 --- a/fw/src/ws.c +++ /dev/null @@ -1,251 +0,0 @@ -/* - * This Source Code Form is subject to the terms of the Mozilla Public License, - * v. 2.0. If a copy of the MPL was not distributed with this file, You can - * obtain one at https://mozilla.org/MPL/2.0/. - */ - - -#include <errno.h> -#include <stdbool.h> -#include <stdio.h> -#include <string.h> - -#include <zephyr/kernel.h> -#include <zephyr/logging/log.h> -#include <zephyr/net/http/server.h> -#include <zephyr/net/websocket.h> -#include <zephyr/sys_clock.h> -#include <zephyr/zbus/zbus.h> - -#include "heart.h" - - -LOG_MODULE_REGISTER(ws); - - -#define WS_NUM_HANDLERS 1 -#define WS_HANDLER_STACK_SIZE 2048 -#define WS_HANDLER_PRIO K_PRIO_PREEMPT(8) -#define WS_CONNECTION_BACKLOG_LEN 5 -#define WS_TX_BUFFER_LEN 150 -#define WS_CLIENT_HEARTBEAT_GRACE_TIME_MS 10000 - -K_MSGQ_DEFINE(ws_connection_backlog, sizeof(int), WS_CONNECTION_BACKLOG_LEN, 1); -ZBUS_MSG_SUBSCRIBER_DEFINE(ws_tx_messages); -ZBUS_CHAN_ADD_OBS(heartbeat_channel, ws_tx_messages, 3); - -static void ws_tx_thread_function(void *ptr1, void *ptr2, void *ptr3) { - int fd = -1; - struct k_msgq *tx_fds = ptr1; - uint8_t buffer[WS_TX_BUFFER_LEN]; - const struct zbus_channel *chan; - struct heartbeat heartbeat; - - while (true) { - int ret = zbus_obs_set_enable(&ws_tx_messages, false); - if (ret < 0) { - LOG_ERR("Could not disable zbus observer (%d)", ret); - continue; - } - - ret = k_msgq_get(tx_fds, &fd, K_FOREVER); - if (ret < 0) { - LOG_ERR("Error getting new file descriptor (%d)", ret); - continue; - } - LOG_INF("Handling TX of connection with file descriptor %d", fd); - - ret = zbus_obs_set_enable(&ws_tx_messages, true); - if (ret < 0) { - LOG_ERR("Could not enable zbus observer (%d)", ret); - continue; - } - - while (true) { - ret = zbus_sub_wait_msg(&ws_tx_messages, &chan, &heartbeat, K_FOREVER); - if (ret < 0) { - LOG_ERR("Could not wait for zbus messages (%d)", ret); - break; - } - if (chan != &heartbeat_channel) { - LOG_ERR("Unsupported channel"); - continue; - } - - ret = snprintf((char *)buffer, sizeof(buffer), - "{\"type\":0,\"ttl_ms\":%d}\n", heartbeat.ttl_ms); - if (ret < 0) { - LOG_ERR("Could not serialize heartbeat message"); - continue; - } - size_t len = ret; - - int ret = websocket_send_msg( - fd, (const uint8_t *)buffer, len, - WEBSOCKET_OPCODE_DATA_TEXT, false, true, 100 - ); - if (ret < 0) { - if (ret == -EBADF) { - LOG_DBG("Connection closed, waiting for new one"); - } else { - LOG_ERR("Error on websocket_send_msg (%d)", ret); - } - break; - } - } - } -} - -static int ws_recv_full_message(int fd, void *buf, const size_t len, - uint32_t *message_type, int64_t deadline) { - uint64_t remaining = 0; - size_t read = 0; - int ret; - - while (true) { - int64_t timeout = MIN(50, deadline - k_uptime_get()); - if (timeout <= 0) { - ret = -ETIMEDOUT; - LOG_ERR("No time left to read message from %d (%d)", fd, ret); - return ret; - } - - if (read >= len) { - ret = -ENOSPC; - LOG_ERR("No buffer space left to store message from %d (%d)", fd, ret); - return ret; - } - - ret = websocket_recv_msg(fd, (unsigned char *)buf+read, len-read, - message_type, &remaining, timeout); - if (ret < 0) { - if (ret == -EAGAIN || ret == -EINTR) { - LOG_DBG("Trying again to read full message"); - continue; - } else { - LOG_ERR("Unhandled error on message reading (%d)", ret); - return ret; - } - } - - read += ret; - - if (remaining <= 0) { - return read; - } - } -} - -static int ws_handle_data_text(const uint8_t *buf, size_t len, int64_t *deadline) { - LOG_DBG("Got text message"); - - static const char heartbeat[] = "{\"type\":0,\"ttl_ms\":1100}\n"; - - if (len + 1 == sizeof(heartbeat)) { - if (memcmp((const void *)buf, heartbeat, len) == 0) { - LOG_DBG("Received heartbeat from client"); - *deadline = k_uptime_get() + WS_CLIENT_HEARTBEAT_GRACE_TIME_MS; - return 0; - } - } - - LOG_HEXDUMP_DBG(buf, len, "Could not handle message"); - return -EIO; -} - -static int ws_rx_handle_connection(int fd) { - LOG_INF("Handling RX of connection with file descriptor %d", fd); - - uint8_t rx_buf[100]; - uint32_t message_type = 0; - int64_t rx_heartbeat_deadline = k_uptime_get() + WS_CLIENT_HEARTBEAT_GRACE_TIME_MS; - - while (true) { - memset(rx_buf, 0, sizeof(rx_buf)); - - int ret = ws_recv_full_message(fd, rx_buf, sizeof(rx_buf), - &message_type, rx_heartbeat_deadline); - if (ret < 0) { - LOG_ERR("Could not receive full message from %d (%d)", fd, ret); - return ret; - } - size_t len = ret; - LOG_DBG("Received message with opcode %d of length %d", message_type, len); - - if (message_type == WEBSOCKET_OPCODE_DATA_TEXT || - message_type == 3) { // FIXME for some reason opcode 1 parses to 3 - ret = ws_handle_data_text(rx_buf, len, &rx_heartbeat_deadline); - if (ret < 0) { - LOG_ERR("Failed to handle text data (%d)", ret); - } - } else if (message_type == WEBSOCKET_OPCODE_CLOSE) { - LOG_INF("Client closed connection"); - return 0; - } else { - LOG_WRN("Received unhandled message opcode %d", message_type); - LOG_HEXDUMP_WRN(rx_buf, len, "Message content:"); - } - - if (k_uptime_get() > rx_heartbeat_deadline) { - LOG_INF("Client heartbeat timeout expired on %d - closing", fd); - return -ETIMEDOUT; - } - } -} - -static void ws_rx_thread_function(void *ptr1, void *ptr2, void *ptr3) { - struct k_msgq *tx_fds = ptr1; - - while (true) { - int fd; - int ret = k_msgq_get(&ws_connection_backlog, &fd, K_FOREVER); - if (ret < 0) { - LOG_ERR("Error getting new file descriptor (%d)", ret); - continue; - } - - ret = k_msgq_put(tx_fds, (const void *)&fd, K_FOREVER); - if (ret < 0) { - LOG_ERR("Could not pass file descriptor %d to TX thread (%d)", fd, ret); - goto unregister; - } - - ret = ws_rx_handle_connection(fd); - if (ret < 0) { - LOG_ERR("Failed to handle connection %d (%d)", fd, ret); - goto unregister; - } - -unregister: - ret = websocket_unregister(fd); - if (ret < 0) { - LOG_ERR("Failed to unregister connection %d (%d)", fd, ret); - } - } -} - -K_MSGQ_DEFINE(ws_tx_fd_1, sizeof(int), 1, 1); -K_THREAD_DEFINE(ws_rx_thread_1, WS_HANDLER_STACK_SIZE, - ws_rx_thread_function, &ws_tx_fd_1, NULL, NULL, - WS_HANDLER_PRIO, 0, 0); -K_THREAD_DEFINE(ws_tx_thread_1, WS_HANDLER_STACK_SIZE, - ws_tx_thread_function, &ws_tx_fd_1, NULL, NULL, - WS_HANDLER_PRIO, 0, 0); - -int ws_upgrade_handler( - int ws_socket, - struct http_request_ctx *request_ctx, - void *user_data -) { - LOG_DBG("Handling WebSocket upgrade for file descriptor %d", ws_socket); - - int ret = k_msgq_put(&ws_connection_backlog, (const void *)&ws_socket, - K_NO_WAIT); - if (ret < 0) { - LOG_ERR("Connection backlog full, dropping file descriptor %d", ws_socket); - return -ENOENT; - } - - LOG_INF("Added file descriptor %d to connection backlog", ws_socket); - return 0; -} |