50 lines
991 B
Python
50 lines
991 B
Python
|
# This test ensures that the scheduler doesn't trigger any assertions
|
||
|
# while dealing with concurrent access from multiple threads.
|
||
|
|
||
|
import _thread
|
||
|
import utime
|
||
|
import micropython
|
||
|
import gc
|
||
|
|
||
|
try:
|
||
|
micropython.schedule
|
||
|
except AttributeError:
|
||
|
print("SKIP")
|
||
|
raise SystemExit
|
||
|
|
||
|
gc.disable()
|
||
|
|
||
|
n = 0 # How many times the task successfully ran.
|
||
|
|
||
|
|
||
|
def task(x):
|
||
|
global n
|
||
|
n += 1
|
||
|
|
||
|
|
||
|
def thread():
|
||
|
while True:
|
||
|
try:
|
||
|
micropython.schedule(task, None)
|
||
|
except RuntimeError:
|
||
|
# Queue full, back off.
|
||
|
utime.sleep_ms(10)
|
||
|
|
||
|
|
||
|
for i in range(8):
|
||
|
_thread.start_new_thread(thread, ())
|
||
|
|
||
|
_NUM_TASKS = const(10000)
|
||
|
_TIMEOUT_MS = const(10000)
|
||
|
|
||
|
# Wait up to 10 seconds for 10000 tasks to be scheduled.
|
||
|
t = utime.ticks_ms()
|
||
|
while n < _NUM_TASKS and utime.ticks_diff(utime.ticks_ms(), t) < _TIMEOUT_MS:
|
||
|
pass
|
||
|
|
||
|
if n < _NUM_TASKS:
|
||
|
# Not all the tasks were scheduled, likely the scheduler stopped working.
|
||
|
print(n)
|
||
|
else:
|
||
|
print("PASS")
|