circuitpython/tests/extmod/asyncio_threadsafeflag.py
Jim Mussared fae83a6b4d tests/extmod/asyncio_threadsafeflag.py: Update for unix select.
1. Remove the skip for detecting support for polling user-defined objects
   as this is always possible now on all ports.
2. Don't print when the scheduled task runs as the ordering of this
   relative to the other prints is dependent on other factors (e.g. if
   using the native emitter).

This work was funded through GitHub Sponsors.

Signed-off-by: Jim Mussared <jim.mussared@gmail.com>
2023-09-29 17:58:40 +10:00

87 lines
1.6 KiB
Python

# Test Event class
try:
import asyncio
except ImportError:
print("SKIP")
raise SystemExit
import micropython
try:
micropython.schedule
except AttributeError:
print("SKIP")
raise SystemExit
async def task(id, flag):
print("task", id)
await flag.wait()
print("task", id, "done")
def set_from_schedule(flag):
flag.set()
async def main():
flag = asyncio.ThreadSafeFlag()
# Set the flag from within the loop.
t = asyncio.create_task(task(1, flag))
print("yield")
await asyncio.sleep(0)
print("set event")
flag.set()
print("yield")
await asyncio.sleep(0)
print("wait task")
await t
# Set the flag from scheduler context.
print("----")
t = asyncio.create_task(task(2, flag))
print("yield")
await asyncio.sleep(0)
print("set event")
micropython.schedule(set_from_schedule, flag)
print("yield")
await asyncio.sleep(0)
print("wait task")
await t
# Flag already set.
print("----")
print("set event")
flag.set()
t = asyncio.create_task(task(3, flag))
print("yield")
await asyncio.sleep(0)
print("wait task")
await t
# Flag set, cleared, and set again.
print("----")
print("set event")
flag.set()
print("yield")
await asyncio.sleep(0)
print("clear event")
flag.clear()
print("yield")
await asyncio.sleep(0)
t = asyncio.create_task(task(4, flag))
print("yield")
await asyncio.sleep(0)
print("set event")
flag.set()
print("yield")
await asyncio.sleep(0)
print("wait task")
await t
asyncio.run(main())