extmod/uasyncio: Fix race with cancelled task waiting on finished task.
This commit fixes a problem with a race between cancellation of task A and completion of task B, when A waits on B. If task B completes just before task A is cancelled then the cancellation of A does not work. Instead, the CancelledError meant to cancel A gets passed through to B (that's expected behaviour) but B handles it as a "Task exception wasn't retrieved" scenario, printing out such a message (this is because finished tasks point their "coro" attribute to themselves to indicate they are done, and implement the throw() method, but that method inadvertently catches the CancelledError). The correct behaviour is for B to bounce that CancelledError back out. This bug is mainly seen when wait_for() is used, and in that context the symptoms are: - occurs when using wait_for(T, S), if the task T being waited on finishes at exactly the same time as the wait-for timeout S expires - task T will have run to completion - the "Task exception wasn't retrieved message" is printed with "<class 'CancelledError'>" as the error (ie no traceback) - the wait_for(T, S) call never returns (it's never put back on the uasyncio run queue) and all tasks waiting on this are blocked forever from running - uasyncio otherwise continues to function and other tasks continue to be scheduled as normal The fix here reworks the "waiting" attribute of Task to be called "state" and uses it to indicate whether a task is: running and not awaited on, running and awaited on, finished and not awaited on, or finished and awaited on. This means the task does not need to point "coro" to itself to indicate finished, and also allows removal of the throw() method. A benefit of this is that "Task exception wasn't retrieved" messages can go back to being able to print the name of the coroutine function. Fixes issue #7386. Signed-off-by: Damien George <damien@micropython.org>
This commit is contained in:
parent
8edc3aacdd
commit
514bf1a191
|
@ -31,12 +31,19 @@
|
||||||
|
|
||||||
#if MICROPY_PY_UASYNCIO
|
#if MICROPY_PY_UASYNCIO
|
||||||
|
|
||||||
|
#define TASK_STATE_RUNNING_NOT_WAITED_ON (mp_const_true)
|
||||||
|
#define TASK_STATE_DONE_NOT_WAITED_ON (mp_const_none)
|
||||||
|
#define TASK_STATE_DONE_WAS_WAITED_ON (mp_const_false)
|
||||||
|
|
||||||
|
#define TASK_IS_DONE(task) ( \
|
||||||
|
(task)->state == TASK_STATE_DONE_NOT_WAITED_ON \
|
||||||
|
|| (task)->state == TASK_STATE_DONE_WAS_WAITED_ON)
|
||||||
|
|
||||||
typedef struct _mp_obj_task_t {
|
typedef struct _mp_obj_task_t {
|
||||||
mp_pairheap_t pairheap;
|
mp_pairheap_t pairheap;
|
||||||
mp_obj_t coro;
|
mp_obj_t coro;
|
||||||
mp_obj_t data;
|
mp_obj_t data;
|
||||||
mp_obj_t waiting;
|
mp_obj_t state;
|
||||||
|
|
||||||
mp_obj_t ph_key;
|
mp_obj_t ph_key;
|
||||||
} mp_obj_task_t;
|
} mp_obj_task_t;
|
||||||
|
|
||||||
|
@ -146,9 +153,6 @@ STATIC const mp_obj_type_t task_queue_type = {
|
||||||
/******************************************************************************/
|
/******************************************************************************/
|
||||||
// Task class
|
// Task class
|
||||||
|
|
||||||
// For efficiency, the task object is stored to the coro entry when the task is done.
|
|
||||||
#define TASK_IS_DONE(task) ((task)->coro == MP_OBJ_FROM_PTR(task))
|
|
||||||
|
|
||||||
// This is the core uasyncio context with cur_task, _task_queue and CancelledError.
|
// This is the core uasyncio context with cur_task, _task_queue and CancelledError.
|
||||||
STATIC mp_obj_t uasyncio_context = MP_OBJ_NULL;
|
STATIC mp_obj_t uasyncio_context = MP_OBJ_NULL;
|
||||||
|
|
||||||
|
@ -159,7 +163,7 @@ STATIC mp_obj_t task_make_new(const mp_obj_type_t *type, size_t n_args, size_t n
|
||||||
mp_pairheap_init_node(task_lt, &self->pairheap);
|
mp_pairheap_init_node(task_lt, &self->pairheap);
|
||||||
self->coro = args[0];
|
self->coro = args[0];
|
||||||
self->data = mp_const_none;
|
self->data = mp_const_none;
|
||||||
self->waiting = mp_const_none;
|
self->state = TASK_STATE_RUNNING_NOT_WAITED_ON;
|
||||||
self->ph_key = MP_OBJ_NEW_SMALL_INT(0);
|
self->ph_key = MP_OBJ_NEW_SMALL_INT(0);
|
||||||
if (n_args == 2) {
|
if (n_args == 2) {
|
||||||
uasyncio_context = args[1];
|
uasyncio_context = args[1];
|
||||||
|
@ -218,24 +222,6 @@ STATIC mp_obj_t task_cancel(mp_obj_t self_in) {
|
||||||
}
|
}
|
||||||
STATIC MP_DEFINE_CONST_FUN_OBJ_1(task_cancel_obj, task_cancel);
|
STATIC MP_DEFINE_CONST_FUN_OBJ_1(task_cancel_obj, task_cancel);
|
||||||
|
|
||||||
STATIC mp_obj_t task_throw(mp_obj_t self_in, mp_obj_t value_in) {
|
|
||||||
// This task raised an exception which was uncaught; handle that now.
|
|
||||||
mp_obj_task_t *self = MP_OBJ_TO_PTR(self_in);
|
|
||||||
// Set the data because it was cleared by the main scheduling loop.
|
|
||||||
self->data = value_in;
|
|
||||||
if (self->waiting == mp_const_none) {
|
|
||||||
// Nothing await'ed on the task so call the exception handler.
|
|
||||||
mp_obj_t _exc_context = mp_obj_dict_get(uasyncio_context, MP_OBJ_NEW_QSTR(MP_QSTR__exc_context));
|
|
||||||
mp_obj_dict_store(_exc_context, MP_OBJ_NEW_QSTR(MP_QSTR_exception), value_in);
|
|
||||||
mp_obj_dict_store(_exc_context, MP_OBJ_NEW_QSTR(MP_QSTR_future), self_in);
|
|
||||||
mp_obj_t Loop = mp_obj_dict_get(uasyncio_context, MP_OBJ_NEW_QSTR(MP_QSTR_Loop));
|
|
||||||
mp_obj_t call_exception_handler = mp_load_attr(Loop, MP_QSTR_call_exception_handler);
|
|
||||||
mp_call_function_1(call_exception_handler, _exc_context);
|
|
||||||
}
|
|
||||||
return mp_const_none;
|
|
||||||
}
|
|
||||||
STATIC MP_DEFINE_CONST_FUN_OBJ_2(task_throw_obj, task_throw);
|
|
||||||
|
|
||||||
STATIC void task_attr(mp_obj_t self_in, qstr attr, mp_obj_t *dest) {
|
STATIC void task_attr(mp_obj_t self_in, qstr attr, mp_obj_t *dest) {
|
||||||
mp_obj_task_t *self = MP_OBJ_TO_PTR(self_in);
|
mp_obj_task_t *self = MP_OBJ_TO_PTR(self_in);
|
||||||
if (dest[0] == MP_OBJ_NULL) {
|
if (dest[0] == MP_OBJ_NULL) {
|
||||||
|
@ -244,32 +230,24 @@ STATIC void task_attr(mp_obj_t self_in, qstr attr, mp_obj_t *dest) {
|
||||||
dest[0] = self->coro;
|
dest[0] = self->coro;
|
||||||
} else if (attr == MP_QSTR_data) {
|
} else if (attr == MP_QSTR_data) {
|
||||||
dest[0] = self->data;
|
dest[0] = self->data;
|
||||||
} else if (attr == MP_QSTR_waiting) {
|
} else if (attr == MP_QSTR_state) {
|
||||||
if (self->waiting != mp_const_none && self->waiting != mp_const_false) {
|
dest[0] = self->state;
|
||||||
dest[0] = self->waiting;
|
|
||||||
}
|
|
||||||
} else if (attr == MP_QSTR_done) {
|
} else if (attr == MP_QSTR_done) {
|
||||||
dest[0] = MP_OBJ_FROM_PTR(&task_done_obj);
|
dest[0] = MP_OBJ_FROM_PTR(&task_done_obj);
|
||||||
dest[1] = self_in;
|
dest[1] = self_in;
|
||||||
} else if (attr == MP_QSTR_cancel) {
|
} else if (attr == MP_QSTR_cancel) {
|
||||||
dest[0] = MP_OBJ_FROM_PTR(&task_cancel_obj);
|
dest[0] = MP_OBJ_FROM_PTR(&task_cancel_obj);
|
||||||
dest[1] = self_in;
|
dest[1] = self_in;
|
||||||
} else if (attr == MP_QSTR_throw) {
|
|
||||||
dest[0] = MP_OBJ_FROM_PTR(&task_throw_obj);
|
|
||||||
dest[1] = self_in;
|
|
||||||
} else if (attr == MP_QSTR_ph_key) {
|
} else if (attr == MP_QSTR_ph_key) {
|
||||||
dest[0] = self->ph_key;
|
dest[0] = self->ph_key;
|
||||||
}
|
}
|
||||||
} else if (dest[1] != MP_OBJ_NULL) {
|
} else if (dest[1] != MP_OBJ_NULL) {
|
||||||
// Store
|
// Store
|
||||||
if (attr == MP_QSTR_coro) {
|
if (attr == MP_QSTR_data) {
|
||||||
self->coro = dest[1];
|
|
||||||
dest[0] = MP_OBJ_NULL;
|
|
||||||
} else if (attr == MP_QSTR_data) {
|
|
||||||
self->data = dest[1];
|
self->data = dest[1];
|
||||||
dest[0] = MP_OBJ_NULL;
|
dest[0] = MP_OBJ_NULL;
|
||||||
} else if (attr == MP_QSTR_waiting) {
|
} else if (attr == MP_QSTR_state) {
|
||||||
self->waiting = dest[1];
|
self->state = dest[1];
|
||||||
dest[0] = MP_OBJ_NULL;
|
dest[0] = MP_OBJ_NULL;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -278,15 +256,12 @@ STATIC void task_attr(mp_obj_t self_in, qstr attr, mp_obj_t *dest) {
|
||||||
STATIC mp_obj_t task_getiter(mp_obj_t self_in, mp_obj_iter_buf_t *iter_buf) {
|
STATIC mp_obj_t task_getiter(mp_obj_t self_in, mp_obj_iter_buf_t *iter_buf) {
|
||||||
(void)iter_buf;
|
(void)iter_buf;
|
||||||
mp_obj_task_t *self = MP_OBJ_TO_PTR(self_in);
|
mp_obj_task_t *self = MP_OBJ_TO_PTR(self_in);
|
||||||
if (self->waiting == mp_const_none) {
|
if (TASK_IS_DONE(self)) {
|
||||||
// The is the first access of the "waiting" entry.
|
// Signal that the completed-task has been await'ed on.
|
||||||
if (TASK_IS_DONE(self)) {
|
self->state = TASK_STATE_DONE_WAS_WAITED_ON;
|
||||||
// Signal that the completed-task has been await'ed on.
|
} else if (self->state == TASK_STATE_RUNNING_NOT_WAITED_ON) {
|
||||||
self->waiting = mp_const_false;
|
// Allocate the waiting queue.
|
||||||
} else {
|
self->state = task_queue_make_new(&task_queue_type, 0, 0, NULL);
|
||||||
// Lazily allocate the waiting queue.
|
|
||||||
self->waiting = task_queue_make_new(&task_queue_type, 0, 0, NULL);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return self_in;
|
return self_in;
|
||||||
}
|
}
|
||||||
|
@ -299,7 +274,7 @@ STATIC mp_obj_t task_iternext(mp_obj_t self_in) {
|
||||||
} else {
|
} else {
|
||||||
// Put calling task on waiting queue.
|
// Put calling task on waiting queue.
|
||||||
mp_obj_t cur_task = mp_obj_dict_get(uasyncio_context, MP_OBJ_NEW_QSTR(MP_QSTR_cur_task));
|
mp_obj_t cur_task = mp_obj_dict_get(uasyncio_context, MP_OBJ_NEW_QSTR(MP_QSTR_cur_task));
|
||||||
mp_obj_t args[2] = { self->waiting, cur_task };
|
mp_obj_t args[2] = { self->state, cur_task };
|
||||||
task_queue_push_sorted(2, args);
|
task_queue_push_sorted(2, args);
|
||||||
// Set calling task's data to this task that it waits on, to double-link it.
|
// Set calling task's data to this task that it waits on, to double-link it.
|
||||||
((mp_obj_task_t *)MP_OBJ_TO_PTR(cur_task))->data = self_in;
|
((mp_obj_task_t *)MP_OBJ_TO_PTR(cur_task))->data = self_in;
|
||||||
|
|
|
@ -175,6 +175,10 @@ def run_until_complete(main_task=None):
|
||||||
if not exc:
|
if not exc:
|
||||||
t.coro.send(None)
|
t.coro.send(None)
|
||||||
else:
|
else:
|
||||||
|
# If the task is finished and on the run queue and gets here, then it
|
||||||
|
# had an exception and was not await'ed on. Throwing into it now will
|
||||||
|
# raise StopIteration and the code below will catch this and run the
|
||||||
|
# call_exception_handler function.
|
||||||
t.data = None
|
t.data = None
|
||||||
t.coro.throw(exc)
|
t.coro.throw(exc)
|
||||||
except excs_all as er:
|
except excs_all as er:
|
||||||
|
@ -185,22 +189,32 @@ def run_until_complete(main_task=None):
|
||||||
if isinstance(er, StopIteration):
|
if isinstance(er, StopIteration):
|
||||||
return er.value
|
return er.value
|
||||||
raise er
|
raise er
|
||||||
# Schedule any other tasks waiting on the completion of this task
|
if t.state:
|
||||||
waiting = False
|
# Task was running but is now finished.
|
||||||
if hasattr(t, "waiting"):
|
waiting = False
|
||||||
while t.waiting.peek():
|
if t.state is True:
|
||||||
_task_queue.push_head(t.waiting.pop_head())
|
# "None" indicates that the task is complete and not await'ed on (yet).
|
||||||
waiting = True
|
t.state = None
|
||||||
t.waiting = None # Free waiting queue head
|
else:
|
||||||
if not waiting and not isinstance(er, excs_stop):
|
# Schedule any other tasks waiting on the completion of this task.
|
||||||
# An exception ended this detached task, so queue it for later
|
while t.state.peek():
|
||||||
# execution to handle the uncaught exception if no other task retrieves
|
_task_queue.push_head(t.state.pop_head())
|
||||||
# the exception in the meantime (this is handled by Task.throw).
|
waiting = True
|
||||||
_task_queue.push_head(t)
|
# "False" indicates that the task is complete and has been await'ed on.
|
||||||
# Indicate task is done by setting coro to the task object itself
|
t.state = False
|
||||||
t.coro = t
|
if not waiting and not isinstance(er, excs_stop):
|
||||||
# Save return value of coro to pass up to caller
|
# An exception ended this detached task, so queue it for later
|
||||||
t.data = er
|
# execution to handle the uncaught exception if no other task retrieves
|
||||||
|
# the exception in the meantime (this is handled by Task.throw).
|
||||||
|
_task_queue.push_head(t)
|
||||||
|
# Save return value of coro to pass up to caller.
|
||||||
|
t.data = er
|
||||||
|
elif t.state is None:
|
||||||
|
# Task is already finished and nothing await'ed on the task,
|
||||||
|
# so call the exception handler.
|
||||||
|
_exc_context["exception"] = exc
|
||||||
|
_exc_context["future"] = t
|
||||||
|
Loop.call_exception_handler(_exc_context)
|
||||||
|
|
||||||
|
|
||||||
# Create a new task from a coroutine and run it until it finishes
|
# Create a new task from a coroutine and run it until it finishes
|
||||||
|
|
|
@ -123,6 +123,7 @@ class Task:
|
||||||
def __init__(self, coro, globals=None):
|
def __init__(self, coro, globals=None):
|
||||||
self.coro = coro # Coroutine of this Task
|
self.coro = coro # Coroutine of this Task
|
||||||
self.data = None # General data for queue it is waiting on
|
self.data = None # General data for queue it is waiting on
|
||||||
|
self.state = True # None, False, True or a TaskQueue instance
|
||||||
self.ph_key = 0 # Pairing heap
|
self.ph_key = 0 # Pairing heap
|
||||||
self.ph_child = None # Paring heap
|
self.ph_child = None # Paring heap
|
||||||
self.ph_child_last = None # Paring heap
|
self.ph_child_last = None # Paring heap
|
||||||
|
@ -130,30 +131,30 @@ class Task:
|
||||||
self.ph_rightmost_parent = None # Paring heap
|
self.ph_rightmost_parent = None # Paring heap
|
||||||
|
|
||||||
def __iter__(self):
|
def __iter__(self):
|
||||||
if self.coro is self:
|
if not self.state:
|
||||||
# Signal that the completed-task has been await'ed on.
|
# Task finished, signal that is has been await'ed on.
|
||||||
self.waiting = None
|
self.state = False
|
||||||
elif not hasattr(self, "waiting"):
|
elif self.state is True:
|
||||||
# Lazily allocated head of linked list of Tasks waiting on completion of this task.
|
# Allocated head of linked list of Tasks waiting on completion of this task.
|
||||||
self.waiting = TaskQueue()
|
self.state = TaskQueue()
|
||||||
return self
|
return self
|
||||||
|
|
||||||
def __next__(self):
|
def __next__(self):
|
||||||
if self.coro is self:
|
if not self.state:
|
||||||
# Task finished, raise return value to caller so it can continue.
|
# Task finished, raise return value to caller so it can continue.
|
||||||
raise self.data
|
raise self.data
|
||||||
else:
|
else:
|
||||||
# Put calling task on waiting queue.
|
# Put calling task on waiting queue.
|
||||||
self.waiting.push_head(core.cur_task)
|
self.state.push_head(core.cur_task)
|
||||||
# Set calling task's data to this task that it waits on, to double-link it.
|
# Set calling task's data to this task that it waits on, to double-link it.
|
||||||
core.cur_task.data = self
|
core.cur_task.data = self
|
||||||
|
|
||||||
def done(self):
|
def done(self):
|
||||||
return self.coro is self
|
return not self.state
|
||||||
|
|
||||||
def cancel(self):
|
def cancel(self):
|
||||||
# Check if task is already finished.
|
# Check if task is already finished.
|
||||||
if self.coro is self:
|
if not self.state:
|
||||||
return False
|
return False
|
||||||
# Can't cancel self (not supported yet).
|
# Can't cancel self (not supported yet).
|
||||||
if self is core.cur_task:
|
if self is core.cur_task:
|
||||||
|
@ -172,13 +173,3 @@ class Task:
|
||||||
core._task_queue.push_head(self)
|
core._task_queue.push_head(self)
|
||||||
self.data = core.CancelledError
|
self.data = core.CancelledError
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def throw(self, value):
|
|
||||||
# This task raised an exception which was uncaught; handle that now.
|
|
||||||
# Set the data because it was cleared by the main scheduling loop.
|
|
||||||
self.data = value
|
|
||||||
if not hasattr(self, "waiting"):
|
|
||||||
# Nothing await'ed on the task so call the exception handler.
|
|
||||||
core._exc_context["exception"] = value
|
|
||||||
core._exc_context["future"] = self
|
|
||||||
core.Loop.call_exception_handler(core._exc_context)
|
|
||||||
|
|
|
@ -0,0 +1,41 @@
|
||||||
|
# Test cancelling a task that is waiting on a task that just finishes.
|
||||||
|
|
||||||
|
try:
|
||||||
|
import uasyncio as asyncio
|
||||||
|
except ImportError:
|
||||||
|
try:
|
||||||
|
import asyncio
|
||||||
|
except ImportError:
|
||||||
|
print("SKIP")
|
||||||
|
raise SystemExit
|
||||||
|
|
||||||
|
|
||||||
|
async def sleep_task():
|
||||||
|
print("sleep_task sleep")
|
||||||
|
await asyncio.sleep(0)
|
||||||
|
print("sleep_task wake")
|
||||||
|
|
||||||
|
|
||||||
|
async def wait_task(t):
|
||||||
|
print("wait_task wait")
|
||||||
|
await t
|
||||||
|
print("wait_task wake")
|
||||||
|
|
||||||
|
|
||||||
|
async def main():
|
||||||
|
waiting_task = asyncio.create_task(wait_task(asyncio.create_task(sleep_task())))
|
||||||
|
|
||||||
|
print("main sleep")
|
||||||
|
await asyncio.sleep(0)
|
||||||
|
print("main sleep")
|
||||||
|
await asyncio.sleep(0)
|
||||||
|
|
||||||
|
waiting_task.cancel()
|
||||||
|
print("main wait")
|
||||||
|
try:
|
||||||
|
await waiting_task
|
||||||
|
except asyncio.CancelledError as er:
|
||||||
|
print(repr(er))
|
||||||
|
|
||||||
|
|
||||||
|
asyncio.run(main())
|
|
@ -0,0 +1,7 @@
|
||||||
|
main sleep
|
||||||
|
sleep_task sleep
|
||||||
|
wait_task wait
|
||||||
|
main sleep
|
||||||
|
sleep_task wake
|
||||||
|
main wait
|
||||||
|
CancelledError()
|
Loading…
Reference in New Issue