/* * This Source Code Form is subject to the terms of the Mozilla Public License, * v. 2.0. If a copy of the MPL was not distributed with this file, You can * obtain one at https://mozilla.org/MPL/2.0/. */ #include #include #include #include #include #include #include #include #include #include #include "heart.h" LOG_MODULE_REGISTER(ws); #define WS_NUM_HANDLERS 1 #define WS_HANDLER_STACK_SIZE 2048 #define WS_HANDLER_PRIO K_PRIO_PREEMPT(8) #define WS_CONNECTION_BACKLOG_LEN 5 #define WS_TX_BUFFER_LEN 150 #define WS_CLIENT_HEARTBEAT_GRACE_TIME_MS 10000 K_MSGQ_DEFINE(ws_connection_backlog, sizeof(int), WS_CONNECTION_BACKLOG_LEN, 1); ZBUS_MSG_SUBSCRIBER_DEFINE(ws_tx_messages); ZBUS_CHAN_ADD_OBS(heartbeat_channel, ws_tx_messages, 3); static void ws_tx_thread_function(void *ptr1, void *ptr2, void *ptr3) { int fd = -1; struct k_msgq *tx_fds = ptr1; uint8_t buffer[WS_TX_BUFFER_LEN]; const struct zbus_channel *chan; struct heartbeat heartbeat; while (true) { int ret = zbus_obs_set_enable(&ws_tx_messages, false); if (ret < 0) { LOG_ERR("Could not disable zbus observer (%d)", ret); continue; } ret = k_msgq_get(tx_fds, &fd, K_FOREVER); if (ret < 0) { LOG_ERR("Error getting new file descriptor (%d)", ret); continue; } LOG_INF("Handling TX of connection with file descriptor %d", fd); ret = zbus_obs_set_enable(&ws_tx_messages, true); if (ret < 0) { LOG_ERR("Could not enable zbus observer (%d)", ret); continue; } while (true) { ret = zbus_sub_wait_msg(&ws_tx_messages, &chan, &heartbeat, K_FOREVER); if (ret < 0) { LOG_ERR("Could not wait for zbus messages (%d)", ret); break; } if (chan != &heartbeat_channel) { LOG_ERR("Unsupported channel"); continue; } ret = snprintf((char *)buffer, sizeof(buffer), "{\"type\":0,\"ttl_ms\":%d}\n", heartbeat.ttl_ms); if (ret < 0) { LOG_ERR("Could not serialize heartbeat message"); continue; } size_t len = ret; int ret = websocket_send_msg( fd, (const uint8_t *)buffer, len, WEBSOCKET_OPCODE_DATA_TEXT, false, true, 100 ); if (ret < 0) { if (ret == -EBADF) { LOG_DBG("Connection closed, waiting for new one"); } else { LOG_ERR("Error on websocket_send_msg (%d)", ret); } break; } } } } static int ws_recv_full_message(int fd, void *buf, const size_t len, uint32_t *message_type, int64_t deadline) { uint64_t remaining = 0; size_t read = 0; int ret; while (true) { int64_t timeout = MIN(50, deadline - k_uptime_get()); if (timeout <= 0) { ret = -ETIMEDOUT; LOG_ERR("No time left to read message from %d (%d)", fd, ret); return ret; } if (read >= len) { ret = -ENOSPC; LOG_ERR("No buffer space left to store message from %d (%d)", fd, ret); return ret; } ret = websocket_recv_msg(fd, (unsigned char *)buf+read, len-read, message_type, &remaining, timeout); if (ret < 0) { if (ret == -EAGAIN || ret == -EINTR) { LOG_DBG("Trying again to read full message"); continue; } else { LOG_ERR("Unhandled error on message reading (%d)", ret); return ret; } } read += ret; if (remaining <= 0) { return read; } } } static int ws_handle_data_text(const uint8_t *buf, size_t len, int64_t *deadline) { LOG_DBG("Got text message"); static const char heartbeat[] = "{\"type\":0,\"ttl_ms\":1100}\n"; if (len + 1 == sizeof(heartbeat)) { if (memcmp((const void *)buf, heartbeat, len) == 0) { LOG_DBG("Received heartbeat from client"); *deadline = k_uptime_get() + WS_CLIENT_HEARTBEAT_GRACE_TIME_MS; return 0; } } LOG_HEXDUMP_DBG(buf, len, "Could not handle message"); return -EIO; } static int ws_rx_handle_connection(int fd) { LOG_INF("Handling RX of connection with file descriptor %d", fd); uint8_t rx_buf[100]; uint32_t message_type = 0; int64_t rx_heartbeat_deadline = k_uptime_get() + WS_CLIENT_HEARTBEAT_GRACE_TIME_MS; while (true) { memset(rx_buf, 0, sizeof(rx_buf)); int ret = ws_recv_full_message(fd, rx_buf, sizeof(rx_buf), &message_type, rx_heartbeat_deadline); if (ret < 0) { LOG_ERR("Could not receive full message from %d (%d)", fd, ret); return ret; } size_t len = ret; LOG_DBG("Received message with opcode %d of length %d", message_type, len); if (message_type == WEBSOCKET_OPCODE_DATA_TEXT || message_type == 3) { // FIXME for some reason opcode 1 parses to 3 ret = ws_handle_data_text(rx_buf, len, &rx_heartbeat_deadline); if (ret < 0) { LOG_ERR("Failed to handle text data (%d)", ret); } } else if (message_type == WEBSOCKET_OPCODE_CLOSE) { LOG_INF("Client closed connection"); return 0; } else { LOG_WRN("Received unhandled message opcode %d", message_type); LOG_HEXDUMP_WRN(rx_buf, len, "Message content:"); } if (k_uptime_get() > rx_heartbeat_deadline) { LOG_INF("Client heartbeat timeout expired on %d - closing", fd); return -ETIMEDOUT; } } } static void ws_rx_thread_function(void *ptr1, void *ptr2, void *ptr3) { struct k_msgq *tx_fds = ptr1; while (true) { int fd; int ret = k_msgq_get(&ws_connection_backlog, &fd, K_FOREVER); if (ret < 0) { LOG_ERR("Error getting new file descriptor (%d)", ret); continue; } ret = k_msgq_put(tx_fds, (const void *)&fd, K_FOREVER); if (ret < 0) { LOG_ERR("Could not pass file descriptor %d to TX thread (%d)", fd, ret); goto unregister; } ret = ws_rx_handle_connection(fd); if (ret < 0) { LOG_ERR("Failed to handle connection %d (%d)", fd, ret); goto unregister; } unregister: ret = websocket_unregister(fd); if (ret < 0) { LOG_ERR("Failed to unregister connection %d (%d)", fd, ret); } } } K_MSGQ_DEFINE(ws_tx_fd_1, sizeof(int), 1, 1); K_THREAD_DEFINE(ws_rx_thread_1, WS_HANDLER_STACK_SIZE, ws_rx_thread_function, &ws_tx_fd_1, NULL, NULL, WS_HANDLER_PRIO, 0, 0); K_THREAD_DEFINE(ws_tx_thread_1, WS_HANDLER_STACK_SIZE, ws_tx_thread_function, &ws_tx_fd_1, NULL, NULL, WS_HANDLER_PRIO, 0, 0); int ws_upgrade_handler( int ws_socket, struct http_request_ctx *request_ctx, void *user_data ) { LOG_DBG("Handling WebSocket upgrade for file descriptor %d", ws_socket); int ret = k_msgq_put(&ws_connection_backlog, (const void *)&ws_socket, K_NO_WAIT); if (ret < 0) { LOG_ERR("Connection backlog full, dropping file descriptor %d", ws_socket); return -ENOENT; } LOG_INF("Added file descriptor %d to connection backlog", ws_socket); return 0; }