From e9dd25412779106c018a9d4b6d00413bd4296257 Mon Sep 17 00:00:00 2001 From: Scott Shawcroft Date: Tue, 12 Jul 2022 09:37:09 -0700 Subject: [PATCH] Add socket select task to wake CP This fixes the web workflow on boards without a display. Fixes #6552 --- ports/espressif/background.c | 8 - .../espressif/common-hal/socketpool/Socket.c | 217 ++++++++++++++++-- .../espressif/common-hal/socketpool/Socket.h | 1 - .../common-hal/socketpool/SocketPool.c | 56 +---- ports/espressif/supervisor/port.c | 5 + supervisor/shared/web_workflow/web_workflow.c | 43 ++-- supervisor/shared/workflow.c | 4 + 7 files changed, 234 insertions(+), 100 deletions(-) diff --git a/ports/espressif/background.c b/ports/espressif/background.c index 1bf44700be..0b5bb96a3b 100644 --- a/ports/espressif/background.c +++ b/ports/espressif/background.c @@ -40,20 +40,12 @@ #include "common-hal/pulseio/PulseIn.h" #endif -#if CIRCUITPY_WEB_WORKFLOW -#include "supervisor/shared/web_workflow/web_workflow.h" -#endif - void port_background_task(void) { // Zero delay in case FreeRTOS wants to switch to something else. vTaskDelay(0); #if CIRCUITPY_PULSEIO pulsein_background(); #endif - - #if CIRCUITPY_WEB_WORKFLOW - supervisor_web_workflow_background(); - #endif } void port_start_background_task(void) { diff --git a/ports/espressif/common-hal/socketpool/Socket.c b/ports/espressif/common-hal/socketpool/Socket.c index 0774204bad..45d5f1fea0 100644 --- a/ports/espressif/common-hal/socketpool/Socket.c +++ b/ports/espressif/common-hal/socketpool/Socket.c @@ -30,38 +30,213 @@ #include "shared/runtime/interrupt_char.h" #include "py/mperrno.h" #include "py/runtime.h" +#include "shared-bindings/socketpool/SocketPool.h" +#include "supervisor/port.h" #include "supervisor/shared/tick.h" +#include "supervisor/workflow.h" #include "components/lwip/lwip/src/include/lwip/err.h" #include "components/lwip/lwip/src/include/lwip/sockets.h" #include "components/lwip/lwip/src/include/lwip/sys.h" #include "components/lwip/lwip/src/include/lwip/netdb.h" +#include "components/vfs/include/esp_vfs_eventfd.h" -STATIC socketpool_socket_obj_t *open_socket_handles[CONFIG_LWIP_MAX_SOCKETS]; +StackType_t socket_select_stack[2 * configMINIMAL_STACK_SIZE]; + +STATIC int open_socket_fds[CONFIG_LWIP_MAX_SOCKETS]; +STATIC bool user_socket[CONFIG_LWIP_MAX_SOCKETS]; +StaticTask_t socket_select_task_handle; +STATIC int socket_change_fd = -1; + +STATIC void socket_select_task(void *arg) { + uint64_t signal; + + while (true) { + fd_set readfds; + fd_set errfds; + FD_ZERO(&readfds); + FD_ZERO(&errfds); + FD_SET(socket_change_fd, &readfds); + FD_SET(socket_change_fd, &errfds); + int max_fd = socket_change_fd; + for (size_t i = 0; i < MP_ARRAY_SIZE(open_socket_fds); i++) { + if (open_socket_fds[i] < 0) { + continue; + } + max_fd = MAX(max_fd, open_socket_fds[i]); + FD_SET(open_socket_fds[i], &readfds); + FD_SET(open_socket_fds[i], &errfds); + } + + int num_triggered = select(max_fd + 1, &readfds, NULL, &errfds, NULL); + if (num_triggered < 0) { + // Maybe bad file descriptor + for (size_t i = 0; i < MP_ARRAY_SIZE(open_socket_fds); i++) { + int sockfd = open_socket_fds[i]; + if (sockfd < 0) { + continue; + } + if (FD_ISSET(sockfd, &errfds)) { + int err; + int optlen = sizeof(int); + int ret = getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &err, (socklen_t *)&optlen); + if (ret < 0) { + open_socket_fds[i] = -1; + // Try again. + continue; + } + } + } + } + assert(num_triggered >= 0); + + if (FD_ISSET(socket_change_fd, &readfds)) { + read(socket_change_fd, &signal, sizeof(signal)); + num_triggered -= 1; + } + if (num_triggered > 0) { + supervisor_workflow_request_background(); + + // Wake up CircuitPython. We know it is asleep because we are lower + // priority. + port_wake_main_task(); + } + + } + close(socket_change_fd); + vTaskDelete(NULL); +} void socket_user_reset(void) { - for (size_t i = 0; i < MP_ARRAY_SIZE(open_socket_handles); i++) { - if (open_socket_handles[i]) { - if (open_socket_handles[i]->num > 0) { - // Close automatically clears socket handle - common_hal_socketpool_socket_close(open_socket_handles[i]); - } else { - open_socket_handles[i] = NULL; - } + if (socket_change_fd < 0) { + esp_vfs_eventfd_config_t config = ESP_VFS_EVENTD_CONFIG_DEFAULT(); + ESP_ERROR_CHECK(esp_vfs_eventfd_register(&config)); + + for (size_t i = 0; i < MP_ARRAY_SIZE(open_socket_fds); i++) { + open_socket_fds[i] = -1; + user_socket[i] = false; + } + socket_change_fd = eventfd(0, 0); + // This task runs at a lower priority than CircuitPython and is used to wake CircuitPython + // up when any open sockets have data to read. It allows us to sleep otherwise. + (void)xTaskCreateStaticPinnedToCore(socket_select_task, + "socket_select", + 2 * configMINIMAL_STACK_SIZE, + NULL, + 0, // Run this at IDLE priority. We only need it when CP isn't running (at 1). + socket_select_stack, + &socket_select_task_handle, + xPortGetCoreID()); + } + + for (size_t i = 0; i < MP_ARRAY_SIZE(open_socket_fds); i++) { + if (open_socket_fds[i] >= 0 && user_socket[i]) { + int num = open_socket_fds[i]; + // Close automatically clears socket handle + lwip_shutdown(num, SHUT_RDWR); + lwip_close(num); + open_socket_fds[i] = -1; + user_socket[i] = false; } } } -bool register_open_socket(socketpool_socket_obj_t *self) { - for (size_t i = 0; i < MP_ARRAY_SIZE(open_socket_handles); i++) { - if (open_socket_handles[i] == NULL) { - open_socket_handles[i] = self; +// The writes below send an event to the socket select task so that it redoes the +// select with the new open socket set. + +STATIC bool register_open_socket(int fd) { + for (size_t i = 0; i < MP_ARRAY_SIZE(open_socket_fds); i++) { + if (open_socket_fds[i] == -1) { + open_socket_fds[i] = fd; + user_socket[i] = false; + uint64_t signal = 1; + write(socket_change_fd, &signal, sizeof(signal)); return true; } } return false; } +STATIC void unregister_open_socket(int fd) { + for (size_t i = 0; i < MP_ARRAY_SIZE(open_socket_fds); i++) { + if (open_socket_fds[i] == fd) { + open_socket_fds[i] = -1; + user_socket[i] = false; + write(socket_change_fd, &fd, sizeof(fd)); + return; + } + } +} + +STATIC void mark_user_socket(int fd) { + for (size_t i = 0; i < MP_ARRAY_SIZE(open_socket_fds); i++) { + if (open_socket_fds[i] == fd) { + user_socket[i] = true; + return; + } + } +} + +bool socketpool_socket(socketpool_socketpool_obj_t *self, + socketpool_socketpool_addressfamily_t family, socketpool_socketpool_sock_t type, + socketpool_socket_obj_t *sock) { + int addr_family; + int ipproto; + if (family == SOCKETPOOL_AF_INET) { + addr_family = AF_INET; + ipproto = IPPROTO_IP; + } else { // INET6 + addr_family = AF_INET6; + ipproto = IPPROTO_IPV6; + } + + int socket_type; + if (type == SOCKETPOOL_SOCK_STREAM) { + socket_type = SOCK_STREAM; + } else if (type == SOCKETPOOL_SOCK_DGRAM) { + socket_type = SOCK_DGRAM; + } else { // SOCKETPOOL_SOCK_RAW + socket_type = SOCK_RAW; + } + sock->type = socket_type; + sock->family = addr_family; + sock->ipproto = ipproto; + sock->pool = self; + sock->timeout_ms = (uint)-1; + + // Create LWIP socket + int socknum = -1; + socknum = lwip_socket(sock->family, sock->type, sock->ipproto); + if (socknum < 0) { + return false; + } + // This shouldn't happen since we have room for the same number of sockets as LWIP. + if (!register_open_socket(socknum)) { + lwip_close(socknum); + return false; + } + sock->num = socknum; + // Sockets should be nonblocking in most cases + lwip_fcntl(socknum, F_SETFL, O_NONBLOCK); + return true; +} + +socketpool_socket_obj_t *common_hal_socketpool_socket(socketpool_socketpool_obj_t *self, + socketpool_socketpool_addressfamily_t family, socketpool_socketpool_sock_t type) { + if (family != SOCKETPOOL_AF_INET) { + mp_raise_NotImplementedError(translate("Only IPv4 sockets supported")); + } + + socketpool_socket_obj_t *sock = m_new_obj_with_finaliser(socketpool_socket_obj_t); + sock->base.type = &socketpool_socket_type; + + if (!socketpool_socket(self, family, type, sock)) { + mp_raise_RuntimeError(translate("Out of sockets")); + } + mark_user_socket(sock->num); + return sock; +} + int socketpool_socket_accept(socketpool_socket_obj_t *self, uint8_t *ip, uint32_t *port) { struct sockaddr_in accept_addr; socklen_t socklen = sizeof(accept_addr); @@ -92,6 +267,10 @@ int socketpool_socket_accept(socketpool_socket_obj_t *self, uint8_t *ip, uint32_ if (newsoc < 0) { return -MP_EBADF; } + if (!register_open_socket(newsoc)) { + lwip_close(newsoc); + return -MP_EBADF; + } return newsoc; } @@ -100,6 +279,7 @@ socketpool_socket_obj_t *common_hal_socketpool_socket_accept(socketpool_socket_o int newsoc = socketpool_socket_accept(self, ip, port); if (newsoc > 0) { + mark_user_socket(newsoc); // Create the socket socketpool_socket_obj_t *sock = m_new_obj_with_finaliser(socketpool_socket_obj_t); sock->base.type = &socketpool_socket_type; @@ -107,10 +287,6 @@ socketpool_socket_obj_t *common_hal_socketpool_socket_accept(socketpool_socket_o sock->pool = self->pool; sock->connected = true; - if (!register_open_socket(sock)) { - mp_raise_OSError(MP_EBADF); - } - lwip_fcntl(newsoc, F_SETFL, O_NONBLOCK); return sock; } else { @@ -150,18 +326,13 @@ void socketpool_socket_close(socketpool_socket_obj_t *self) { if (self->num >= 0) { lwip_shutdown(self->num, SHUT_RDWR); lwip_close(self->num); + unregister_open_socket(self->num); self->num = -1; } } void common_hal_socketpool_socket_close(socketpool_socket_obj_t *self) { socketpool_socket_close(self); - // Remove socket record - for (size_t i = 0; i < MP_ARRAY_SIZE(open_socket_handles); i++) { - if (open_socket_handles[i] == self) { - open_socket_handles[i] = NULL; - } - } } void common_hal_socketpool_socket_connect(socketpool_socket_obj_t *self, diff --git a/ports/espressif/common-hal/socketpool/Socket.h b/ports/espressif/common-hal/socketpool/Socket.h index 2b8ea9fcbf..b91419807c 100644 --- a/ports/espressif/common-hal/socketpool/Socket.h +++ b/ports/espressif/common-hal/socketpool/Socket.h @@ -46,6 +46,5 @@ typedef struct { } socketpool_socket_obj_t; void socket_user_reset(void); -bool register_open_socket(socketpool_socket_obj_t *self); #endif // MICROPY_INCLUDED_ESPRESSIF_COMMON_HAL_SOCKETPOOL_SOCKET_H diff --git a/ports/espressif/common-hal/socketpool/SocketPool.c b/ports/espressif/common-hal/socketpool/SocketPool.c index f9bf62d253..1d1aafa638 100644 --- a/ports/espressif/common-hal/socketpool/SocketPool.c +++ b/ports/espressif/common-hal/socketpool/SocketPool.c @@ -40,61 +40,7 @@ void common_hal_socketpool_socketpool_construct(socketpool_socketpool_obj_t *sel } } -bool socketpool_socket(socketpool_socketpool_obj_t *self, - socketpool_socketpool_addressfamily_t family, socketpool_socketpool_sock_t type, - socketpool_socket_obj_t *sock) { - int addr_family; - int ipproto; - if (family == SOCKETPOOL_AF_INET) { - addr_family = AF_INET; - ipproto = IPPROTO_IP; - } else { // INET6 - addr_family = AF_INET6; - ipproto = IPPROTO_IPV6; - } - - int socket_type; - if (type == SOCKETPOOL_SOCK_STREAM) { - socket_type = SOCK_STREAM; - } else if (type == SOCKETPOOL_SOCK_DGRAM) { - socket_type = SOCK_DGRAM; - } else { // SOCKETPOOL_SOCK_RAW - socket_type = SOCK_RAW; - } - sock->type = socket_type; - sock->family = addr_family; - sock->ipproto = ipproto; - sock->pool = self; - sock->timeout_ms = (uint)-1; - - // Create LWIP socket - int socknum = -1; - socknum = lwip_socket(sock->family, sock->type, sock->ipproto); - if (socknum < 0) { - return false; - } - sock->num = socknum; - // Sockets should be nonblocking in most cases - lwip_fcntl(socknum, F_SETFL, O_NONBLOCK); - return true; -} - -socketpool_socket_obj_t *common_hal_socketpool_socket(socketpool_socketpool_obj_t *self, - socketpool_socketpool_addressfamily_t family, socketpool_socketpool_sock_t type) { - if (family != SOCKETPOOL_AF_INET) { - mp_raise_NotImplementedError(translate("Only IPv4 sockets supported")); - } - - socketpool_socket_obj_t *sock = m_new_obj_with_finaliser(socketpool_socket_obj_t); - sock->base.type = &socketpool_socket_type; - - if (!socketpool_socket(self, family, type, sock) || - !register_open_socket(sock)) { - mp_raise_RuntimeError(translate("Out of sockets")); - } - return sock; -} - +// common_hal_socketpool_socket is in socketpool/Socket.c to centralize open socket tracking. mp_obj_t common_hal_socketpool_socketpool_gethostbyname(socketpool_socketpool_obj_t *self, const char *host) { diff --git a/ports/espressif/supervisor/port.c b/ports/espressif/supervisor/port.c index 8d10c1f7dc..adefff3f71 100644 --- a/ports/espressif/supervisor/port.c +++ b/ports/espressif/supervisor/port.c @@ -53,6 +53,7 @@ #include "shared-bindings/microcontroller/__init__.h" #include "shared-bindings/microcontroller/RunMode.h" #include "shared-bindings/rtc/__init__.h" +#include "shared-bindings/socketpool/__init__.h" #include "peripherals/rmt.h" #include "peripherals/timer.h" @@ -296,6 +297,10 @@ void reset_port(void) { rtc_reset(); #endif + #if CIRCUITPY_SOCKETPOOL + socketpool_user_reset(); + #endif + #if CIRCUITPY_TOUCHIO_USE_NATIVE peripherals_touch_reset(); #endif diff --git a/supervisor/shared/web_workflow/web_workflow.c b/supervisor/shared/web_workflow/web_workflow.c index 6cf765b8f9..81730c92d0 100644 --- a/supervisor/shared/web_workflow/web_workflow.c +++ b/supervisor/shared/web_workflow/web_workflow.c @@ -404,7 +404,7 @@ static const char *OK_JSON = "HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\nC static void _cors_header(socketpool_socket_obj_t *socket, _request *request) { _send_strs(socket, "Access-Control-Allow-Credentials: true\r\n", - "Vary: Origin\r\n", + "Vary: Origin, Accept, Upgrade\r\n", "Access-Control-Allow-Origin: ", request->origin, "\r\n", NULL); } @@ -440,10 +440,10 @@ static void _reply_access_control(socketpool_socket_obj_t *socket, _request *req "Access-Control-Allow-Methods:GET, OPTIONS", NULL); if (!_usb_active()) { _send_str(socket, ", PUT, DELETE"); + #if CIRCUITPY_USB_MSC + usb_msc_unlock(); + #endif } - #if CIRCUITPY_USB_MSC - usb_msc_unlock(); - #endif _send_str(socket, "\r\n"); _cors_header(socket, request); _send_str(socket, "\r\n"); @@ -740,14 +740,25 @@ STATIC uint64_t truncate_time(uint64_t input_time, DWORD *fattime) { return truncated_time; } +STATIC void _discard_incoming(socketpool_socket_obj_t *socket, size_t amount) { + size_t discarded = 0; + while (discarded < amount) { + uint8_t bytes[64]; + size_t read_len = MIN(sizeof(bytes), amount - discarded); + int len = socketpool_socket_recv_into(socket, bytes, read_len); + if (len < 0) { + break; + } + discarded += read_len; + } +} + static void _write_file_and_reply(socketpool_socket_obj_t *socket, _request *request, FATFS *fs, const TCHAR *path) { FIL active_file; if (_usb_active()) { + _discard_incoming(socket, request->content_length); _reply_conflict(socket, request); - #if CIRCUITPY_USB_MSC - usb_msc_unlock(); - #endif return; } if (request->timestamp_ms > 0) { @@ -765,12 +776,20 @@ static void _write_file_and_reply(socketpool_socket_obj_t *socket, _request *req if (result == FR_NO_PATH) { override_fattime(0); + #if CIRCUITPY_USB_MSC + usb_msc_unlock(); + #endif + _discard_incoming(socket, request->content_length); _reply_missing(socket, request); return; } if (result != FR_OK) { ESP_LOGE(TAG, "file write error %d %s", result, path); override_fattime(0); + #if CIRCUITPY_USB_MSC + usb_msc_unlock(); + #endif + _discard_incoming(socket, request->content_length); _reply_server_error(socket, request); return; } else if (request->expect) { @@ -785,6 +804,7 @@ static void _write_file_and_reply(socketpool_socket_obj_t *socket, _request *req #if CIRCUITPY_USB_MSC usb_msc_unlock(); #endif + _discard_incoming(socket, request->content_length); // Too large. if (request->expect) { _reply_expectation_failed(socket, request); @@ -915,9 +935,6 @@ static bool _reply(socketpool_socket_obj_t *socket, _request *request) { } else if (strcmp(request->method, "DELETE") == 0) { if (_usb_active()) { _reply_conflict(socket, request); - #if CIRCUITPY_USB_MSC - usb_msc_unlock(); - #endif return false; } @@ -932,6 +949,9 @@ static bool _reply(socketpool_socket_obj_t *socket, _request *request) { } } + #if CIRCUITPY_USB_MSC + usb_msc_unlock(); + #endif if (result == FR_NO_PATH || result == FR_NO_FILE) { _reply_missing(socket, request); } else if (result != FR_OK) { @@ -965,9 +985,6 @@ static bool _reply(socketpool_socket_obj_t *socket, _request *request) { } else if (strcmp(request->method, "PUT") == 0) { if (_usb_active()) { _reply_conflict(socket, request); - #if CIRCUITPY_USB_MSC - usb_msc_unlock(); - #endif return false; } diff --git a/supervisor/shared/workflow.c b/supervisor/shared/workflow.c index a7bddca84d..23532181c6 100644 --- a/supervisor/shared/workflow.c +++ b/supervisor/shared/workflow.c @@ -66,6 +66,10 @@ static void workflow_background(void *data) { #if CIRCUITPY_STATUS_BAR supervisor_workflow_update_status_bar(); #endif + + #if CIRCUITPY_WEB_WORKFLOW + supervisor_web_workflow_background(); + #endif } // Called during a VM reset. Doesn't actually reset things.