Merge pull request #7173 from jepler/async-socket

asyncio bugfixes, select bugfixes & selectable socket
This commit is contained in:
Dan Halbert 2022-11-07 18:32:05 -05:00 committed by GitHub
commit c9ad92d133
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 105 additions and 6 deletions

View File

@ -83,7 +83,7 @@ STATIC mp_obj_t ticks(void) {
// shared-bindings/supervisor/__init__.c). We assume/require that
// supervisor.ticks_ms is picked as the ticks implementation under
// CircuitPython for the Python-coded bits of asyncio.
#define ticks() MP_OBJ_NEW_SMALL_INT(supervisor_ticks_ms())
#define ticks() supervisor_ticks_ms()
#endif
STATIC mp_int_t ticks_diff(mp_obj_t t1_in, mp_obj_t t0_in) {

View File

@ -16,6 +16,7 @@
#include "py/stream.h"
#include "py/mperrno.h"
#include "py/mphal.h"
#include "shared/runtime/interrupt_char.h"
// Flags for poll()
#define FLAG_ONESHOT (1)
@ -230,6 +231,9 @@ STATIC mp_uint_t poll_poll_internal(uint n_args, const mp_obj_t *args) {
break;
}
RUN_BACKGROUND_TASKS;
if (mp_hal_is_interrupted()) {
return 0;
}
}
return n_ready;

View File

@ -553,3 +553,27 @@ mp_uint_t common_hal_socketpool_socket_sendto(socketpool_socket_obj_t *self,
void common_hal_socketpool_socket_settimeout(socketpool_socket_obj_t *self, uint32_t timeout_ms) {
self->timeout_ms = timeout_ms;
}
bool common_hal_socketpool_readable(socketpool_socket_obj_t *self) {
struct timeval immediate = {0, 0};
fd_set fds;
FD_ZERO(&fds);
FD_SET(self->num, &fds);
int num_triggered = select(self->num + 1, &fds, NULL, &fds, &immediate);
// including returning true in the error case
return num_triggered != 0;
}
bool common_hal_socketpool_writable(socketpool_socket_obj_t *self) {
struct timeval immediate = {0, 0};
fd_set fds;
FD_ZERO(&fds);
FD_SET(self->num, &fds);
int num_triggered = select(self->num + 1, NULL, &fds, &fds, &immediate);
// including returning true in the error case
return num_triggered != 0;
}

View File

@ -1163,3 +1163,46 @@ mp_uint_t common_hal_socketpool_socket_sendto(socketpool_socket_obj_t *socket,
void common_hal_socketpool_socket_settimeout(socketpool_socket_obj_t *self, uint32_t timeout_ms) {
self->timeout = timeout_ms;
}
bool common_hal_socketpool_readable(socketpool_socket_obj_t *self) {
MICROPY_PY_LWIP_ENTER;
bool result = self->incoming.pbuf != NULL;
if (self->state == STATE_PEER_CLOSED) {
result = true;
}
if (self->type == SOCKETPOOL_SOCK_STREAM && self->pcb.tcp->state == LISTEN) {
struct tcp_pcb *volatile *incoming_connection = &lwip_socket_incoming_array(self)[self->incoming.connection.iget];
result = (incoming_connection != NULL);
}
MICROPY_PY_LWIP_EXIT;
return result;
}
bool common_hal_socketpool_writable(socketpool_socket_obj_t *self) {
bool result = false;
MICROPY_PY_LWIP_ENTER;
switch (self->type) {
case SOCKETPOOL_SOCK_STREAM: {
result = tcp_sndbuf(self->pcb.tcp) != 0;
break;
}
case SOCKETPOOL_SOCK_DGRAM:
#if MICROPY_PY_LWIP_SOCK_RAW
case SOCKETPOOL_SOCK_RAW:
#endif
result = true;
break;
}
MICROPY_PY_LWIP_EXIT;
return result;
}

View File

@ -30,13 +30,14 @@
#include <stdio.h>
#include <string.h>
#include "shared/runtime/context_manager_helpers.h"
#include "py/objtuple.h"
#include "py/objlist.h"
#include "py/runtime.h"
#include "py/mperrno.h"
#include "py/objlist.h"
#include "py/objtuple.h"
#include "py/runtime.h"
#include "py/stream.h"
#include "shared/netutils/netutils.h"
#include "shared/runtime/context_manager_helpers.h"
#include "shared/runtime/interrupt_char.h"
//| class Socket:
@ -422,6 +423,31 @@ STATIC const mp_rom_map_elem_t socketpool_socket_locals_dict_table[] = {
STATIC MP_DEFINE_CONST_DICT(socketpool_socket_locals_dict, socketpool_socket_locals_dict_table);
STATIC mp_uint_t socket_ioctl(mp_obj_t self_in, mp_uint_t request, mp_uint_t arg, int *errcode) {
socketpool_socket_obj_t *self = MP_OBJ_TO_PTR(self_in);
mp_uint_t ret;
if (request == MP_STREAM_POLL) {
mp_uint_t flags = arg;
ret = 0;
if ((flags & MP_STREAM_POLL_RD) && common_hal_socketpool_readable(self) > 0) {
ret |= MP_STREAM_POLL_RD;
}
if ((flags & MP_STREAM_POLL_WR) && common_hal_socketpool_writable(self)) {
ret |= MP_STREAM_POLL_WR;
}
} else {
*errcode = MP_EINVAL;
ret = MP_STREAM_ERROR;
}
return ret;
}
STATIC const mp_stream_p_t socket_stream_p = {
MP_PROTO_IMPLEMENT(MP_QSTR_protocol_stream)
.ioctl = socket_ioctl,
.is_text = false,
};
const mp_obj_type_t socketpool_socket_type = {
{ &mp_type_type },
.flags = MP_TYPE_FLAG_EXTENDED,
@ -429,5 +455,6 @@ const mp_obj_type_t socketpool_socket_type = {
.locals_dict = (mp_obj_dict_t *)&socketpool_socket_locals_dict,
MP_TYPE_EXTENDED_FIELDS(
.unary_op = mp_generic_unary_op,
.protocol = &socket_stream_p,
)
};

View File

@ -46,6 +46,8 @@ mp_uint_t common_hal_socketpool_socket_send(socketpool_socket_obj_t *self, const
mp_uint_t common_hal_socketpool_socket_sendto(socketpool_socket_obj_t *self,
const char *host, size_t hostlen, uint32_t port, const uint8_t *buf, uint32_t len);
void common_hal_socketpool_socket_settimeout(socketpool_socket_obj_t *self, uint32_t timeout_ms);
bool common_hal_socketpool_readable(socketpool_socket_obj_t *self);
bool common_hal_socketpool_writable(socketpool_socket_obj_t *self);
// Non-allocating versions for internal use.
int socketpool_socket_accept(socketpool_socket_obj_t *self, uint8_t *ip, uint32_t *port);
@ -53,5 +55,4 @@ void socketpool_socket_close(socketpool_socket_obj_t *self);
int socketpool_socket_send(socketpool_socket_obj_t *self, const uint8_t *buf, uint32_t len);
int socketpool_socket_recv_into(socketpool_socket_obj_t *self,
const uint8_t *buf, uint32_t len);
#endif // MICROPY_INCLUDED_SHARED_BINDINGS_SOCKETPOOL_SOCKET_H