6027c41c8f
This work was funded through GitHub Sponsors. Signed-off-by: Jim Mussared <jim.mussared@gmail.com>
54 lines
1.3 KiB
Python
54 lines
1.3 KiB
Python
# Test TCP server with client issuing TCP RST part way through read
|
|
|
|
try:
|
|
import asyncio
|
|
except ImportError:
|
|
print("SKIP")
|
|
raise SystemExit
|
|
|
|
import struct, time, socket
|
|
|
|
PORT = 8000
|
|
|
|
|
|
async def handle_connection(reader, writer):
|
|
data = await reader.read(10) # should succeed
|
|
print(data)
|
|
await asyncio.sleep(0.2) # wait for client to drop connection
|
|
try:
|
|
data = await reader.read(100)
|
|
print(data)
|
|
writer.close()
|
|
await writer.wait_closed()
|
|
except OSError as er:
|
|
print("OSError", er.errno)
|
|
ev.set()
|
|
|
|
|
|
async def main():
|
|
global ev
|
|
ev = asyncio.Event()
|
|
server = await asyncio.start_server(handle_connection, "0.0.0.0", PORT)
|
|
multitest.next()
|
|
async with server:
|
|
await asyncio.wait_for(ev.wait(), 10)
|
|
|
|
|
|
def instance0():
|
|
multitest.globals(IP=multitest.get_network_ip())
|
|
asyncio.run(main())
|
|
|
|
|
|
def instance1():
|
|
if not hasattr(socket, "SO_LINGER"):
|
|
multitest.skip()
|
|
multitest.next()
|
|
s = socket.socket()
|
|
s.connect(socket.getaddrinfo(IP, PORT)[0][-1])
|
|
lgr_onoff = 1
|
|
lgr_linger = 0
|
|
s.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, struct.pack("ii", lgr_onoff, lgr_linger))
|
|
s.send(b"GET / HTTP/1.0\r\n\r\n")
|
|
time.sleep(0.1)
|
|
s.close() # This issues a TCP RST since we've set the linger option
|