diff --git a/zephyr/modusocket.c b/zephyr/modusocket.c index 68a66aa263..45991d42e7 100644 --- a/zephyr/modusocket.c +++ b/zephyr/modusocket.c @@ -44,6 +44,13 @@ typedef struct _socket_obj_t { mp_obj_base_t base; struct net_context *ctx; struct k_fifo recv_q; + struct net_buf *cur_buf; + + #define STATE_NEW 0 + #define STATE_CONNECTING 1 + #define STATE_CONNECTED 2 + #define STATE_PEER_CLOSED 3 + int8_t state; } socket_obj_t; STATIC const mp_obj_type_t socket_type; @@ -103,6 +110,19 @@ static void sock_received_cb(struct net_context *context, struct net_buf *net_bu } DEBUG_printf("\n"); + // if net_buf == NULL, EOF + if (net_buf == NULL) { + // TODO: k_fifo accessor for this? + struct net_buf *last_buf = (struct net_buf*)sys_slist_peek_tail(&socket->recv_q.data_q); + // We abuse "buf_sent" flag to store EOF flag + net_nbuf_set_buf_sent(last_buf, true); + DEBUG_printf("Set EOF flag on %p\n", last_buf); + return; + } + + // Make sure that "EOF flag" is not set + net_nbuf_set_buf_sent(net_buf, false); + // net_buf->frags will be overwritten by fifo, so save it net_nbuf_set_token(net_buf, net_buf->frags); k_fifo_put(&socket->recv_q, net_buf); @@ -126,6 +146,8 @@ STATIC mp_obj_t socket_make_new(const mp_obj_type_t *type, size_t n_args, size_t socket_obj_t *socket = m_new_obj_with_finaliser(socket_obj_t); socket->base.type = type; k_fifo_init(&socket->recv_q); + socket->cur_buf = NULL; + socket->state = STATE_NEW; int family = AF_INET; int socktype = SOCK_STREAM; @@ -226,6 +248,61 @@ STATIC mp_obj_t socket_recv(mp_obj_t self_in, mp_obj_t len_in) { net_buf_gather(net_buf, vstr.buf, recv_len); net_nbuf_unref(net_buf); + } else if (sock_type == SOCK_STREAM) { + + do { + if (socket->state == STATE_PEER_CLOSED) { + return mp_const_empty_bytes; + } + + unsigned header_len = 0; + if (socket->cur_buf == NULL) { + DEBUG_printf("TCP recv: no cur_buf, getting\n"); + struct net_buf *net_buf = k_fifo_get(&socket->recv_q, K_FOREVER); + // Restore ->frags overwritten by fifo + net_buf->frags = net_nbuf_token(net_buf); + + header_len = net_nbuf_appdata(net_buf) - net_buf->frags->data; + DEBUG_printf("TCP recv: new cur_buf: %p, hdr_len: %u\n", net_buf, header_len); + socket->cur_buf = net_buf; + } + + struct net_buf *frag = socket->cur_buf->frags; + if (frag == NULL) { + printf("net_buf has empty fragments on start!\n"); + assert(0); + } + + net_buf_pull(frag, header_len); + unsigned frag_len = frag->len; + recv_len = frag_len; + if (recv_len > max_len) { + recv_len = max_len; + } + DEBUG_printf("%d data bytes in head frag, going to read %d\n", frag_len, recv_len); + + vstr_init_len(&vstr, recv_len); + memcpy(vstr.buf, frag->data, recv_len); + + if (recv_len != frag_len) { + net_buf_pull(frag, recv_len); + } else { + frag = net_buf_frag_del(socket->cur_buf, frag); + if (frag == NULL) { + DEBUG_printf("Finished processing net_buf %p\n", socket->cur_buf); + // If "buf_sent" flag was set, it's last packet and we reached EOF + if (net_nbuf_buf_sent(socket->cur_buf)) { + socket->state = STATE_PEER_CLOSED; + } + net_nbuf_unref(socket->cur_buf); + socket->cur_buf = NULL; + } + } + // Keep repeating while we're getting empty fragments + // Zephyr IP stack appears to feed empty net_buf's with empty + // frags for various TCP control packets. + } while (recv_len == 0); + } else { mp_not_implemented(""); }