summaryrefslogtreecommitdiff
path: root/fw/src/ws.c
diff options
context:
space:
mode:
authorxengineering <me@xengineering.eu>2025-03-23 17:53:24 +0100
committerxengineering <me@xengineering.eu>2025-03-26 21:18:17 +0100
commitb073db4017008ceb638a9c23c8cc93e60a3a7fdb (patch)
tree3359897450d1a0d187eb1fc2c47f36a6475691fd /fw/src/ws.c
parent6ebbc6cd9876744c09547bcb4ce2c39a89ce0f6c (diff)
downloadiot-contact-b073db4017008ceb638a9c23c8cc93e60a3a7fdb.tar
iot-contact-b073db4017008ceb638a9c23c8cc93e60a3a7fdb.tar.zst
iot-contact-b073db4017008ceb638a9c23c8cc93e60a3a7fdb.zip
fw: app: Move application firmware code here
This makes the structure of the `fw` folder more clear and separates application-related code from bootloader- or rtos-related code.
Diffstat (limited to 'fw/src/ws.c')
-rw-r--r--fw/src/ws.c251
1 files changed, 0 insertions, 251 deletions
diff --git a/fw/src/ws.c b/fw/src/ws.c
deleted file mode 100644
index 6f68538..0000000
--- a/fw/src/ws.c
+++ /dev/null
@@ -1,251 +0,0 @@
-/*
- * This Source Code Form is subject to the terms of the Mozilla Public License,
- * v. 2.0. If a copy of the MPL was not distributed with this file, You can
- * obtain one at https://mozilla.org/MPL/2.0/.
- */
-
-
-#include <errno.h>
-#include <stdbool.h>
-#include <stdio.h>
-#include <string.h>
-
-#include <zephyr/kernel.h>
-#include <zephyr/logging/log.h>
-#include <zephyr/net/http/server.h>
-#include <zephyr/net/websocket.h>
-#include <zephyr/sys_clock.h>
-#include <zephyr/zbus/zbus.h>
-
-#include "heart.h"
-
-
-LOG_MODULE_REGISTER(ws);
-
-
-#define WS_NUM_HANDLERS 1
-#define WS_HANDLER_STACK_SIZE 2048
-#define WS_HANDLER_PRIO K_PRIO_PREEMPT(8)
-#define WS_CONNECTION_BACKLOG_LEN 5
-#define WS_TX_BUFFER_LEN 150
-#define WS_CLIENT_HEARTBEAT_GRACE_TIME_MS 10000
-
-K_MSGQ_DEFINE(ws_connection_backlog, sizeof(int), WS_CONNECTION_BACKLOG_LEN, 1);
-ZBUS_MSG_SUBSCRIBER_DEFINE(ws_tx_messages);
-ZBUS_CHAN_ADD_OBS(heartbeat_channel, ws_tx_messages, 3);
-
-static void ws_tx_thread_function(void *ptr1, void *ptr2, void *ptr3) {
- int fd = -1;
- struct k_msgq *tx_fds = ptr1;
- uint8_t buffer[WS_TX_BUFFER_LEN];
- const struct zbus_channel *chan;
- struct heartbeat heartbeat;
-
- while (true) {
- int ret = zbus_obs_set_enable(&ws_tx_messages, false);
- if (ret < 0) {
- LOG_ERR("Could not disable zbus observer (%d)", ret);
- continue;
- }
-
- ret = k_msgq_get(tx_fds, &fd, K_FOREVER);
- if (ret < 0) {
- LOG_ERR("Error getting new file descriptor (%d)", ret);
- continue;
- }
- LOG_INF("Handling TX of connection with file descriptor %d", fd);
-
- ret = zbus_obs_set_enable(&ws_tx_messages, true);
- if (ret < 0) {
- LOG_ERR("Could not enable zbus observer (%d)", ret);
- continue;
- }
-
- while (true) {
- ret = zbus_sub_wait_msg(&ws_tx_messages, &chan, &heartbeat, K_FOREVER);
- if (ret < 0) {
- LOG_ERR("Could not wait for zbus messages (%d)", ret);
- break;
- }
- if (chan != &heartbeat_channel) {
- LOG_ERR("Unsupported channel");
- continue;
- }
-
- ret = snprintf((char *)buffer, sizeof(buffer),
- "{\"type\":0,\"ttl_ms\":%d}\n", heartbeat.ttl_ms);
- if (ret < 0) {
- LOG_ERR("Could not serialize heartbeat message");
- continue;
- }
- size_t len = ret;
-
- int ret = websocket_send_msg(
- fd, (const uint8_t *)buffer, len,
- WEBSOCKET_OPCODE_DATA_TEXT, false, true, 100
- );
- if (ret < 0) {
- if (ret == -EBADF) {
- LOG_DBG("Connection closed, waiting for new one");
- } else {
- LOG_ERR("Error on websocket_send_msg (%d)", ret);
- }
- break;
- }
- }
- }
-}
-
-static int ws_recv_full_message(int fd, void *buf, const size_t len,
- uint32_t *message_type, int64_t deadline) {
- uint64_t remaining = 0;
- size_t read = 0;
- int ret;
-
- while (true) {
- int64_t timeout = MIN(50, deadline - k_uptime_get());
- if (timeout <= 0) {
- ret = -ETIMEDOUT;
- LOG_ERR("No time left to read message from %d (%d)", fd, ret);
- return ret;
- }
-
- if (read >= len) {
- ret = -ENOSPC;
- LOG_ERR("No buffer space left to store message from %d (%d)", fd, ret);
- return ret;
- }
-
- ret = websocket_recv_msg(fd, (unsigned char *)buf+read, len-read,
- message_type, &remaining, timeout);
- if (ret < 0) {
- if (ret == -EAGAIN || ret == -EINTR) {
- LOG_DBG("Trying again to read full message");
- continue;
- } else {
- LOG_ERR("Unhandled error on message reading (%d)", ret);
- return ret;
- }
- }
-
- read += ret;
-
- if (remaining <= 0) {
- return read;
- }
- }
-}
-
-static int ws_handle_data_text(const uint8_t *buf, size_t len, int64_t *deadline) {
- LOG_DBG("Got text message");
-
- static const char heartbeat[] = "{\"type\":0,\"ttl_ms\":1100}\n";
-
- if (len + 1 == sizeof(heartbeat)) {
- if (memcmp((const void *)buf, heartbeat, len) == 0) {
- LOG_DBG("Received heartbeat from client");
- *deadline = k_uptime_get() + WS_CLIENT_HEARTBEAT_GRACE_TIME_MS;
- return 0;
- }
- }
-
- LOG_HEXDUMP_DBG(buf, len, "Could not handle message");
- return -EIO;
-}
-
-static int ws_rx_handle_connection(int fd) {
- LOG_INF("Handling RX of connection with file descriptor %d", fd);
-
- uint8_t rx_buf[100];
- uint32_t message_type = 0;
- int64_t rx_heartbeat_deadline = k_uptime_get() + WS_CLIENT_HEARTBEAT_GRACE_TIME_MS;
-
- while (true) {
- memset(rx_buf, 0, sizeof(rx_buf));
-
- int ret = ws_recv_full_message(fd, rx_buf, sizeof(rx_buf),
- &message_type, rx_heartbeat_deadline);
- if (ret < 0) {
- LOG_ERR("Could not receive full message from %d (%d)", fd, ret);
- return ret;
- }
- size_t len = ret;
- LOG_DBG("Received message with opcode %d of length %d", message_type, len);
-
- if (message_type == WEBSOCKET_OPCODE_DATA_TEXT ||
- message_type == 3) { // FIXME for some reason opcode 1 parses to 3
- ret = ws_handle_data_text(rx_buf, len, &rx_heartbeat_deadline);
- if (ret < 0) {
- LOG_ERR("Failed to handle text data (%d)", ret);
- }
- } else if (message_type == WEBSOCKET_OPCODE_CLOSE) {
- LOG_INF("Client closed connection");
- return 0;
- } else {
- LOG_WRN("Received unhandled message opcode %d", message_type);
- LOG_HEXDUMP_WRN(rx_buf, len, "Message content:");
- }
-
- if (k_uptime_get() > rx_heartbeat_deadline) {
- LOG_INF("Client heartbeat timeout expired on %d - closing", fd);
- return -ETIMEDOUT;
- }
- }
-}
-
-static void ws_rx_thread_function(void *ptr1, void *ptr2, void *ptr3) {
- struct k_msgq *tx_fds = ptr1;
-
- while (true) {
- int fd;
- int ret = k_msgq_get(&ws_connection_backlog, &fd, K_FOREVER);
- if (ret < 0) {
- LOG_ERR("Error getting new file descriptor (%d)", ret);
- continue;
- }
-
- ret = k_msgq_put(tx_fds, (const void *)&fd, K_FOREVER);
- if (ret < 0) {
- LOG_ERR("Could not pass file descriptor %d to TX thread (%d)", fd, ret);
- goto unregister;
- }
-
- ret = ws_rx_handle_connection(fd);
- if (ret < 0) {
- LOG_ERR("Failed to handle connection %d (%d)", fd, ret);
- goto unregister;
- }
-
-unregister:
- ret = websocket_unregister(fd);
- if (ret < 0) {
- LOG_ERR("Failed to unregister connection %d (%d)", fd, ret);
- }
- }
-}
-
-K_MSGQ_DEFINE(ws_tx_fd_1, sizeof(int), 1, 1);
-K_THREAD_DEFINE(ws_rx_thread_1, WS_HANDLER_STACK_SIZE,
- ws_rx_thread_function, &ws_tx_fd_1, NULL, NULL,
- WS_HANDLER_PRIO, 0, 0);
-K_THREAD_DEFINE(ws_tx_thread_1, WS_HANDLER_STACK_SIZE,
- ws_tx_thread_function, &ws_tx_fd_1, NULL, NULL,
- WS_HANDLER_PRIO, 0, 0);
-
-int ws_upgrade_handler(
- int ws_socket,
- struct http_request_ctx *request_ctx,
- void *user_data
-) {
- LOG_DBG("Handling WebSocket upgrade for file descriptor %d", ws_socket);
-
- int ret = k_msgq_put(&ws_connection_backlog, (const void *)&ws_socket,
- K_NO_WAIT);
- if (ret < 0) {
- LOG_ERR("Connection backlog full, dropping file descriptor %d", ws_socket);
- return -ENOENT;
- }
-
- LOG_INF("Added file descriptor %d to connection backlog", ws_socket);
- return 0;
-}