2016-12-22 00:31:38 +03:00
|
|
|
# Test for utimeq module which implements task queue with support for
|
|
|
|
# wraparound time (utime.ticks_ms() style).
|
|
|
|
try:
|
|
|
|
from utime import ticks_add, ticks_diff
|
|
|
|
from utimeq import utimeq
|
|
|
|
except ImportError:
|
|
|
|
print("SKIP")
|
|
|
|
import sys
|
|
|
|
sys.exit()
|
|
|
|
|
|
|
|
DEBUG = 0
|
|
|
|
|
|
|
|
MAX = ticks_add(0, -1)
|
|
|
|
MODULO_HALF = MAX // 2 + 1
|
|
|
|
|
|
|
|
if DEBUG:
|
|
|
|
def dprint(*v):
|
|
|
|
print(*v)
|
|
|
|
else:
|
|
|
|
def dprint(*v):
|
|
|
|
pass
|
|
|
|
|
|
|
|
# Try not to crash on invalid data
|
|
|
|
h = utimeq(10)
|
|
|
|
try:
|
|
|
|
h.push(1)
|
|
|
|
assert False
|
|
|
|
except TypeError:
|
|
|
|
pass
|
|
|
|
|
|
|
|
try:
|
|
|
|
h.pop(1)
|
|
|
|
assert False
|
|
|
|
except IndexError:
|
|
|
|
pass
|
|
|
|
|
2017-04-13 23:34:28 +10:00
|
|
|
# unsupported unary op
|
|
|
|
try:
|
|
|
|
~h
|
|
|
|
assert False
|
|
|
|
except TypeError:
|
|
|
|
pass
|
|
|
|
|
|
|
|
# pushing on full queue
|
|
|
|
h = utimeq(1)
|
|
|
|
h.push(1, 0, 0)
|
|
|
|
try:
|
|
|
|
h.push(2, 0, 0)
|
|
|
|
assert False
|
|
|
|
except IndexError:
|
|
|
|
pass
|
|
|
|
|
|
|
|
# popping into invalid type
|
|
|
|
try:
|
|
|
|
h.pop([])
|
|
|
|
assert False
|
|
|
|
except TypeError:
|
|
|
|
pass
|
|
|
|
|
|
|
|
# length
|
|
|
|
assert len(h) == 1
|
|
|
|
|
|
|
|
# peektime
|
|
|
|
assert h.peektime() == 1
|
|
|
|
|
|
|
|
# peektime with empty queue
|
|
|
|
try:
|
|
|
|
utimeq(1).peektime()
|
|
|
|
assert False
|
|
|
|
except IndexError:
|
|
|
|
pass
|
2016-12-22 00:31:38 +03:00
|
|
|
|
|
|
|
def pop_all(h):
|
|
|
|
l = []
|
|
|
|
while h:
|
|
|
|
item = [0, 0, 0]
|
|
|
|
h.pop(item)
|
|
|
|
#print("!", item)
|
|
|
|
l.append(tuple(item))
|
|
|
|
dprint(l)
|
|
|
|
return l
|
|
|
|
|
|
|
|
def add(h, v):
|
|
|
|
h.push(v, 0, 0)
|
|
|
|
dprint("-----")
|
|
|
|
#h.dump()
|
|
|
|
dprint("-----")
|
|
|
|
|
|
|
|
h = utimeq(10)
|
|
|
|
add(h, 0)
|
|
|
|
add(h, MAX)
|
|
|
|
add(h, MAX - 1)
|
|
|
|
add(h, 101)
|
|
|
|
add(h, 100)
|
|
|
|
add(h, MAX - 2)
|
|
|
|
dprint(h)
|
|
|
|
l = pop_all(h)
|
|
|
|
for i in range(len(l) - 1):
|
|
|
|
diff = ticks_diff(l[i + 1][0], l[i][0])
|
|
|
|
assert diff > 0
|
|
|
|
|
|
|
|
def edge_case(edge, offset):
|
|
|
|
h = utimeq(10)
|
|
|
|
add(h, ticks_add(0, offset))
|
|
|
|
add(h, ticks_add(edge, offset))
|
|
|
|
dprint(h)
|
|
|
|
l = pop_all(h)
|
|
|
|
diff = ticks_diff(l[1][0], l[0][0])
|
|
|
|
dprint(diff, diff > 0)
|
|
|
|
return diff
|
|
|
|
|
|
|
|
dprint("===")
|
|
|
|
diff = edge_case(MODULO_HALF - 1, 0)
|
|
|
|
assert diff == MODULO_HALF - 1
|
|
|
|
assert edge_case(MODULO_HALF - 1, 100) == diff
|
|
|
|
assert edge_case(MODULO_HALF - 1, -100) == diff
|
|
|
|
|
|
|
|
# We expect diff to be always positive, per the definition of heappop() which should return
|
|
|
|
# the smallest value.
|
|
|
|
# This is the edge case where this invariant breaks, due to assymetry of two's-complement
|
|
|
|
# range - there's one more negative integer than positive, so heappushing values like below
|
|
|
|
# will then make ticks_diff() return the minimum negative value. We could make heappop
|
|
|
|
# return them in a different order, but ticks_diff() result would be the same. Conclusion:
|
|
|
|
# never add to a heap values where (a - b) == MODULO_HALF (and which are >= MODULO_HALF
|
|
|
|
# ticks apart in real time of course).
|
|
|
|
dprint("===")
|
|
|
|
diff = edge_case(MODULO_HALF, 0)
|
|
|
|
assert diff == -MODULO_HALF
|
|
|
|
assert edge_case(MODULO_HALF, 100) == diff
|
|
|
|
assert edge_case(MODULO_HALF, -100) == diff
|
|
|
|
|
|
|
|
dprint("===")
|
|
|
|
diff = edge_case(MODULO_HALF + 1, 0)
|
|
|
|
assert diff == MODULO_HALF - 1
|
|
|
|
assert edge_case(MODULO_HALF + 1, 100) == diff
|
|
|
|
assert edge_case(MODULO_HALF + 1, -100) == diff
|
|
|
|
|
|
|
|
print("OK")
|