/*
 * This Source Code Form is subject to the terms of the Mozilla Public License,
 * v. 2.0. If a copy of the MPL was not distributed with this file, You can
 * obtain one at https://mozilla.org/MPL/2.0/.
 */


#include <errno.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>

#include <zephyr/kernel.h>
#include <zephyr/logging/log.h>
#include <zephyr/net/http/server.h>
#include <zephyr/net/websocket.h>
#include <zephyr/sys_clock.h>
#include <zephyr/zbus/zbus.h>

#include "heart.h"


LOG_MODULE_REGISTER(ws);


#define WS_NUM_HANDLERS                           1
#define WS_HANDLER_STACK_SIZE                  2048
#define WS_HANDLER_PRIO           K_PRIO_PREEMPT(8)
#define WS_CONNECTION_BACKLOG_LEN                 5
#define WS_TX_BUFFER_LEN                        150
#define WS_CLIENT_HEARTBEAT_GRACE_TIME_MS     10000

K_MSGQ_DEFINE(ws_connection_backlog, sizeof(int), WS_CONNECTION_BACKLOG_LEN, 1);
ZBUS_MSG_SUBSCRIBER_DEFINE(ws_tx_messages);
ZBUS_CHAN_ADD_OBS(heartbeat_channel, ws_tx_messages, 3);

static void ws_tx_thread_function(void *ptr1, void *ptr2, void *ptr3) {
	int fd = -1;
	struct k_msgq *tx_fds = ptr1;
	uint8_t buffer[WS_TX_BUFFER_LEN];
	const struct zbus_channel *chan;
	struct heartbeat heartbeat;

	while (true) {
		int ret = zbus_obs_set_enable(&ws_tx_messages, false);
		if (ret < 0) {
			LOG_ERR("Could not disable zbus observer (%d)", ret);
			continue;
		}

		ret = k_msgq_get(tx_fds, &fd, K_FOREVER);
		if (ret < 0) {
			LOG_ERR("Error getting new file descriptor (%d)", ret);
			continue;
		}
		LOG_INF("Handling TX of connection with file descriptor %d", fd);

		ret = zbus_obs_set_enable(&ws_tx_messages, true);
		if (ret < 0) {
			LOG_ERR("Could not enable zbus observer (%d)", ret);
			continue;
		}

		while (true) {
			ret = zbus_sub_wait_msg(&ws_tx_messages, &chan, &heartbeat, K_FOREVER);
			if (ret < 0) {
				LOG_ERR("Could not wait for zbus messages (%d)", ret);
				break;
			}
			if (chan != &heartbeat_channel) {
				LOG_ERR("Unsupported channel");
				continue;
			}

			ret = snprintf((char *)buffer, sizeof(buffer),
			               "{\"type\":0,\"ttl_ms\":%d}\n", heartbeat.ttl_ms);
			if (ret < 0) {
				LOG_ERR("Could not serialize heartbeat message");
				continue;
			}
			size_t len = ret;

			int ret = websocket_send_msg(
				fd, (const uint8_t *)buffer, len,
				WEBSOCKET_OPCODE_DATA_TEXT, false, true, 100
			);
			if (ret < 0) {
				if (ret == -EBADF) {
					LOG_DBG("Connection closed, waiting for new one");
				} else {
					LOG_ERR("Error on websocket_send_msg (%d)", ret);
				}
				break;
			}
		}
	}
}

static int ws_recv_full_message(int fd, void *buf, const size_t len,
                                uint32_t *message_type, int64_t deadline) {
	uint64_t remaining = 0;
	size_t read = 0;
	int ret;

	while (true) {
		int64_t timeout = MIN(50, deadline - k_uptime_get());
		if (timeout <= 0) {
			ret = -ETIMEDOUT;
			LOG_ERR("No time left to read message from %d (%d)", fd, ret);
			return ret;
		}

		if (read >= len) {
			ret = -ENOSPC;
			LOG_ERR("No buffer space left to store message from %d (%d)", fd, ret);
			return ret;
		}

		ret = websocket_recv_msg(fd, (unsigned char *)buf+read, len-read,
		                         message_type, &remaining, timeout);
		if (ret < 0) {
			if (ret == -EAGAIN || ret == -EINTR) {
				LOG_DBG("Trying again to read full message");
				continue;
			} else {
				LOG_ERR("Unhandled error on message reading (%d)", ret);
				return ret;
			}
		}

		read += ret;

		if (remaining <= 0) {
			return read;
		}
	}
}

static int ws_handle_data_text(const uint8_t *buf, size_t len, int64_t *deadline) {
	LOG_DBG("Got text message");

	static const char heartbeat[] = "{\"type\":0,\"ttl_ms\":1100}\n";

	if (len + 1 == sizeof(heartbeat)) {
		if (memcmp((const void *)buf, heartbeat, len) == 0) {
			LOG_DBG("Received heartbeat from client");
			*deadline = k_uptime_get() + WS_CLIENT_HEARTBEAT_GRACE_TIME_MS;
			return 0;
		}
	}

	LOG_HEXDUMP_DBG(buf, len, "Could not handle message");
	return -EIO;
}

static int ws_rx_handle_connection(int fd) {
	LOG_INF("Handling RX of connection with file descriptor %d", fd);

	uint8_t rx_buf[100];
	uint32_t message_type = 0;
	int64_t rx_heartbeat_deadline = k_uptime_get() + WS_CLIENT_HEARTBEAT_GRACE_TIME_MS;

	while (true) {
		memset(rx_buf, 0, sizeof(rx_buf));

		int ret = ws_recv_full_message(fd, rx_buf, sizeof(rx_buf),
		                               &message_type, rx_heartbeat_deadline);
		if (ret < 0) {
			LOG_ERR("Could not receive full message from %d (%d)", fd, ret);
			return ret;
		}
		size_t len = ret;
		LOG_DBG("Received message with opcode %d of length %d", message_type, len);

		if (message_type == WEBSOCKET_OPCODE_DATA_TEXT ||
		    message_type == 3) {  // FIXME for some reason opcode 1 parses to 3
			ret = ws_handle_data_text(rx_buf, len, &rx_heartbeat_deadline);
			if (ret < 0) {
				LOG_ERR("Failed to handle text data (%d)", ret);
			}
		} else if (message_type == WEBSOCKET_OPCODE_CLOSE) {
			LOG_INF("Client closed connection");
			return 0;
		} else {
				LOG_WRN("Received unhandled message opcode %d", message_type);
				LOG_HEXDUMP_WRN(rx_buf, len, "Message content:");
		}

		if (k_uptime_get() > rx_heartbeat_deadline) {
			LOG_INF("Client heartbeat timeout expired on %d - closing", fd);
			return -ETIMEDOUT;
		}
	}
}

static void ws_rx_thread_function(void *ptr1, void *ptr2, void *ptr3) {
	struct k_msgq *tx_fds = ptr1;

	while (true) {
		int fd;
		int ret = k_msgq_get(&ws_connection_backlog, &fd, K_FOREVER);
		if (ret < 0) {
			LOG_ERR("Error getting new file descriptor (%d)", ret);
			continue;
		}

		ret = k_msgq_put(tx_fds, (const void *)&fd, K_FOREVER);
		if (ret < 0) {
			LOG_ERR("Could not pass file descriptor %d to TX thread (%d)", fd, ret);
			goto unregister;
		}

		ret = ws_rx_handle_connection(fd);
		if (ret < 0) {
			LOG_ERR("Failed to handle connection %d (%d)", fd, ret);
			goto unregister;
		}

unregister:
		ret = websocket_unregister(fd);
		if (ret < 0) {
			LOG_ERR("Failed to unregister connection %d (%d)", fd, ret);
		}
	}
}

K_MSGQ_DEFINE(ws_tx_fd_1, sizeof(int), 1, 1);
K_THREAD_DEFINE(ws_rx_thread_1, WS_HANDLER_STACK_SIZE,
                ws_rx_thread_function, &ws_tx_fd_1, NULL, NULL,
                WS_HANDLER_PRIO, 0, 0);
K_THREAD_DEFINE(ws_tx_thread_1, WS_HANDLER_STACK_SIZE,
                ws_tx_thread_function, &ws_tx_fd_1, NULL, NULL,
                WS_HANDLER_PRIO, 0, 0);

int ws_upgrade_handler(
	int ws_socket,
	struct http_request_ctx *request_ctx,
	void *user_data
) {
	LOG_DBG("Handling WebSocket upgrade for file descriptor %d", ws_socket);

	int ret = k_msgq_put(&ws_connection_backlog, (const void *)&ws_socket,
	                     K_NO_WAIT);
	if (ret < 0) {
		LOG_ERR("Connection backlog full, dropping file descriptor %d", ws_socket);
		return -ENOENT;
	}

	LOG_INF("Added file descriptor %d to connection backlog", ws_socket);
	return 0;
}