circuitpython/tests/extmod/uasyncio_wait_for_fwd.py
Jeff Epler 562520e158
Skip these tests when using --emit native
.. in which case the __await__ attribute is not available due to
"CIRCUITPY: no support for mp_type_native_coro_wrap" (emitglue.c)
2023-08-20 08:02:31 -05:00

72 lines
1.7 KiB
Python

# Test asyncio.wait_for, with forwarding cancellation
# Prefer uasyncio and fall back to the asyncio module under its plain name.
# If neither can be imported, print SKIP and exit.
try:
    import uasyncio as asyncio
except ImportError:
    try:
        import asyncio
    except ImportError:
        print("SKIP")
        raise SystemExit
# Minimal coroutine function; calling it yields a coroutine object that the
# script can inspect for the __await__ attribute.
async def foo():
    return 42
# Skip the whole test when coroutine objects do not expose __await__:
# the attribute lookup raises AttributeError in that case.
try:
    foo().__await__
except AttributeError:
    print("SKIP")
    raise SystemExit
# Sleep for `t` seconds, printing what happens along the way.
#   t              -- delay passed to asyncio.sleep()
#   return_if_fail -- when cancelled: if true, return False instead of
#                     re-raising the CancelledError
# Any other exception is printed and re-raised.
async def awaiting(t, return_if_fail):
    try:
        print("awaiting started")
        await asyncio.sleep(t)
    except asyncio.CancelledError as er:
        # CPython wait_for raises CancelledError inside task but TimeoutError in wait_for
        print("awaiting canceled")
        if return_if_fail:
            return False  # return has no effect if Cancelled
        else:
            raise er
    except Exception as er:
        print("caught exception", er)
        raise er
# Run one scenario: a task blocked in wait_for() is cancelled from outside
# shortly after it starts, well before the 1 second wait_for timeout.
#   catch        -- if true, wait() swallows the CancelledError it receives
#   catch_inside -- forwarded to awaiting() as its return_if_fail argument
async def test_cancellation_forwarded(catch, catch_inside):
    print("----------")

    async def wait():
        try:
            await asyncio.wait_for(awaiting(2, catch_inside), 1)
        except asyncio.TimeoutError as er:
            print("Got timeout error")
            raise er
        except asyncio.CancelledError as er:
            print("Got canceled")
            if not catch:
                raise er

    async def cancel(t):
        # Give wait() a moment to start, then cancel it.
        print("cancel started")
        await asyncio.sleep(0.01)
        print("cancel wait()")
        t.cancel()

    t = asyncio.create_task(wait())
    # The canceller's task handle is bound to a name but not otherwise used.
    k = asyncio.create_task(cancel(t))
    try:
        await t
    except asyncio.CancelledError:
        print("waiting got cancelled")
# Exercise every (catch, catch_inside) combination, each in a fresh
# asyncio.run() call; the order determines the order of the printed output.
for _catch, _catch_inside in (
    (False, False),
    (False, True),
    (True, True),
    (True, False),
):
    asyncio.run(test_cancellation_forwarded(_catch, _catch_inside))