Add socket select task to wake CP

This fixes the web workflow on boards without a display.

Fixes #6552
This commit is contained in:
Scott Shawcroft 2022-07-12 09:37:09 -07:00
parent 3515003b30
commit e9dd254127
No known key found for this signature in database
GPG Key ID: 0DFD512649C052DA
7 changed files with 234 additions and 100 deletions

View File

@ -40,20 +40,12 @@
#include "common-hal/pulseio/PulseIn.h" #include "common-hal/pulseio/PulseIn.h"
#endif #endif
#if CIRCUITPY_WEB_WORKFLOW
#include "supervisor/shared/web_workflow/web_workflow.h"
#endif
void port_background_task(void) { void port_background_task(void) {
// Zero delay in case FreeRTOS wants to switch to something else. // Zero delay in case FreeRTOS wants to switch to something else.
vTaskDelay(0); vTaskDelay(0);
#if CIRCUITPY_PULSEIO #if CIRCUITPY_PULSEIO
pulsein_background(); pulsein_background();
#endif #endif
#if CIRCUITPY_WEB_WORKFLOW
supervisor_web_workflow_background();
#endif
} }
void port_start_background_task(void) { void port_start_background_task(void) {

View File

@ -30,38 +30,213 @@
#include "shared/runtime/interrupt_char.h" #include "shared/runtime/interrupt_char.h"
#include "py/mperrno.h" #include "py/mperrno.h"
#include "py/runtime.h" #include "py/runtime.h"
#include "shared-bindings/socketpool/SocketPool.h"
#include "supervisor/port.h"
#include "supervisor/shared/tick.h" #include "supervisor/shared/tick.h"
#include "supervisor/workflow.h"
#include "components/lwip/lwip/src/include/lwip/err.h" #include "components/lwip/lwip/src/include/lwip/err.h"
#include "components/lwip/lwip/src/include/lwip/sockets.h" #include "components/lwip/lwip/src/include/lwip/sockets.h"
#include "components/lwip/lwip/src/include/lwip/sys.h" #include "components/lwip/lwip/src/include/lwip/sys.h"
#include "components/lwip/lwip/src/include/lwip/netdb.h" #include "components/lwip/lwip/src/include/lwip/netdb.h"
#include "components/vfs/include/esp_vfs_eventfd.h"
STATIC socketpool_socket_obj_t *open_socket_handles[CONFIG_LWIP_MAX_SOCKETS]; StackType_t socket_select_stack[2 * configMINIMAL_STACK_SIZE];
STATIC int open_socket_fds[CONFIG_LWIP_MAX_SOCKETS];
STATIC bool user_socket[CONFIG_LWIP_MAX_SOCKETS];
StaticTask_t socket_select_task_handle;
STATIC int socket_change_fd = -1;
STATIC void socket_select_task(void *arg) {
uint64_t signal;
while (true) {
fd_set readfds;
fd_set errfds;
FD_ZERO(&readfds);
FD_ZERO(&errfds);
FD_SET(socket_change_fd, &readfds);
FD_SET(socket_change_fd, &errfds);
int max_fd = socket_change_fd;
for (size_t i = 0; i < MP_ARRAY_SIZE(open_socket_fds); i++) {
if (open_socket_fds[i] < 0) {
continue;
}
max_fd = MAX(max_fd, open_socket_fds[i]);
FD_SET(open_socket_fds[i], &readfds);
FD_SET(open_socket_fds[i], &errfds);
}
int num_triggered = select(max_fd + 1, &readfds, NULL, &errfds, NULL);
if (num_triggered < 0) {
// Maybe bad file descriptor
for (size_t i = 0; i < MP_ARRAY_SIZE(open_socket_fds); i++) {
int sockfd = open_socket_fds[i];
if (sockfd < 0) {
continue;
}
if (FD_ISSET(sockfd, &errfds)) {
int err;
int optlen = sizeof(int);
int ret = getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &err, (socklen_t *)&optlen);
if (ret < 0) {
open_socket_fds[i] = -1;
// Try again.
continue;
}
}
}
}
assert(num_triggered >= 0);
if (FD_ISSET(socket_change_fd, &readfds)) {
read(socket_change_fd, &signal, sizeof(signal));
num_triggered -= 1;
}
if (num_triggered > 0) {
supervisor_workflow_request_background();
// Wake up CircuitPython. We know it is asleep because we are lower
// priority.
port_wake_main_task();
}
}
close(socket_change_fd);
vTaskDelete(NULL);
}
void socket_user_reset(void) { void socket_user_reset(void) {
for (size_t i = 0; i < MP_ARRAY_SIZE(open_socket_handles); i++) { if (socket_change_fd < 0) {
if (open_socket_handles[i]) { esp_vfs_eventfd_config_t config = ESP_VFS_EVENTD_CONFIG_DEFAULT();
if (open_socket_handles[i]->num > 0) { ESP_ERROR_CHECK(esp_vfs_eventfd_register(&config));
// Close automatically clears socket handle
common_hal_socketpool_socket_close(open_socket_handles[i]); for (size_t i = 0; i < MP_ARRAY_SIZE(open_socket_fds); i++) {
} else { open_socket_fds[i] = -1;
open_socket_handles[i] = NULL; user_socket[i] = false;
} }
socket_change_fd = eventfd(0, 0);
// This task runs at a lower priority than CircuitPython and is used to wake CircuitPython
// up when any open sockets have data to read. It allows us to sleep otherwise.
(void)xTaskCreateStaticPinnedToCore(socket_select_task,
"socket_select",
2 * configMINIMAL_STACK_SIZE,
NULL,
0, // Run this at IDLE priority. We only need it when CP isn't running (at 1).
socket_select_stack,
&socket_select_task_handle,
xPortGetCoreID());
}
for (size_t i = 0; i < MP_ARRAY_SIZE(open_socket_fds); i++) {
if (open_socket_fds[i] >= 0 && user_socket[i]) {
int num = open_socket_fds[i];
// Close automatically clears socket handle
lwip_shutdown(num, SHUT_RDWR);
lwip_close(num);
open_socket_fds[i] = -1;
user_socket[i] = false;
} }
} }
} }
bool register_open_socket(socketpool_socket_obj_t *self) { // The writes below send an event to the socket select task so that it redoes the
for (size_t i = 0; i < MP_ARRAY_SIZE(open_socket_handles); i++) { // select with the new open socket set.
if (open_socket_handles[i] == NULL) {
open_socket_handles[i] = self; STATIC bool register_open_socket(int fd) {
for (size_t i = 0; i < MP_ARRAY_SIZE(open_socket_fds); i++) {
if (open_socket_fds[i] == -1) {
open_socket_fds[i] = fd;
user_socket[i] = false;
uint64_t signal = 1;
write(socket_change_fd, &signal, sizeof(signal));
return true; return true;
} }
} }
return false; return false;
} }
STATIC void unregister_open_socket(int fd) {
for (size_t i = 0; i < MP_ARRAY_SIZE(open_socket_fds); i++) {
if (open_socket_fds[i] == fd) {
open_socket_fds[i] = -1;
user_socket[i] = false;
write(socket_change_fd, &fd, sizeof(fd));
return;
}
}
}
STATIC void mark_user_socket(int fd) {
for (size_t i = 0; i < MP_ARRAY_SIZE(open_socket_fds); i++) {
if (open_socket_fds[i] == fd) {
user_socket[i] = true;
return;
}
}
}
bool socketpool_socket(socketpool_socketpool_obj_t *self,
socketpool_socketpool_addressfamily_t family, socketpool_socketpool_sock_t type,
socketpool_socket_obj_t *sock) {
int addr_family;
int ipproto;
if (family == SOCKETPOOL_AF_INET) {
addr_family = AF_INET;
ipproto = IPPROTO_IP;
} else { // INET6
addr_family = AF_INET6;
ipproto = IPPROTO_IPV6;
}
int socket_type;
if (type == SOCKETPOOL_SOCK_STREAM) {
socket_type = SOCK_STREAM;
} else if (type == SOCKETPOOL_SOCK_DGRAM) {
socket_type = SOCK_DGRAM;
} else { // SOCKETPOOL_SOCK_RAW
socket_type = SOCK_RAW;
}
sock->type = socket_type;
sock->family = addr_family;
sock->ipproto = ipproto;
sock->pool = self;
sock->timeout_ms = (uint)-1;
// Create LWIP socket
int socknum = -1;
socknum = lwip_socket(sock->family, sock->type, sock->ipproto);
if (socknum < 0) {
return false;
}
// This shouldn't happen since we have room for the same number of sockets as LWIP.
if (!register_open_socket(socknum)) {
lwip_close(socknum);
return false;
}
sock->num = socknum;
// Sockets should be nonblocking in most cases
lwip_fcntl(socknum, F_SETFL, O_NONBLOCK);
return true;
}
socketpool_socket_obj_t *common_hal_socketpool_socket(socketpool_socketpool_obj_t *self,
socketpool_socketpool_addressfamily_t family, socketpool_socketpool_sock_t type) {
if (family != SOCKETPOOL_AF_INET) {
mp_raise_NotImplementedError(translate("Only IPv4 sockets supported"));
}
socketpool_socket_obj_t *sock = m_new_obj_with_finaliser(socketpool_socket_obj_t);
sock->base.type = &socketpool_socket_type;
if (!socketpool_socket(self, family, type, sock)) {
mp_raise_RuntimeError(translate("Out of sockets"));
}
mark_user_socket(sock->num);
return sock;
}
int socketpool_socket_accept(socketpool_socket_obj_t *self, uint8_t *ip, uint32_t *port) { int socketpool_socket_accept(socketpool_socket_obj_t *self, uint8_t *ip, uint32_t *port) {
struct sockaddr_in accept_addr; struct sockaddr_in accept_addr;
socklen_t socklen = sizeof(accept_addr); socklen_t socklen = sizeof(accept_addr);
@ -92,6 +267,10 @@ int socketpool_socket_accept(socketpool_socket_obj_t *self, uint8_t *ip, uint32_
if (newsoc < 0) { if (newsoc < 0) {
return -MP_EBADF; return -MP_EBADF;
} }
if (!register_open_socket(newsoc)) {
lwip_close(newsoc);
return -MP_EBADF;
}
return newsoc; return newsoc;
} }
@ -100,6 +279,7 @@ socketpool_socket_obj_t *common_hal_socketpool_socket_accept(socketpool_socket_o
int newsoc = socketpool_socket_accept(self, ip, port); int newsoc = socketpool_socket_accept(self, ip, port);
if (newsoc > 0) { if (newsoc > 0) {
mark_user_socket(newsoc);
// Create the socket // Create the socket
socketpool_socket_obj_t *sock = m_new_obj_with_finaliser(socketpool_socket_obj_t); socketpool_socket_obj_t *sock = m_new_obj_with_finaliser(socketpool_socket_obj_t);
sock->base.type = &socketpool_socket_type; sock->base.type = &socketpool_socket_type;
@ -107,10 +287,6 @@ socketpool_socket_obj_t *common_hal_socketpool_socket_accept(socketpool_socket_o
sock->pool = self->pool; sock->pool = self->pool;
sock->connected = true; sock->connected = true;
if (!register_open_socket(sock)) {
mp_raise_OSError(MP_EBADF);
}
lwip_fcntl(newsoc, F_SETFL, O_NONBLOCK); lwip_fcntl(newsoc, F_SETFL, O_NONBLOCK);
return sock; return sock;
} else { } else {
@ -150,18 +326,13 @@ void socketpool_socket_close(socketpool_socket_obj_t *self) {
if (self->num >= 0) { if (self->num >= 0) {
lwip_shutdown(self->num, SHUT_RDWR); lwip_shutdown(self->num, SHUT_RDWR);
lwip_close(self->num); lwip_close(self->num);
unregister_open_socket(self->num);
self->num = -1; self->num = -1;
} }
} }
void common_hal_socketpool_socket_close(socketpool_socket_obj_t *self) { void common_hal_socketpool_socket_close(socketpool_socket_obj_t *self) {
socketpool_socket_close(self); socketpool_socket_close(self);
// Remove socket record
for (size_t i = 0; i < MP_ARRAY_SIZE(open_socket_handles); i++) {
if (open_socket_handles[i] == self) {
open_socket_handles[i] = NULL;
}
}
} }
void common_hal_socketpool_socket_connect(socketpool_socket_obj_t *self, void common_hal_socketpool_socket_connect(socketpool_socket_obj_t *self,

View File

@ -46,6 +46,5 @@ typedef struct {
} socketpool_socket_obj_t; } socketpool_socket_obj_t;
void socket_user_reset(void); void socket_user_reset(void);
bool register_open_socket(socketpool_socket_obj_t *self);
#endif // MICROPY_INCLUDED_ESPRESSIF_COMMON_HAL_SOCKETPOOL_SOCKET_H #endif // MICROPY_INCLUDED_ESPRESSIF_COMMON_HAL_SOCKETPOOL_SOCKET_H

View File

@ -40,61 +40,7 @@ void common_hal_socketpool_socketpool_construct(socketpool_socketpool_obj_t *sel
} }
} }
bool socketpool_socket(socketpool_socketpool_obj_t *self, // common_hal_socketpool_socket is in socketpool/Socket.c to centralize open socket tracking.
socketpool_socketpool_addressfamily_t family, socketpool_socketpool_sock_t type,
socketpool_socket_obj_t *sock) {
int addr_family;
int ipproto;
if (family == SOCKETPOOL_AF_INET) {
addr_family = AF_INET;
ipproto = IPPROTO_IP;
} else { // INET6
addr_family = AF_INET6;
ipproto = IPPROTO_IPV6;
}
int socket_type;
if (type == SOCKETPOOL_SOCK_STREAM) {
socket_type = SOCK_STREAM;
} else if (type == SOCKETPOOL_SOCK_DGRAM) {
socket_type = SOCK_DGRAM;
} else { // SOCKETPOOL_SOCK_RAW
socket_type = SOCK_RAW;
}
sock->type = socket_type;
sock->family = addr_family;
sock->ipproto = ipproto;
sock->pool = self;
sock->timeout_ms = (uint)-1;
// Create LWIP socket
int socknum = -1;
socknum = lwip_socket(sock->family, sock->type, sock->ipproto);
if (socknum < 0) {
return false;
}
sock->num = socknum;
// Sockets should be nonblocking in most cases
lwip_fcntl(socknum, F_SETFL, O_NONBLOCK);
return true;
}
socketpool_socket_obj_t *common_hal_socketpool_socket(socketpool_socketpool_obj_t *self,
socketpool_socketpool_addressfamily_t family, socketpool_socketpool_sock_t type) {
if (family != SOCKETPOOL_AF_INET) {
mp_raise_NotImplementedError(translate("Only IPv4 sockets supported"));
}
socketpool_socket_obj_t *sock = m_new_obj_with_finaliser(socketpool_socket_obj_t);
sock->base.type = &socketpool_socket_type;
if (!socketpool_socket(self, family, type, sock) ||
!register_open_socket(sock)) {
mp_raise_RuntimeError(translate("Out of sockets"));
}
return sock;
}
mp_obj_t common_hal_socketpool_socketpool_gethostbyname(socketpool_socketpool_obj_t *self, mp_obj_t common_hal_socketpool_socketpool_gethostbyname(socketpool_socketpool_obj_t *self,
const char *host) { const char *host) {

View File

@ -53,6 +53,7 @@
#include "shared-bindings/microcontroller/__init__.h" #include "shared-bindings/microcontroller/__init__.h"
#include "shared-bindings/microcontroller/RunMode.h" #include "shared-bindings/microcontroller/RunMode.h"
#include "shared-bindings/rtc/__init__.h" #include "shared-bindings/rtc/__init__.h"
#include "shared-bindings/socketpool/__init__.h"
#include "peripherals/rmt.h" #include "peripherals/rmt.h"
#include "peripherals/timer.h" #include "peripherals/timer.h"
@ -296,6 +297,10 @@ void reset_port(void) {
rtc_reset(); rtc_reset();
#endif #endif
#if CIRCUITPY_SOCKETPOOL
socketpool_user_reset();
#endif
#if CIRCUITPY_TOUCHIO_USE_NATIVE #if CIRCUITPY_TOUCHIO_USE_NATIVE
peripherals_touch_reset(); peripherals_touch_reset();
#endif #endif

View File

@ -404,7 +404,7 @@ static const char *OK_JSON = "HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\nC
static void _cors_header(socketpool_socket_obj_t *socket, _request *request) { static void _cors_header(socketpool_socket_obj_t *socket, _request *request) {
_send_strs(socket, _send_strs(socket,
"Access-Control-Allow-Credentials: true\r\n", "Access-Control-Allow-Credentials: true\r\n",
"Vary: Origin\r\n", "Vary: Origin, Accept, Upgrade\r\n",
"Access-Control-Allow-Origin: ", request->origin, "\r\n", "Access-Control-Allow-Origin: ", request->origin, "\r\n",
NULL); NULL);
} }
@ -440,10 +440,10 @@ static void _reply_access_control(socketpool_socket_obj_t *socket, _request *req
"Access-Control-Allow-Methods:GET, OPTIONS", NULL); "Access-Control-Allow-Methods:GET, OPTIONS", NULL);
if (!_usb_active()) { if (!_usb_active()) {
_send_str(socket, ", PUT, DELETE"); _send_str(socket, ", PUT, DELETE");
#if CIRCUITPY_USB_MSC
usb_msc_unlock();
#endif
} }
#if CIRCUITPY_USB_MSC
usb_msc_unlock();
#endif
_send_str(socket, "\r\n"); _send_str(socket, "\r\n");
_cors_header(socket, request); _cors_header(socket, request);
_send_str(socket, "\r\n"); _send_str(socket, "\r\n");
@ -740,14 +740,25 @@ STATIC uint64_t truncate_time(uint64_t input_time, DWORD *fattime) {
return truncated_time; return truncated_time;
} }
STATIC void _discard_incoming(socketpool_socket_obj_t *socket, size_t amount) {
size_t discarded = 0;
while (discarded < amount) {
uint8_t bytes[64];
size_t read_len = MIN(sizeof(bytes), amount - discarded);
int len = socketpool_socket_recv_into(socket, bytes, read_len);
if (len < 0) {
break;
}
discarded += read_len;
}
}
static void _write_file_and_reply(socketpool_socket_obj_t *socket, _request *request, FATFS *fs, const TCHAR *path) { static void _write_file_and_reply(socketpool_socket_obj_t *socket, _request *request, FATFS *fs, const TCHAR *path) {
FIL active_file; FIL active_file;
if (_usb_active()) { if (_usb_active()) {
_discard_incoming(socket, request->content_length);
_reply_conflict(socket, request); _reply_conflict(socket, request);
#if CIRCUITPY_USB_MSC
usb_msc_unlock();
#endif
return; return;
} }
if (request->timestamp_ms > 0) { if (request->timestamp_ms > 0) {
@ -765,12 +776,20 @@ static void _write_file_and_reply(socketpool_socket_obj_t *socket, _request *req
if (result == FR_NO_PATH) { if (result == FR_NO_PATH) {
override_fattime(0); override_fattime(0);
#if CIRCUITPY_USB_MSC
usb_msc_unlock();
#endif
_discard_incoming(socket, request->content_length);
_reply_missing(socket, request); _reply_missing(socket, request);
return; return;
} }
if (result != FR_OK) { if (result != FR_OK) {
ESP_LOGE(TAG, "file write error %d %s", result, path); ESP_LOGE(TAG, "file write error %d %s", result, path);
override_fattime(0); override_fattime(0);
#if CIRCUITPY_USB_MSC
usb_msc_unlock();
#endif
_discard_incoming(socket, request->content_length);
_reply_server_error(socket, request); _reply_server_error(socket, request);
return; return;
} else if (request->expect) { } else if (request->expect) {
@ -785,6 +804,7 @@ static void _write_file_and_reply(socketpool_socket_obj_t *socket, _request *req
#if CIRCUITPY_USB_MSC #if CIRCUITPY_USB_MSC
usb_msc_unlock(); usb_msc_unlock();
#endif #endif
_discard_incoming(socket, request->content_length);
// Too large. // Too large.
if (request->expect) { if (request->expect) {
_reply_expectation_failed(socket, request); _reply_expectation_failed(socket, request);
@ -915,9 +935,6 @@ static bool _reply(socketpool_socket_obj_t *socket, _request *request) {
} else if (strcmp(request->method, "DELETE") == 0) { } else if (strcmp(request->method, "DELETE") == 0) {
if (_usb_active()) { if (_usb_active()) {
_reply_conflict(socket, request); _reply_conflict(socket, request);
#if CIRCUITPY_USB_MSC
usb_msc_unlock();
#endif
return false; return false;
} }
@ -932,6 +949,9 @@ static bool _reply(socketpool_socket_obj_t *socket, _request *request) {
} }
} }
#if CIRCUITPY_USB_MSC
usb_msc_unlock();
#endif
if (result == FR_NO_PATH || result == FR_NO_FILE) { if (result == FR_NO_PATH || result == FR_NO_FILE) {
_reply_missing(socket, request); _reply_missing(socket, request);
} else if (result != FR_OK) { } else if (result != FR_OK) {
@ -965,9 +985,6 @@ static bool _reply(socketpool_socket_obj_t *socket, _request *request) {
} else if (strcmp(request->method, "PUT") == 0) { } else if (strcmp(request->method, "PUT") == 0) {
if (_usb_active()) { if (_usb_active()) {
_reply_conflict(socket, request); _reply_conflict(socket, request);
#if CIRCUITPY_USB_MSC
usb_msc_unlock();
#endif
return false; return false;
} }

View File

@ -66,6 +66,10 @@ static void workflow_background(void *data) {
#if CIRCUITPY_STATUS_BAR #if CIRCUITPY_STATUS_BAR
supervisor_workflow_update_status_bar(); supervisor_workflow_update_status_bar();
#endif #endif
#if CIRCUITPY_WEB_WORKFLOW
supervisor_web_workflow_background();
#endif
} }
// Called during a VM reset. Doesn't actually reset things. // Called during a VM reset. Doesn't actually reset things.