extmod/uasyncio: Fix cancellation handling of wait_for.
This commit switches the roles of the helper task from a cancellation task to a runner task, to get the correct semantics for cancellation of wait_for. Some uasyncio tests are now disabled for the native emitter due to issues with native code generation of generators and yield-from. Fixes #5797. Signed-off-by: Damien George <damien@micropython.org>
This commit is contained in:
parent
309dfe39e0
commit
b505971069
@ -9,24 +9,44 @@ async def wait_for(aw, timeout, sleep=core.sleep):
|
|||||||
if timeout is None:
|
if timeout is None:
|
||||||
return await aw
|
return await aw
|
||||||
|
|
||||||
def cancel(aw, timeout, sleep):
|
def runner(waiter, aw):
|
||||||
await sleep(timeout)
|
nonlocal status, result
|
||||||
aw.cancel()
|
try:
|
||||||
|
result = await aw
|
||||||
|
s = True
|
||||||
|
except BaseException as er:
|
||||||
|
s = er
|
||||||
|
if status is None:
|
||||||
|
# The waiter is still waiting, set status for it and cancel it.
|
||||||
|
status = s
|
||||||
|
waiter.cancel()
|
||||||
|
|
||||||
|
# Run aw in a separate runner task that manages its exceptions.
|
||||||
|
status = None
|
||||||
|
result = None
|
||||||
|
runner_task = core.create_task(runner(core.cur_task, aw))
|
||||||
|
|
||||||
cancel_task = core.create_task(cancel(aw, timeout, sleep))
|
|
||||||
try:
|
try:
|
||||||
ret = await aw
|
# Wait for the timeout to elapse.
|
||||||
except core.CancelledError:
|
await sleep(timeout)
|
||||||
# Ignore CancelledError from aw, it's probably due to timeout
|
except core.CancelledError as er:
|
||||||
pass
|
if status is True:
|
||||||
finally:
|
# aw completed successfully and cancelled the sleep, so return aw's result.
|
||||||
# Cancel the "cancel" task if it's still active (optimisation instead of cancel_task.cancel())
|
return result
|
||||||
if cancel_task.coro is not cancel_task:
|
elif status is None:
|
||||||
core._task_queue.remove(cancel_task)
|
# This wait_for was cancelled externally, so cancel aw and re-raise.
|
||||||
if cancel_task.coro is cancel_task:
|
status = True
|
||||||
# Cancel task ran to completion, ie there was a timeout
|
runner_task.cancel()
|
||||||
raise core.TimeoutError
|
raise er
|
||||||
return ret
|
else:
|
||||||
|
# aw raised an exception, propagate it out to the caller.
|
||||||
|
raise status
|
||||||
|
|
||||||
|
# The sleep finished before aw, so cancel aw and raise TimeoutError.
|
||||||
|
status = True
|
||||||
|
runner_task.cancel()
|
||||||
|
await runner_task
|
||||||
|
raise core.TimeoutError
|
||||||
|
|
||||||
|
|
||||||
def wait_for_ms(aw, timeout):
|
def wait_for_ms(aw, timeout):
|
||||||
|
@ -31,30 +31,85 @@ async def task_raise():
|
|||||||
raise ValueError
|
raise ValueError
|
||||||
|
|
||||||
|
|
||||||
|
async def task_cancel_other(t, other):
|
||||||
|
print("task_cancel_other start")
|
||||||
|
await asyncio.sleep(t)
|
||||||
|
print("task_cancel_other cancel")
|
||||||
|
other.cancel()
|
||||||
|
|
||||||
|
|
||||||
|
async def task_wait_for_cancel(id, t, t_wait):
|
||||||
|
print("task_wait_for_cancel start")
|
||||||
|
try:
|
||||||
|
await asyncio.wait_for(task(id, t), t_wait)
|
||||||
|
except asyncio.CancelledError as er:
|
||||||
|
print("task_wait_for_cancel cancelled")
|
||||||
|
raise er
|
||||||
|
|
||||||
|
|
||||||
|
async def task_wait_for_cancel_ignore(t_wait):
|
||||||
|
print("task_wait_for_cancel_ignore start")
|
||||||
|
try:
|
||||||
|
await asyncio.wait_for(task_catch(), t_wait)
|
||||||
|
except asyncio.CancelledError as er:
|
||||||
|
print("task_wait_for_cancel_ignore cancelled")
|
||||||
|
raise er
|
||||||
|
|
||||||
|
|
||||||
async def main():
|
async def main():
|
||||||
|
sep = "-" * 10
|
||||||
|
|
||||||
# When task finished before the timeout
|
# When task finished before the timeout
|
||||||
print(await asyncio.wait_for(task(1, 0.01), 10))
|
print(await asyncio.wait_for(task(1, 0.01), 10))
|
||||||
|
print(sep)
|
||||||
|
|
||||||
# When timeout passes and task is cancelled
|
# When timeout passes and task is cancelled
|
||||||
try:
|
try:
|
||||||
print(await asyncio.wait_for(task(2, 10), 0.01))
|
print(await asyncio.wait_for(task(2, 10), 0.01))
|
||||||
except asyncio.TimeoutError:
|
except asyncio.TimeoutError:
|
||||||
print("timeout")
|
print("timeout")
|
||||||
|
print(sep)
|
||||||
|
|
||||||
# When timeout passes and task is cancelled, but task ignores the cancellation request
|
# When timeout passes and task is cancelled, but task ignores the cancellation request
|
||||||
try:
|
try:
|
||||||
print(await asyncio.wait_for(task_catch(), 0.1))
|
print(await asyncio.wait_for(task_catch(), 0.1))
|
||||||
except asyncio.TimeoutError:
|
except asyncio.TimeoutError:
|
||||||
print("TimeoutError")
|
print("TimeoutError")
|
||||||
|
print(sep)
|
||||||
|
|
||||||
# When task raises an exception
|
# When task raises an exception
|
||||||
try:
|
try:
|
||||||
print(await asyncio.wait_for(task_raise(), 1))
|
print(await asyncio.wait_for(task_raise(), 1))
|
||||||
except ValueError:
|
except ValueError:
|
||||||
print("ValueError")
|
print("ValueError")
|
||||||
|
print(sep)
|
||||||
|
|
||||||
# Timeout of None means wait forever
|
# Timeout of None means wait forever
|
||||||
print(await asyncio.wait_for(task(3, 0.1), None))
|
print(await asyncio.wait_for(task(3, 0.1), None))
|
||||||
|
print(sep)
|
||||||
|
|
||||||
|
# When task is cancelled by another task
|
||||||
|
t = asyncio.create_task(task(4, 10))
|
||||||
|
asyncio.create_task(task_cancel_other(0.01, t))
|
||||||
|
try:
|
||||||
|
print(await asyncio.wait_for(t, 1))
|
||||||
|
except asyncio.CancelledError as er:
|
||||||
|
print(repr(er))
|
||||||
|
print(sep)
|
||||||
|
|
||||||
|
# When wait_for gets cancelled
|
||||||
|
t = asyncio.create_task(task_wait_for_cancel(4, 1, 2))
|
||||||
|
await asyncio.sleep(0.01)
|
||||||
|
t.cancel()
|
||||||
|
await asyncio.sleep(0.01)
|
||||||
|
print(sep)
|
||||||
|
|
||||||
|
# When wait_for gets cancelled and awaited task ignores the cancellation request
|
||||||
|
t = asyncio.create_task(task_wait_for_cancel_ignore(2))
|
||||||
|
await asyncio.sleep(0.01)
|
||||||
|
t.cancel()
|
||||||
|
await asyncio.sleep(0.01)
|
||||||
|
print(sep)
|
||||||
|
|
||||||
print("finish")
|
print("finish")
|
||||||
|
|
||||||
|
@ -1,15 +1,35 @@
|
|||||||
task start 1
|
task start 1
|
||||||
task end 1
|
task end 1
|
||||||
2
|
2
|
||||||
|
----------
|
||||||
task start 2
|
task start 2
|
||||||
timeout
|
timeout
|
||||||
|
----------
|
||||||
task_catch start
|
task_catch start
|
||||||
ignore cancel
|
ignore cancel
|
||||||
task_catch done
|
task_catch done
|
||||||
TimeoutError
|
TimeoutError
|
||||||
|
----------
|
||||||
task start
|
task start
|
||||||
ValueError
|
ValueError
|
||||||
|
----------
|
||||||
task start 3
|
task start 3
|
||||||
task end 3
|
task end 3
|
||||||
6
|
6
|
||||||
|
----------
|
||||||
|
task start 4
|
||||||
|
task_cancel_other start
|
||||||
|
task_cancel_other cancel
|
||||||
|
CancelledError()
|
||||||
|
----------
|
||||||
|
task_wait_for_cancel start
|
||||||
|
task start 4
|
||||||
|
task_wait_for_cancel cancelled
|
||||||
|
----------
|
||||||
|
task_wait_for_cancel_ignore start
|
||||||
|
task_catch start
|
||||||
|
task_wait_for_cancel_ignore cancelled
|
||||||
|
ignore cancel
|
||||||
|
task_catch done
|
||||||
|
----------
|
||||||
finish
|
finish
|
||||||
|
60
tests/extmod/uasyncio_wait_for_fwd.py
Normal file
60
tests/extmod/uasyncio_wait_for_fwd.py
Normal file
@ -0,0 +1,60 @@
|
|||||||
|
# Test asyncio.wait_for, with forwarding cancellation
|
||||||
|
|
||||||
|
try:
|
||||||
|
import uasyncio as asyncio
|
||||||
|
except ImportError:
|
||||||
|
try:
|
||||||
|
import asyncio
|
||||||
|
except ImportError:
|
||||||
|
print("SKIP")
|
||||||
|
raise SystemExit
|
||||||
|
|
||||||
|
|
||||||
|
async def awaiting(t, return_if_fail):
|
||||||
|
try:
|
||||||
|
print("awaiting started")
|
||||||
|
await asyncio.sleep(t)
|
||||||
|
except asyncio.CancelledError as er:
|
||||||
|
# CPython wait_for raises CancelledError inside task but TimeoutError in wait_for
|
||||||
|
print("awaiting canceled")
|
||||||
|
if return_if_fail:
|
||||||
|
return False # return has no effect if Cancelled
|
||||||
|
else:
|
||||||
|
raise er
|
||||||
|
except Exception as er:
|
||||||
|
print("caught exception", er)
|
||||||
|
raise er
|
||||||
|
|
||||||
|
|
||||||
|
async def test_cancellation_forwarded(catch, catch_inside):
|
||||||
|
print("----------")
|
||||||
|
|
||||||
|
async def wait():
|
||||||
|
try:
|
||||||
|
await asyncio.wait_for(awaiting(2, catch_inside), 1)
|
||||||
|
except asyncio.TimeoutError as er:
|
||||||
|
print("Got timeout error")
|
||||||
|
raise er
|
||||||
|
except asyncio.CancelledError as er:
|
||||||
|
print("Got canceled")
|
||||||
|
if not catch:
|
||||||
|
raise er
|
||||||
|
|
||||||
|
async def cancel(t):
|
||||||
|
print("cancel started")
|
||||||
|
await asyncio.sleep(0.01)
|
||||||
|
print("cancel wait()")
|
||||||
|
t.cancel()
|
||||||
|
|
||||||
|
t = asyncio.create_task(wait())
|
||||||
|
k = asyncio.create_task(cancel(t))
|
||||||
|
try:
|
||||||
|
await t
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
print("waiting got cancelled")
|
||||||
|
|
||||||
|
|
||||||
|
asyncio.run(test_cancellation_forwarded(False, False))
|
||||||
|
asyncio.run(test_cancellation_forwarded(False, True))
|
||||||
|
asyncio.run(test_cancellation_forwarded(True, True))
|
||||||
|
asyncio.run(test_cancellation_forwarded(True, False))
|
26
tests/extmod/uasyncio_wait_for_fwd.py.exp
Normal file
26
tests/extmod/uasyncio_wait_for_fwd.py.exp
Normal file
@ -0,0 +1,26 @@
|
|||||||
|
----------
|
||||||
|
cancel started
|
||||||
|
awaiting started
|
||||||
|
cancel wait()
|
||||||
|
Got canceled
|
||||||
|
awaiting canceled
|
||||||
|
waiting got cancelled
|
||||||
|
----------
|
||||||
|
cancel started
|
||||||
|
awaiting started
|
||||||
|
cancel wait()
|
||||||
|
Got canceled
|
||||||
|
awaiting canceled
|
||||||
|
waiting got cancelled
|
||||||
|
----------
|
||||||
|
cancel started
|
||||||
|
awaiting started
|
||||||
|
cancel wait()
|
||||||
|
Got canceled
|
||||||
|
awaiting canceled
|
||||||
|
----------
|
||||||
|
cancel started
|
||||||
|
awaiting started
|
||||||
|
cancel wait()
|
||||||
|
Got canceled
|
||||||
|
awaiting canceled
|
@ -434,7 +434,10 @@ def run_tests(pyb, tests, args, result_dir):
|
|||||||
skip_tests.add('basics/scope_implicit.py') # requires checking for unbound local
|
skip_tests.add('basics/scope_implicit.py') # requires checking for unbound local
|
||||||
skip_tests.add('basics/try_finally_return2.py') # requires raise_varargs
|
skip_tests.add('basics/try_finally_return2.py') # requires raise_varargs
|
||||||
skip_tests.add('basics/unboundlocal.py') # requires checking for unbound local
|
skip_tests.add('basics/unboundlocal.py') # requires checking for unbound local
|
||||||
|
skip_tests.add('extmod/uasyncio_event.py') # unknown issue
|
||||||
skip_tests.add('extmod/uasyncio_lock.py') # requires async with
|
skip_tests.add('extmod/uasyncio_lock.py') # requires async with
|
||||||
|
skip_tests.add('extmod/uasyncio_micropython.py') # unknown issue
|
||||||
|
skip_tests.add('extmod/uasyncio_wait_for.py') # unknown issue
|
||||||
skip_tests.add('misc/features.py') # requires raise_varargs
|
skip_tests.add('misc/features.py') # requires raise_varargs
|
||||||
skip_tests.add('misc/print_exception.py') # because native doesn't have proper traceback info
|
skip_tests.add('misc/print_exception.py') # because native doesn't have proper traceback info
|
||||||
skip_tests.add('misc/sys_exc_info.py') # sys.exc_info() is not supported for native
|
skip_tests.add('misc/sys_exc_info.py') # sys.exc_info() is not supported for native
|
||||||
|
Loading…
Reference in New Issue
Block a user