tests/multi_net: Add initial set of multi-instance tests for network.

This commit is contained in:
Damien George 2020-03-04 14:26:07 +11:00
parent df9a949891
commit 8f44c0dd16
5 changed files with 150 additions and 0 deletions

View File

@ -0,0 +1,32 @@
# Simple test creating an SSL connection and transferring some data
# This test won't run under CPython because it requires key/cert
import usocket as socket, ussl as ssl
PORT = 8000
# Server
def instance0():
multitest.globals(IP=multitest.get_network_ip())
s = socket.socket()
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind(socket.getaddrinfo("0.0.0.0", PORT)[0][-1])
s.listen(1)
multitest.next()
s2, _ = s.accept()
s2 = ssl.wrap_socket(s2, server_side=True)
print(s2.read(16))
s2.write(b"server to client")
s.close()
# Client
def instance1():
multitest.next()
s = socket.socket()
s.connect(socket.getaddrinfo(IP, PORT)[0][-1])
s = ssl.wrap_socket(s)
s.write(b"client to server")
print(s.read(16))
s.close()

View File

@ -0,0 +1,4 @@
--- instance0 ---
b'client to server'
--- instance1 ---
b'server to client'

View File

@ -0,0 +1,30 @@
# Test recv on socket that just accepted a connection
import socket
PORT = 8000
# Server
def instance0():
multitest.globals(IP=multitest.get_network_ip())
s = socket.socket()
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind(socket.getaddrinfo("0.0.0.0", PORT)[0][-1])
s.listen(1)
multitest.next()
s.accept()
try:
print("recv", s.recv(10)) # should raise Errno 107 ENOTCONN
except OSError as er:
print(er.args[0])
s.close()
# Client
def instance1():
multitest.next()
s = socket.socket()
s.connect(socket.getaddrinfo(IP, PORT)[0][-1])
s.send(b"GET / HTTP/1.0\r\n\r\n")
s.close()

View File

@ -0,0 +1,56 @@
# Test when client does a TCP RST on an open connection
import struct, time, socket, select
PORT = 8000
def convert_poll_list(l):
# To be compatible across all ports/targets
return [ev for _, ev in l]
# Server
def instance0():
multitest.globals(IP=multitest.get_network_ip())
s = socket.socket()
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind(socket.getaddrinfo("0.0.0.0", PORT)[0][-1])
s.listen(1)
multitest.next()
s2, _ = s.accept()
s2.setblocking(False)
poll = select.poll()
poll.register(s2, select.POLLIN)
time.sleep(0.4)
print(convert_poll_list(poll.poll(1000)))
# TODO: the following recv don't work with lwip, it abandons data upon TCP RST
try:
print(s2.recv(10))
print(convert_poll_list(poll.poll(1000)))
print(s2.recv(10))
print(convert_poll_list(poll.poll(1000)))
print(s2.recv(10))
print(convert_poll_list(poll.poll(1000)))
except OSError as er:
print(er.args[0])
print(convert_poll_list(poll.poll(1000)))
# TODO lwip raises here but apparently it shouldn't
print(s2.recv(10))
print(convert_poll_list(poll.poll(1000)))
s.close()
# Client
def instance1():
if not hasattr(socket, "SO_LINGER"):
multitest.skip()
multitest.next()
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
s.connect(socket.getaddrinfo(IP, PORT)[0][-1])
lgr_onoff = 1
lgr_linger = 0
s.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, struct.pack("ii", lgr_onoff, lgr_linger))
s.send(b"GET / HTTP/1.0\r\n\r\n")
time.sleep(0.2)
s.close() # This issues a TCP RST since we've set the linger option

View File

@ -0,0 +1,28 @@
# Simple test of a TCP server and client transferring data
import socket
PORT = 8000
# Server
def instance0():
multitest.globals(IP=multitest.get_network_ip())
s = socket.socket()
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind(socket.getaddrinfo("0.0.0.0", PORT)[0][-1])
s.listen(1)
multitest.next()
s2, _ = s.accept()
print(s2.recv(16))
s2.send(b"server to client")
s.close()
# Client
def instance1():
multitest.next()
s = socket.socket()
s.connect(socket.getaddrinfo(IP, PORT)[0][-1])
s.send(b"client to server")
print(s.recv(16))
s.close()