extmod/modlwip: Store a chain of incoming pbufs, instead of only one.
Storing a chain of pbuf was an original design of @pfalcon's lwIP socket module. The problem with storing just one, like modlwip does is that "peer closed connection" notification is completely asynchronous and out of band. So, there may be following sequence of actions: 1. pbuf #1 arrives, and stored in a socket. 2. pbuf #2 arrives, and rejected, which causes lwIP to put it into a queue to re-deliver later. 3. "Peer closed connection" is signaled, and socket is set at such status. 4. pbuf #1 is processed. 5. There's no stored pbufs in teh socket, and socket status is "peer closed connection", so EOF is returned to a client. 6. pbuf #2 gets redelivered. Apparently, there's no easy workaround for this, except to queue all incoming pbufs in a socket. This may lead to increased memory pressure, as number of pending packets would be regulated only by TCP/IP flow control, whereas with previous setup lwIP had a global overlook of number packets waiting for redelivery and could regulate them centrally.
This commit is contained in:
parent
c7fba524cb
commit
5071ceea07
|
@ -239,7 +239,7 @@ typedef struct _lwip_socket_obj_t {
|
||||||
byte peer[4];
|
byte peer[4];
|
||||||
mp_uint_t peer_port;
|
mp_uint_t peer_port;
|
||||||
mp_uint_t timeout;
|
mp_uint_t timeout;
|
||||||
uint16_t leftover_count;
|
uint16_t recv_offset;
|
||||||
|
|
||||||
uint8_t domain;
|
uint8_t domain;
|
||||||
uint8_t type;
|
uint8_t type;
|
||||||
|
@ -354,11 +354,17 @@ STATIC err_t _lwip_tcp_recv(void *arg, struct tcp_pcb *tcpb, struct pbuf *p, err
|
||||||
socket->state = STATE_PEER_CLOSED;
|
socket->state = STATE_PEER_CLOSED;
|
||||||
exec_user_callback(socket);
|
exec_user_callback(socket);
|
||||||
return ERR_OK;
|
return ERR_OK;
|
||||||
} else if (socket->incoming.pbuf != NULL) {
|
|
||||||
// No room in the inn, let LWIP know it's still responsible for delivery later
|
|
||||||
return ERR_BUF;
|
|
||||||
}
|
}
|
||||||
socket->incoming.pbuf = p;
|
|
||||||
|
if (socket->incoming.pbuf == NULL) {
|
||||||
|
socket->incoming.pbuf = p;
|
||||||
|
} else {
|
||||||
|
#ifdef SOCKET_SINGLE_PBUF
|
||||||
|
return ERR_BUF;
|
||||||
|
#else
|
||||||
|
pbuf_cat(socket->incoming.pbuf, p);
|
||||||
|
#endif
|
||||||
|
}
|
||||||
|
|
||||||
exec_user_callback(socket);
|
exec_user_callback(socket);
|
||||||
|
|
||||||
|
@ -536,22 +542,28 @@ STATIC mp_uint_t lwip_tcp_receive(lwip_socket_obj_t *socket, byte *buf, mp_uint_
|
||||||
|
|
||||||
struct pbuf *p = socket->incoming.pbuf;
|
struct pbuf *p = socket->incoming.pbuf;
|
||||||
|
|
||||||
if (socket->leftover_count == 0) {
|
mp_uint_t remaining = p->len - socket->recv_offset;
|
||||||
socket->leftover_count = p->tot_len;
|
if (len > remaining) {
|
||||||
|
len = remaining;
|
||||||
}
|
}
|
||||||
|
|
||||||
u16_t result = pbuf_copy_partial(p, buf, ((socket->leftover_count >= len) ? len : socket->leftover_count), (p->tot_len - socket->leftover_count));
|
memcpy(buf, (byte*)p->payload + socket->recv_offset, len);
|
||||||
if (socket->leftover_count > len) {
|
|
||||||
// More left over...
|
remaining -= len;
|
||||||
socket->leftover_count -= len;
|
if (remaining == 0) {
|
||||||
} else {
|
socket->incoming.pbuf = p->next;
|
||||||
|
// If we don't ref here, free() will free the entire chain,
|
||||||
|
// if we ref, it does what we need: frees 1st buf, and decrements
|
||||||
|
// next buf's refcount back to 1.
|
||||||
|
pbuf_ref(p->next);
|
||||||
pbuf_free(p);
|
pbuf_free(p);
|
||||||
socket->incoming.pbuf = NULL;
|
socket->recv_offset = 0;
|
||||||
socket->leftover_count = 0;
|
} else {
|
||||||
|
socket->recv_offset += len;
|
||||||
}
|
}
|
||||||
|
tcp_recved(socket->pcb.tcp, len);
|
||||||
|
|
||||||
tcp_recved(socket->pcb.tcp, result);
|
return len;
|
||||||
return (mp_uint_t) result;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/*******************************************************************************/
|
/*******************************************************************************/
|
||||||
|
@ -561,8 +573,8 @@ STATIC const mp_obj_type_t lwip_socket_type;
|
||||||
|
|
||||||
STATIC void lwip_socket_print(const mp_print_t *print, mp_obj_t self_in, mp_print_kind_t kind) {
|
STATIC void lwip_socket_print(const mp_print_t *print, mp_obj_t self_in, mp_print_kind_t kind) {
|
||||||
lwip_socket_obj_t *self = self_in;
|
lwip_socket_obj_t *self = self_in;
|
||||||
mp_printf(print, "<socket state=%d timeout=%d incoming=%p remaining=%d>", self->state, self->timeout,
|
mp_printf(print, "<socket state=%d timeout=%d incoming=%p off=%d>", self->state, self->timeout,
|
||||||
self->incoming.pbuf, self->leftover_count);
|
self->incoming.pbuf, self->recv_offset);
|
||||||
}
|
}
|
||||||
|
|
||||||
// FIXME: Only supports two arguments at present
|
// FIXME: Only supports two arguments at present
|
||||||
|
@ -612,7 +624,7 @@ STATIC mp_obj_t lwip_socket_make_new(const mp_obj_type_t *type, mp_uint_t n_args
|
||||||
socket->incoming.pbuf = NULL;
|
socket->incoming.pbuf = NULL;
|
||||||
socket->timeout = -1;
|
socket->timeout = -1;
|
||||||
socket->state = STATE_NEW;
|
socket->state = STATE_NEW;
|
||||||
socket->leftover_count = 0;
|
socket->recv_offset = 0;
|
||||||
return socket;
|
return socket;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -749,7 +761,7 @@ STATIC mp_obj_t lwip_socket_accept(mp_obj_t self_in) {
|
||||||
socket2->incoming.pbuf = NULL;
|
socket2->incoming.pbuf = NULL;
|
||||||
socket2->timeout = socket->timeout;
|
socket2->timeout = socket->timeout;
|
||||||
socket2->state = STATE_CONNECTED;
|
socket2->state = STATE_CONNECTED;
|
||||||
socket2->leftover_count = 0;
|
socket2->recv_offset = 0;
|
||||||
socket2->callback = MP_OBJ_NULL;
|
socket2->callback = MP_OBJ_NULL;
|
||||||
tcp_arg(socket2->pcb.tcp, (void*)socket2);
|
tcp_arg(socket2->pcb.tcp, (void*)socket2);
|
||||||
tcp_err(socket2->pcb.tcp, _lwip_tcp_error);
|
tcp_err(socket2->pcb.tcp, _lwip_tcp_error);
|
||||||
|
|
Loading…
Reference in New Issue