3dc324d3f1
This adds the Python files in the tests/ directory to be formatted with ./tools/codeformat.py. The basics/ subdirectory is excluded for now so we aren't changing too much at once. In a few places `# fmt: off`/`# fmt: on` was used where the code had special formatting for readability or where the test was actually testing the specific formatting.
306 lines
6.2 KiB
Python
306 lines
6.2 KiB
Python
# If pyb provides no CAN class, print "SKIP" and exit instead of failing.
try:
    from pyb import CAN
except ImportError:
    print("SKIP")
    raise SystemExit

from array import array
import micropython
import pyb

# test we can correctly create by id (2 handled in can2.py test)
for bus in (-1, 0, 1, 3):
    try:
        CAN(bus, CAN.LOOPBACK)
    except ValueError:
        print("ValueError", bus)
    else:
        # Only reached when the constructor accepted this bus id.
        print("CAN", bus)
# De-initialise bus 1 again; the next section checks the de-init'd state.
CAN(1).deinit()

CAN.initfilterbanks(14)
can = CAN(1)
print(can)

# Test state when de-init'd
print(can.state() == can.STOPPED)

can.init(CAN.LOOPBACK)
print(can)
print(can.any(0))

# Test state when freshly created
print(can.state() == can.ERROR_ACTIVE)

# Test that restart can be called
can.restart()

# Test info returns a sensible value
print(can.info())

# Catch all filter
can.setfilter(0, CAN.MASK16, 0, (0, 0, 0, 0))

# Ordinary id: show the pending state / info counters and the message itself.
can.send("abcd", 123, timeout=5000)
print(can.any(0), can.info())
print(can.recv(0))

# Ids outside 0..0x7FF: print whatever id is actually received.
can.send("abcd", -1, timeout=5000)
print(can.recv(0))

can.send("abcd", 0x7FF + 1, timeout=5000)
print(can.recv(0))

# Test too long message
# The payload below is 9 characters; a ValueError is the expected outcome.
try:
    can.send("abcdefghi", 0x7FF, timeout=5000)
except ValueError:
    print("passed")
else:
    print("failed")

# Test that recv can work without allocating memory on the heap

# Everything recv() needs is created before the heap is locked: a 10-byte
# backing buffer, the 4-element result list whose last slot is a memoryview
# of that buffer, and the name l2 (presumably pre-bound so that assigning to
# it later does not have to allocate -- TODO confirm).
buf = bytearray(10)
l = [0, 0, 0, memoryview(buf)]
l2 = None

micropython.heap_lock()

# Each round sends a payload of a different length (0, 4, 8, 3 characters)
# and checks that recv() hands back the very same list object it was given.
can.send("", 42)
l2 = can.recv(0, l)
assert l is l2
print(l, len(l[3]), buf)

can.send("1234", 42)
l2 = can.recv(0, l)
assert l is l2
print(l, len(l[3]), buf)

can.send("01234567", 42)
l2 = can.recv(0, l)
assert l is l2
print(l, len(l[3]), buf)

can.send("abc", 42)
l2 = can.recv(0, l)
assert l is l2
print(l, len(l[3]), buf)

micropython.heap_unlock()

# Test that recv can work with different arrays behind the memoryview
# Unsigned-byte ("B") backing array.
can.send("abc", 1)
print(bytes(can.recv(0, [0, 0, 0, memoryview(array("B", range(8)))])[3]))
# Signed-byte ("b") backing array.
can.send("def", 1)
print(bytes(can.recv(0, [0, 0, 0, memoryview(array("b", range(8)))])[3]))

# Each case below first sends a message and then calls recv() with an invalid
# second argument; the named exception type is printed only if it is raised.

# Test for non-list passed as second arg to recv
can.send("abc", 1)
try:
    can.recv(0, 1)
except TypeError:
    print("TypeError")

# Test for too-short-list passed as second arg to recv
can.send("abc", 1)
try:
    can.recv(0, [0, 0, 0])
except ValueError:
    print("ValueError")

# Test for non-memoryview passed as 4th element to recv
can.send("abc", 1)
try:
    can.recv(0, [0, 0, 0, 0])
except TypeError:
    print("TypeError")

# Test for read-only-memoryview passed as 4th element to recv
can.send("abc", 1)
try:
    can.recv(0, [0, 0, 0, memoryview(bytes(8))])
except ValueError:
    print("ValueError")

# Test for bad-typecode-memoryview passed as 4th element to recv
can.send("abc", 1)
try:
    can.recv(0, [0, 0, 0, memoryview(array("i", range(8)))])
except ValueError:
    print("ValueError")

del can

# Testing extended IDs
can = CAN(1, CAN.LOOPBACK, extframe=True)
# Catch all filter
can.setfilter(0, CAN.MASK32, 0, (0, 0))

print(can)

# With extframe=True an id of 0x7FF + 1 must be sent without a ValueError and
# must come back with the same id and payload.
try:
    can.send("abcde", 0x7FF + 1, timeout=5000)
except ValueError:
    print("failed")
else:
    r = can.recv(0)
    if r[0] == 0x7FF + 1 and r[3] == b"abcde":
        print("passed")
    else:
        print("failed, wrong data received")

# Test filters
# The same bit patterns are shifted into each byte position of the id.
# id_ok and filter_id agree on every bit set in filter_mask; id_fail differs
# from filter_id in one masked bit.
for n in [0, 8, 16, 24]:
    filter_id = 0b00001000 << n
    filter_mask = 0b00011100 << n
    id_ok = 0b00001010 << n
    id_fail = 0b00011010 << n

    can.clearfilter(0)
    can.setfilter(0, pyb.CAN.MASK32, 0, (filter_id, filter_mask))

    # A line is printed only when a message actually arrived in FIFO 0.
    can.send("ok", id_ok, timeout=3)
    if can.any(0):
        msg = can.recv(0)
        print((hex(filter_id), hex(filter_mask), hex(msg[0]), msg[3]))

    can.send("fail", id_fail, timeout=3)
    if can.any(0):
        msg = can.recv(0)
        print((hex(filter_id), hex(filter_mask), hex(msg[0]), msg[3]))

del can

# Test RxCallbacks
can = CAN(1, CAN.LOOPBACK)
# Two id-list filters: ids 1-4 and ids 5-8. The third argument differs
# (0 vs 1); presumably it selects the receive FIFO -- confirm against the
# pyb.CAN.setfilter documentation.
can.setfilter(0, CAN.LIST16, 0, (1, 2, 3, 4))
can.setfilter(1, CAN.LIST16, 1, (5, 6, 7, 8))


def cb0(bus, reason):
    """Print "cb0", then a label for `reason`.

    `reason` 0, 1 and 2 print "pending", "full" and "overflow" respectively;
    any other value prints nothing further. `bus` is accepted but not used.
    """
    print("cb0")
    if reason == 0:
        print("pending")
    elif reason == 1:
        print("full")
    elif reason == 2:
        print("overflow")


def cb1(bus, reason):
    """Print "cb1", then a label for `reason`.

    `reason` 0, 1 and 2 print "pending", "full" and "overflow" respectively;
    any other value prints nothing further. `bus` is accepted but not used.
    """
    print("cb1")
    if reason == 0:
        print("pending")
    elif reason == 1:
        print("full")
    elif reason == 2:
        print("overflow")


def cb0a(bus, reason):
    """Print "cb0a", then a label for `reason`.

    `reason` 0, 1 and 2 print "pending", "full" and "overflow" respectively;
    any other value prints nothing further. `bus` is accepted but not used.
    """
    print("cb0a")
    if reason == 0:
        print("pending")
    elif reason == 1:
        print("full")
    elif reason == 2:
        print("overflow")


def cb1a(bus, reason):
    """Print "cb1a", then a label for `reason`.

    `reason` 0, 1 and 2 print "pending", "full" and "overflow" respectively;
    any other value prints nothing further. `bus` is accepted but not used.
    """
    print("cb1a")
    if reason == 0:
        print("pending")
    elif reason == 1:
        print("full")
    elif reason == 2:
        print("overflow")


can.rxcallback(0, cb0)
can.rxcallback(1, cb1)

# Ids 1-4: the callback registered for 0 is swapped to cb0a before the
# fourth send.
can.send("11111111", 1, timeout=5000)
can.send("22222222", 2, timeout=5000)
can.send("33333333", 3, timeout=5000)
can.rxcallback(0, cb0a)
can.send("44444444", 4, timeout=5000)

# Ids 5-8: same pattern, with the callback for 1 swapped to cb1a.
can.send("55555555", 5, timeout=5000)
can.send("66666666", 6, timeout=5000)
can.send("77777777", 7, timeout=5000)
can.rxcallback(1, cb1a)
can.send("88888888", 8, timeout=5000)

# Only three messages are read back from each of 0 and 1 although four were
# sent to each (presumably the fourth overflowed the hardware FIFO -- TODO
# confirm against the expected output).
print(can.recv(0))
print(can.recv(0))
print(can.recv(0))
print(can.recv(1))
print(can.recv(1))
print(can.recv(1))

# One more message per filter after draining, each read straight back.
can.send("11111111", 1, timeout=5000)
can.send("55555555", 5, timeout=5000)

print(can.recv(0))
print(can.recv(1))

del can

# Testing asynchronous send
can = CAN(1, CAN.LOOPBACK)
can.setfilter(0, CAN.MASK16, 0, (0, 0, 0, 0))

# Discard anything still pending so the checks below start from empty.
while can.any(0):
    can.recv(0)

# With timeout=0, print the pending state right after send() returns, then
# busy-wait until the message shows up.
can.send("abcde", 1, timeout=0)
print(can.any(0))
while not can.any(0):
    pass

print(can.recv(0))

# Four back-to-back sends with timeout=0. "passed" is printed only if one of
# them raises OSError whose str() is "16" (presumably EBUSY -- verify);
# nothing is printed if no OSError is raised.
try:
    can.send("abcde", 2, timeout=0)
    can.send("abcde", 3, timeout=0)
    can.send("abcde", 4, timeout=0)
    can.send("abcde", 5, timeout=0)
except OSError as e:
    if str(e) == "16":
        print("passed")
    else:
        print("failed")

# Wait, then print every message that did arrive.
pyb.delay(500)
while can.any(0):
    print(can.recv(0))

# Testing rtr messages
bus1 = CAN(1, CAN.LOOPBACK)
# Discard anything still pending before installing the filters.
while bus1.any(0):
    bus1.recv(0)
# Bank 0 lists ids 1-4 without an rtr argument, bank 1 lists ids 5-8 with rtr
# set for every entry, and bank 2 uses id/mask pairs with rtr=(False, True).
bus1.setfilter(0, CAN.LIST16, 0, (1, 2, 3, 4))
bus1.setfilter(1, CAN.LIST16, 0, (5, 6, 7, 8), rtr=(True, True, True, True))
bus1.setfilter(2, CAN.MASK16, 0, (64, 64, 32, 32), rtr=(False, True))

# Every message below is sent with rtr=True. For ids 1 and 16 only the
# pending state is printed; for ids 5, 6, 7 and 32 the message is read back.
bus1.send("", 1, rtr=True)
print(bus1.any(0))
bus1.send("", 5, rtr=True)
print(bus1.recv(0))
bus1.send("", 6, rtr=True)
print(bus1.recv(0))
bus1.send("", 7, rtr=True)
print(bus1.recv(0))
bus1.send("", 16, rtr=True)
print(bus1.any(0))
bus1.send("", 32, rtr=True)
print(bus1.recv(0))

# test HAL error, timeout
# This is the only section that opens the bus in NORMAL rather than LOOPBACK
# mode. The repr of the OSError is printed if the send raises one; nothing is
# printed otherwise.
can = pyb.CAN(1, pyb.CAN.NORMAL)
try:
    can.send("1", 1, timeout=50)
except OSError as e:
    print(repr(e))