asyncio: Add Task methods to get tasks closer to CPython.

Adds methods that are in CPython, such as `exception`, `result`,
`get_coro`, `cancelled`, `add_done_callback`, and
`remove_done_callback`.

Also adds support for the unary hash so tasks may be collected in
a python set.
This commit is contained in:
James Ward 2023-11-16 18:55:52 -05:00
parent b2c32cf42f
commit adbb02d9e3
No known key found for this signature in database
GPG Key ID: F53FE2DEDD7BBD79

View File

@ -46,6 +46,12 @@
(task)->state == TASK_STATE_DONE_NOT_WAITED_ON \
|| (task)->state == TASK_STATE_DONE_WAS_WAITED_ON)
#define IS_CANCELLED_ERROR(error) ( \
mp_obj_is_subclass_fast( \
MP_OBJ_FROM_PTR(mp_obj_get_type(error)), \
mp_obj_dict_get(asyncio_context, MP_OBJ_NEW_QSTR(MP_QSTR_CancelledError)) \
))
typedef struct _mp_obj_task_t {
mp_pairheap_t pairheap;
mp_obj_t coro;
@ -202,6 +208,114 @@ STATIC mp_obj_t task_done(mp_obj_t self_in) {
}
STATIC MP_DEFINE_CONST_FUN_OBJ_1(task_done_obj, task_done);
STATIC mp_obj_t task_add_done_callback(mp_obj_t self_in, mp_obj_t callback) {
assert(mp_obj_is_callable(callback));
mp_obj_task_t *self = MP_OBJ_TO_PTR(self_in);
if (TASK_IS_DONE(self)) {
// In CPython the callbacks are not immediately called and are instead
// called by the event loop. However, MicroPython's event loop doesn't
// support `call_soon` to handle callback processing.
//
// Because of this, it's close enough to call the callback immediately.
mp_call_function_2(callback, self_in, self->data);
return mp_const_none;
}
if (self->state != mp_const_true) {
// Tasks SHOULD support more than one callback per CPython but to reduce
// the surface area of this change tasks can currently only support one.
mp_raise_msg(&mp_type_RuntimeError, MP_ERROR_TEXT(">1 callback unsupported"));
}
self->state = callback;
return mp_const_none;
}
STATIC MP_DEFINE_CONST_FUN_OBJ_2(task_add_done_callback_obj, task_add_done_callback);
STATIC mp_obj_t task_remove_done_callback(mp_obj_t self_in, mp_obj_t callback) {
mp_obj_task_t *self = MP_OBJ_TO_PTR(self_in);
if (callback != self->state) {
// If the callback isn't a match we can count this as removing 0 callbacks
return mp_obj_new_int(0);
}
self->state = mp_const_true;
return mp_obj_new_int(1);
}
STATIC MP_DEFINE_CONST_FUN_OBJ_2(task_remove_done_callback_obj, task_remove_done_callback);
STATIC mp_obj_t task_get_coro(mp_obj_t self_in) {
mp_obj_task_t *self = MP_OBJ_TO_PTR(self_in);
return MP_OBJ_FROM_PTR(self->coro);
}
STATIC MP_DEFINE_CONST_FUN_OBJ_1(task_get_coro_obj, task_get_coro);
STATIC mp_obj_t task_exception(mp_obj_t self_in) {
mp_obj_task_t *self = MP_OBJ_TO_PTR(self_in);
if (!TASK_IS_DONE(self)) {
mp_obj_t error_type = mp_obj_dict_get(asyncio_context, MP_OBJ_NEW_QSTR(MP_QSTR_InvalidStateError));
nlr_raise(mp_make_raise_obj(error_type));
}
// If the exception is a cancelled error then we should raise it
if (IS_CANCELLED_ERROR(self->data)) {
nlr_raise(mp_make_raise_obj(self->data));
}
// If it's a StopIteration we should should return none
if (mp_obj_is_subclass_fast(MP_OBJ_FROM_PTR(mp_obj_get_type(self->data)), MP_OBJ_FROM_PTR(&mp_type_StopIteration))) {
return mp_const_none;
}
if (!mp_obj_is_exception_instance(self->data)) {
return mp_const_none;
}
return self->data;
}
STATIC MP_DEFINE_CONST_FUN_OBJ_1(task_exception_obj, task_exception);
STATIC mp_obj_t task_result(mp_obj_t self_in) {
mp_obj_task_t *self = MP_OBJ_TO_PTR(self_in);
if (!TASK_IS_DONE(self)) {
mp_obj_t error_type = mp_obj_dict_get(asyncio_context, MP_OBJ_NEW_QSTR(MP_QSTR_InvalidStateError));
nlr_raise(mp_make_raise_obj(error_type));
}
// If `exception()` returns anything we raise that
mp_obj_t exception_obj = task_exception(self_in);
if (exception_obj != mp_const_none) {
nlr_raise(mp_make_raise_obj(exception_obj));
}
// If not StopIteration, bail early
if (!mp_obj_is_subclass_fast(MP_OBJ_FROM_PTR(mp_obj_get_type(self->data)), MP_OBJ_FROM_PTR(&mp_type_StopIteration))) {
return mp_const_none;
}
return mp_obj_exception_get_value(self->data);
}
STATIC MP_DEFINE_CONST_FUN_OBJ_1(task_result_obj, task_result);
STATIC mp_obj_t task_cancelled(mp_obj_t self_in) {
mp_obj_task_t *self = MP_OBJ_TO_PTR(self_in);
if (!TASK_IS_DONE(self)) {
// If task isn't done it can't possibly be cancelled, and would instead
// be considered "cancelling" even if a cancel was requested until it
// has fully completed.
return mp_obj_new_bool(false);
}
return mp_obj_new_bool(IS_CANCELLED_ERROR(self->data));
}
STATIC MP_DEFINE_CONST_FUN_OBJ_1(task_cancelled_obj, task_cancelled);
STATIC mp_obj_t task_cancel(mp_obj_t self_in) {
mp_obj_task_t *self = MP_OBJ_TO_PTR(self_in);
// Check if task is already finished.
@ -276,6 +390,24 @@ STATIC void task_attr(mp_obj_t self_in, qstr attr, mp_obj_t *dest) {
} else if (attr == MP_QSTR___await__) {
dest[0] = MP_OBJ_FROM_PTR(&task_await_obj);
dest[1] = self_in;
} else if (attr == MP_QSTR_add_done_callback) {
dest[0] = MP_OBJ_FROM_PTR(&task_add_done_callback_obj);
dest[1] = self_in;
} else if (attr == MP_QSTR_remove_done_callback) {
dest[0] = MP_OBJ_FROM_PTR(&task_remove_done_callback_obj);
dest[1] = self_in;
} else if (attr == MP_QSTR_get_coro) {
dest[0] = MP_OBJ_FROM_PTR(&task_get_coro_obj);
dest[1] = self_in;
} else if (attr == MP_QSTR_result) {
dest[0] = MP_OBJ_FROM_PTR(&task_result_obj);
dest[1] = self_in;
} else if (attr == MP_QSTR_exception) {
dest[0] = MP_OBJ_FROM_PTR(&task_exception_obj);
dest[1] = self_in;
} else if (attr == MP_QSTR_cancelled) {
dest[0] = MP_OBJ_FROM_PTR(&task_cancelled_obj);
dest[1] = self_in;
}
} else if (dest[1] != MP_OBJ_NULL) {
// Store
@ -289,6 +421,15 @@ STATIC void task_attr(mp_obj_t self_in, qstr attr, mp_obj_t *dest) {
}
}
STATIC mp_obj_t task_unary_op(mp_unary_op_t op, mp_obj_t o_in) {
switch (op) {
case MP_UNARY_OP_HASH:
return MP_OBJ_NEW_SMALL_INT((mp_uint_t)o_in);
default:
return MP_OBJ_NULL; // op not supported
}
}
STATIC mp_obj_t task_getiter(mp_obj_t self_in, mp_obj_iter_buf_t *iter_buf) {
(void)iter_buf;
mp_obj_task_t *self = MP_OBJ_TO_PTR(self_in);
@ -337,7 +478,8 @@ STATIC MP_DEFINE_CONST_OBJ_TYPE(
MP_TYPE_FLAG_ITER_IS_CUSTOM,
make_new, task_make_new,
attr, task_attr,
iter, &task_getiter_iternext
iter, &task_getiter_iternext,
unary_op, task_unary_op
);
/******************************************************************************/