diff options
Diffstat (limited to 'fw')
| -rw-r--r-- | fw/CMakeLists.txt | 1 | ||||
| -rw-r--r-- | fw/prj.conf | 4 | ||||
| -rw-r--r-- | fw/src/http.c | 15 | ||||
| -rw-r--r-- | fw/src/ws.c | 251 | ||||
| -rw-r--r-- | fw/src/ws.h | 17 | 
5 files changed, 288 insertions, 0 deletions
diff --git a/fw/CMakeLists.txt b/fw/CMakeLists.txt index c0b9484..5aaf319 100644 --- a/fw/CMakeLists.txt +++ b/fw/CMakeLists.txt @@ -24,6 +24,7 @@ target_sources(app  		"${CMAKE_CURRENT_SOURCE_DIR}/src/syslog.c"  		"${CMAKE_CURRENT_SOURCE_DIR}/src/mac.c"  		"${CMAKE_CURRENT_SOURCE_DIR}/src/http.c" +		"${CMAKE_CURRENT_SOURCE_DIR}/src/ws.c"  		"${CMAKE_CURRENT_SOURCE_DIR}/src/heart.c"  ) diff --git a/fw/prj.conf b/fw/prj.conf index 7984caa..ac120b9 100644 --- a/fw/prj.conf +++ b/fw/prj.conf @@ -24,11 +24,15 @@ CONFIG_HTTP_PARSER=y  CONFIG_HTTP_PARSER_URL=y  CONFIG_HTTP_SERVER=y +CONFIG_HTTP_SERVER_WEBSOCKET=y  CONFIG_FILE_SYSTEM=y  CONFIG_EVENTFD=y +CONFIG_ZVFS_OPEN_MAX=32 +CONFIG_ZVFS_POLL_MAX=32 +  CONFIG_ZBUS=y  CONFIG_ZBUS_MSG_SUBSCRIBER=y  CONFIG_HEAP_MEM_POOL_SIZE=2048 diff --git a/fw/src/http.c b/fw/src/http.c index 96a70fe..b9a76ba 100644 --- a/fw/src/http.c +++ b/fw/src/http.c @@ -13,6 +13,8 @@  #include <zephyr/net/http/service.h>  #include <zephyr/net/http/status.h> +#include "ws.h" +  LOG_MODULE_REGISTER(http); @@ -59,11 +61,24 @@ static struct http_resource_detail_dynamic favicon_resource_detail = {  	.user_data = NULL,  }; +static uint8_t websocket_read_buffer[1024]; + +struct http_resource_detail_websocket websocket_resource_detail = { +	.common = { +			.type = HTTP_RESOURCE_TYPE_WEBSOCKET, +			.bitmask_of_supported_http_methods = BIT(HTTP_GET), +		}, +	.cb = ws_upgrade_handler, +	.data_buffer = websocket_read_buffer, +	.data_buffer_len = sizeof(websocket_read_buffer), +}; +  static uint16_t http_port = 80;  HTTP_SERVICE_DEFINE(http_service, NULL, &http_port, 1, 10, NULL, NULL);  HTTP_RESOURCE_DEFINE(index_resource, http_service, "/", &index_resource_detail); +HTTP_RESOURCE_DEFINE(websocket_resource, http_service, "/", &websocket_resource_detail);  HTTP_RESOURCE_DEFINE(favicon_resource, http_service, "/favicon.ico", &favicon_resource_detail);  int init_http_server(void) { diff --git a/fw/src/ws.c b/fw/src/ws.c new file mode 100644 index 0000000..6f68538 --- /dev/null +++ b/fw/src/ws.c @@ -0,0 +1,251 @@ +/* + * This Source Code Form is subject to the terms of the Mozilla Public License, + * v. 2.0. If a copy of the MPL was not distributed with this file, You can + * obtain one at https://mozilla.org/MPL/2.0/. + */ + + +#include <errno.h> +#include <stdbool.h> +#include <stdio.h> +#include <string.h> + +#include <zephyr/kernel.h> +#include <zephyr/logging/log.h> +#include <zephyr/net/http/server.h> +#include <zephyr/net/websocket.h> +#include <zephyr/sys_clock.h> +#include <zephyr/zbus/zbus.h> + +#include "heart.h" + + +LOG_MODULE_REGISTER(ws); + + +#define WS_NUM_HANDLERS                           1 +#define WS_HANDLER_STACK_SIZE                  2048 +#define WS_HANDLER_PRIO           K_PRIO_PREEMPT(8) +#define WS_CONNECTION_BACKLOG_LEN                 5 +#define WS_TX_BUFFER_LEN                        150 +#define WS_CLIENT_HEARTBEAT_GRACE_TIME_MS     10000 + +K_MSGQ_DEFINE(ws_connection_backlog, sizeof(int), WS_CONNECTION_BACKLOG_LEN, 1); +ZBUS_MSG_SUBSCRIBER_DEFINE(ws_tx_messages); +ZBUS_CHAN_ADD_OBS(heartbeat_channel, ws_tx_messages, 3); + +static void ws_tx_thread_function(void *ptr1, void *ptr2, void *ptr3) { +	int fd = -1; +	struct k_msgq *tx_fds = ptr1; +	uint8_t buffer[WS_TX_BUFFER_LEN]; +	const struct zbus_channel *chan; +	struct heartbeat heartbeat; + +	while (true) { +		int ret = zbus_obs_set_enable(&ws_tx_messages, false); +		if (ret < 0) { +			LOG_ERR("Could not disable zbus observer (%d)", ret); +			continue; +		} + +		ret = k_msgq_get(tx_fds, &fd, K_FOREVER); +		if (ret < 0) { +			LOG_ERR("Error getting new file descriptor (%d)", ret); +			continue; +		} +		LOG_INF("Handling TX of connection with file descriptor %d", fd); + +		ret = zbus_obs_set_enable(&ws_tx_messages, true); +		if (ret < 0) { +			LOG_ERR("Could not enable zbus observer (%d)", ret); +			continue; +		} + +		while (true) { +			ret = zbus_sub_wait_msg(&ws_tx_messages, &chan, &heartbeat, K_FOREVER); +			if (ret < 0) { +				LOG_ERR("Could not wait for zbus messages (%d)", ret); +				break; +			} +			if (chan != &heartbeat_channel) { +				LOG_ERR("Unsupported channel"); +				continue; +			} + +			ret = snprintf((char *)buffer, sizeof(buffer), +			               "{\"type\":0,\"ttl_ms\":%d}\n", heartbeat.ttl_ms); +			if (ret < 0) { +				LOG_ERR("Could not serialize heartbeat message"); +				continue; +			} +			size_t len = ret; + +			int ret = websocket_send_msg( +				fd, (const uint8_t *)buffer, len, +				WEBSOCKET_OPCODE_DATA_TEXT, false, true, 100 +			); +			if (ret < 0) { +				if (ret == -EBADF) { +					LOG_DBG("Connection closed, waiting for new one"); +				} else { +					LOG_ERR("Error on websocket_send_msg (%d)", ret); +				} +				break; +			} +		} +	} +} + +static int ws_recv_full_message(int fd, void *buf, const size_t len, +                                uint32_t *message_type, int64_t deadline) { +	uint64_t remaining = 0; +	size_t read = 0; +	int ret; + +	while (true) { +		int64_t timeout = MIN(50, deadline - k_uptime_get()); +		if (timeout <= 0) { +			ret = -ETIMEDOUT; +			LOG_ERR("No time left to read message from %d (%d)", fd, ret); +			return ret; +		} + +		if (read >= len) { +			ret = -ENOSPC; +			LOG_ERR("No buffer space left to store message from %d (%d)", fd, ret); +			return ret; +		} + +		ret = websocket_recv_msg(fd, (unsigned char *)buf+read, len-read, +		                         message_type, &remaining, timeout); +		if (ret < 0) { +			if (ret == -EAGAIN || ret == -EINTR) { +				LOG_DBG("Trying again to read full message"); +				continue; +			} else { +				LOG_ERR("Unhandled error on message reading (%d)", ret); +				return ret; +			} +		} + +		read += ret; + +		if (remaining <= 0) { +			return read; +		} +	} +} + +static int ws_handle_data_text(const uint8_t *buf, size_t len, int64_t *deadline) { +	LOG_DBG("Got text message"); + +	static const char heartbeat[] = "{\"type\":0,\"ttl_ms\":1100}\n"; + +	if (len + 1 == sizeof(heartbeat)) { +		if (memcmp((const void *)buf, heartbeat, len) == 0) { +			LOG_DBG("Received heartbeat from client"); +			*deadline = k_uptime_get() + WS_CLIENT_HEARTBEAT_GRACE_TIME_MS; +			return 0; +		} +	} + +	LOG_HEXDUMP_DBG(buf, len, "Could not handle message"); +	return -EIO; +} + +static int ws_rx_handle_connection(int fd) { +	LOG_INF("Handling RX of connection with file descriptor %d", fd); + +	uint8_t rx_buf[100]; +	uint32_t message_type = 0; +	int64_t rx_heartbeat_deadline = k_uptime_get() + WS_CLIENT_HEARTBEAT_GRACE_TIME_MS; + +	while (true) { +		memset(rx_buf, 0, sizeof(rx_buf)); + +		int ret = ws_recv_full_message(fd, rx_buf, sizeof(rx_buf), +		                               &message_type, rx_heartbeat_deadline); +		if (ret < 0) { +			LOG_ERR("Could not receive full message from %d (%d)", fd, ret); +			return ret; +		} +		size_t len = ret; +		LOG_DBG("Received message with opcode %d of length %d", message_type, len); + +		if (message_type == WEBSOCKET_OPCODE_DATA_TEXT || +		    message_type == 3) {  // FIXME for some reason opcode 1 parses to 3 +			ret = ws_handle_data_text(rx_buf, len, &rx_heartbeat_deadline); +			if (ret < 0) { +				LOG_ERR("Failed to handle text data (%d)", ret); +			} +		} else if (message_type == WEBSOCKET_OPCODE_CLOSE) { +			LOG_INF("Client closed connection"); +			return 0; +		} else { +				LOG_WRN("Received unhandled message opcode %d", message_type); +				LOG_HEXDUMP_WRN(rx_buf, len, "Message content:"); +		} + +		if (k_uptime_get() > rx_heartbeat_deadline) { +			LOG_INF("Client heartbeat timeout expired on %d - closing", fd); +			return -ETIMEDOUT; +		} +	} +} + +static void ws_rx_thread_function(void *ptr1, void *ptr2, void *ptr3) { +	struct k_msgq *tx_fds = ptr1; + +	while (true) { +		int fd; +		int ret = k_msgq_get(&ws_connection_backlog, &fd, K_FOREVER); +		if (ret < 0) { +			LOG_ERR("Error getting new file descriptor (%d)", ret); +			continue; +		} + +		ret = k_msgq_put(tx_fds, (const void *)&fd, K_FOREVER); +		if (ret < 0) { +			LOG_ERR("Could not pass file descriptor %d to TX thread (%d)", fd, ret); +			goto unregister; +		} + +		ret = ws_rx_handle_connection(fd); +		if (ret < 0) { +			LOG_ERR("Failed to handle connection %d (%d)", fd, ret); +			goto unregister; +		} + +unregister: +		ret = websocket_unregister(fd); +		if (ret < 0) { +			LOG_ERR("Failed to unregister connection %d (%d)", fd, ret); +		} +	} +} + +K_MSGQ_DEFINE(ws_tx_fd_1, sizeof(int), 1, 1); +K_THREAD_DEFINE(ws_rx_thread_1, WS_HANDLER_STACK_SIZE, +                ws_rx_thread_function, &ws_tx_fd_1, NULL, NULL, +                WS_HANDLER_PRIO, 0, 0); +K_THREAD_DEFINE(ws_tx_thread_1, WS_HANDLER_STACK_SIZE, +                ws_tx_thread_function, &ws_tx_fd_1, NULL, NULL, +                WS_HANDLER_PRIO, 0, 0); + +int ws_upgrade_handler( +	int ws_socket, +	struct http_request_ctx *request_ctx, +	void *user_data +) { +	LOG_DBG("Handling WebSocket upgrade for file descriptor %d", ws_socket); + +	int ret = k_msgq_put(&ws_connection_backlog, (const void *)&ws_socket, +	                     K_NO_WAIT); +	if (ret < 0) { +		LOG_ERR("Connection backlog full, dropping file descriptor %d", ws_socket); +		return -ENOENT; +	} + +	LOG_INF("Added file descriptor %d to connection backlog", ws_socket); +	return 0; +} diff --git a/fw/src/ws.h b/fw/src/ws.h new file mode 100644 index 0000000..0c13039 --- /dev/null +++ b/fw/src/ws.h @@ -0,0 +1,17 @@ +/* + * This Source Code Form is subject to the terms of the Mozilla Public License, + * v. 2.0. If a copy of the MPL was not distributed with this file, You can + * obtain one at https://mozilla.org/MPL/2.0/. + */ + +#ifndef SRC_WS_H +#define SRC_WS_H + + +#include <zephyr/net/http/server.h> + + +int ws_upgrade_handler(int ws_socket, struct http_request_ctx *request_ctx, +                       void *user_data); + +#endif // !SRC_WS_H  | 
