40ad8f1666
This is consistent with the other 'micro' modules and allows implementing additional features in Python via e.g. micropython-lib's sys. Note this is a breaking change (not backwards compatible) for ports which do not enable weak links, as "import sys" must now be replaced with "import usys".
151 lines
4.7 KiB
Python
151 lines
4.7 KiB
Python
"""Test for nrf24l01 module. Portable between MicroPython targets."""
|
|
|
|
import usys
|
|
import ustruct as struct
|
|
import utime
|
|
from machine import Pin, SPI
|
|
from nrf24l01 import NRF24L01
|
|
from micropython import const
|
|
|
|
# Slave pause between receiving data and checking for further packets.
|
|
_RX_POLL_DELAY = const(15)
|
|
# Slave pauses an additional _SLAVE_SEND_DELAY ms after receiving data and before
|
|
# transmitting to allow the (remote) master time to get into receive mode. The
|
|
# master may be a slow device. Value tested with Pyboard, ESP32 and ESP8266.
|
|
_SLAVE_SEND_DELAY = const(10)
|
|
|
|
if usys.platform == "pyboard":
|
|
cfg = {"spi": 2, "miso": "Y7", "mosi": "Y8", "sck": "Y6", "csn": "Y5", "ce": "Y4"}
|
|
elif usys.platform == "esp8266": # Hardware SPI
|
|
cfg = {"spi": 1, "miso": 12, "mosi": 13, "sck": 14, "csn": 4, "ce": 5}
|
|
elif usys.platform == "esp32": # Software SPI
|
|
cfg = {"spi": -1, "miso": 32, "mosi": 33, "sck": 25, "csn": 26, "ce": 27}
|
|
else:
|
|
raise ValueError("Unsupported platform {}".format(usys.platform))
|
|
|
|
# Addresses are in little-endian format. They correspond to big-endian
|
|
# 0xf0f0f0f0e1, 0xf0f0f0f0d2
|
|
pipes = (b"\xe1\xf0\xf0\xf0\xf0", b"\xd2\xf0\xf0\xf0\xf0")
|
|
|
|
|
|
def master():
|
|
csn = Pin(cfg["csn"], mode=Pin.OUT, value=1)
|
|
ce = Pin(cfg["ce"], mode=Pin.OUT, value=0)
|
|
if cfg["spi"] == -1:
|
|
spi = SPI(-1, sck=Pin(cfg["sck"]), mosi=Pin(cfg["mosi"]), miso=Pin(cfg["miso"]))
|
|
nrf = NRF24L01(spi, csn, ce, payload_size=8)
|
|
else:
|
|
nrf = NRF24L01(SPI(cfg["spi"]), csn, ce, payload_size=8)
|
|
|
|
nrf.open_tx_pipe(pipes[0])
|
|
nrf.open_rx_pipe(1, pipes[1])
|
|
nrf.start_listening()
|
|
|
|
num_needed = 16
|
|
num_successes = 0
|
|
num_failures = 0
|
|
led_state = 0
|
|
|
|
print("NRF24L01 master mode, sending %d packets..." % num_needed)
|
|
|
|
while num_successes < num_needed and num_failures < num_needed:
|
|
# stop listening and send packet
|
|
nrf.stop_listening()
|
|
millis = utime.ticks_ms()
|
|
led_state = max(1, (led_state << 1) & 0x0F)
|
|
print("sending:", millis, led_state)
|
|
try:
|
|
nrf.send(struct.pack("ii", millis, led_state))
|
|
except OSError:
|
|
pass
|
|
|
|
# start listening again
|
|
nrf.start_listening()
|
|
|
|
# wait for response, with 250ms timeout
|
|
start_time = utime.ticks_ms()
|
|
timeout = False
|
|
while not nrf.any() and not timeout:
|
|
if utime.ticks_diff(utime.ticks_ms(), start_time) > 250:
|
|
timeout = True
|
|
|
|
if timeout:
|
|
print("failed, response timed out")
|
|
num_failures += 1
|
|
|
|
else:
|
|
# recv packet
|
|
(got_millis,) = struct.unpack("i", nrf.recv())
|
|
|
|
# print response and round-trip delay
|
|
print(
|
|
"got response:",
|
|
got_millis,
|
|
"(delay",
|
|
utime.ticks_diff(utime.ticks_ms(), got_millis),
|
|
"ms)",
|
|
)
|
|
num_successes += 1
|
|
|
|
# delay then loop
|
|
utime.sleep_ms(250)
|
|
|
|
print("master finished sending; successes=%d, failures=%d" % (num_successes, num_failures))
|
|
|
|
|
|
def slave():
|
|
csn = Pin(cfg["csn"], mode=Pin.OUT, value=1)
|
|
ce = Pin(cfg["ce"], mode=Pin.OUT, value=0)
|
|
if cfg["spi"] == -1:
|
|
spi = SPI(-1, sck=Pin(cfg["sck"]), mosi=Pin(cfg["mosi"]), miso=Pin(cfg["miso"]))
|
|
nrf = NRF24L01(spi, csn, ce, payload_size=8)
|
|
else:
|
|
nrf = NRF24L01(SPI(cfg["spi"]), csn, ce, payload_size=8)
|
|
|
|
nrf.open_tx_pipe(pipes[1])
|
|
nrf.open_rx_pipe(1, pipes[0])
|
|
nrf.start_listening()
|
|
|
|
print("NRF24L01 slave mode, waiting for packets... (ctrl-C to stop)")
|
|
|
|
while True:
|
|
if nrf.any():
|
|
while nrf.any():
|
|
buf = nrf.recv()
|
|
millis, led_state = struct.unpack("ii", buf)
|
|
print("received:", millis, led_state)
|
|
for led in leds:
|
|
if led_state & 1:
|
|
led.on()
|
|
else:
|
|
led.off()
|
|
led_state >>= 1
|
|
utime.sleep_ms(_RX_POLL_DELAY)
|
|
|
|
# Give master time to get into receive mode.
|
|
utime.sleep_ms(_SLAVE_SEND_DELAY)
|
|
nrf.stop_listening()
|
|
try:
|
|
nrf.send(struct.pack("i", millis))
|
|
except OSError:
|
|
pass
|
|
print("sent response")
|
|
nrf.start_listening()
|
|
|
|
|
|
try:
|
|
import pyb
|
|
|
|
leds = [pyb.LED(i + 1) for i in range(4)]
|
|
except:
|
|
leds = []
|
|
|
|
print("NRF24L01 test module loaded")
|
|
print("NRF24L01 pinout for test:")
|
|
print(" CE on", cfg["ce"])
|
|
print(" CSN on", cfg["csn"])
|
|
print(" SCK on", cfg["sck"])
|
|
print(" MISO on", cfg["miso"])
|
|
print(" MOSI on", cfg["mosi"])
|
|
print("run nrf24l01test.slave() on slave, then nrf24l01test.master() on master")
|