diff --git a/drivers/nrf24l01/nrf24l01.py b/drivers/nrf24l01/nrf24l01.py new file mode 100644 index 0000000000..255aade83f --- /dev/null +++ b/drivers/nrf24l01/nrf24l01.py @@ -0,0 +1,234 @@ +"""NRF24L01 driver for Micro Python""" + +import pyb + +# nRF24L01+ registers +CONFIG = const(0x00) +EN_RXADDR = const(0x02) +SETUP_AW = const(0x03) +SETUP_RETR = const(0x04) +RF_CH = const(0x05) +RF_SETUP = const(0x06) +STATUS = const(0x07) +OBSERVE_TX = const(0x08) +RX_ADDR_P0 = const(0x0a) +TX_ADDR = const(0x10) +RX_PW_P0 = const(0x11) +FIFO_STATUS = const(0x17) +DYNPD = const(0x1c) + +# CONFIG register +EN_CRC = const(0x08) # enable CRC +CRCO = const(0x04) # CRC encoding scheme; 0=1 byte, 1=2 bytes +PWR_UP = const(0x02) # 1=power up, 0=power down +PRIM_RX = const(0x01) # RX/TX control; 0=PTX, 1=PRX + +# RF_SETUP register +POWER_0 = const(0x00) # -18 dBm +POWER_1 = const(0x02) # -12 dBm +POWER_2 = const(0x04) # -6 dBm +POWER_3 = const(0x06) # 0 dBm +SPEED_1M = const(0x00) +SPEED_2M = const(0x08) +SPEED_250K = const(0x20) + +# STATUS register +RX_DR = const(0x40) # RX data ready; write 1 to clear +TX_DS = const(0x20) # TX data sent; write 1 to clear +MAX_RT = const(0x10) # max retransmits reached; write 1 to clear + +# FIFO_STATUS register +RX_EMPTY = const(0x01) # 1 if RX FIFO is empty + +# constants for instructions +R_RX_PL_WID = const(0x60) # read RX payload width +R_RX_PAYLOAD = const(0x61) # read RX payload +W_TX_PAYLOAD = const(0xa0) # write TX payload +FLUSH_TX = const(0xe1) # flush TX FIFO +FLUSH_RX = const(0xe2) # flush RX FIFO +NOP = const(0xff) # use to read STATUS register + +class NRF24L01: + def __init__(self, spi, cs, ce, channel=46, payload_size=16): + assert payload_size <= 32 + + # init the SPI bus and pins + spi.init(spi.MASTER, baudrate=4000000, polarity=0, phase=1, firstbit=spi.MSB) + cs.init(cs.OUT_PP, cs.PULL_NONE) + ce.init(ce.OUT_PP, ce.PULL_NONE) + + # store the pins + self.spi = spi + self.cs = cs + self.ce = ce + + # reset everything + self.ce.low() + self.cs.high() + self.payload_size = payload_size + self.pipe0_read_addr = None + pyb.delay(5) + + # set address width to 5 bytes + self.reg_write(SETUP_AW, 0b11) + + # disable dynamic payloads + self.reg_write(DYNPD, 0) + + # auto retransmit delay: 1750us + # auto retransmit count: 8 + self.reg_write(SETUP_RETR, (6 << 4) | 8) + + # set rf power and speed + self.set_power_speed(POWER_3, SPEED_1M) + + # init CRC + self.set_crc(2) + + # clear status flags + self.reg_write(STATUS, RX_DR | TX_DS | MAX_RT) + + # set channel + self.set_channel(channel) + + # flush buffers + self.flush_rx() + self.flush_tx() + + def reg_read(self, reg): + self.cs.low() + self.spi.send_recv(reg) + buf = self.spi.recv(1) + self.cs.high() + return buf[0] + + def reg_read_ret_status(self, reg): + self.cs.low() + status = self.spi.send_recv(reg)[0] + buf = self.spi.recv(1) + self.cs.high() + return status + + def reg_write(self, reg, buf): + self.cs.low() + status = self.spi.send_recv(0x20 | reg)[0] + self.spi.send(buf) + self.cs.high() + return status + + def flush_rx(self): + self.cs.low() + self.spi.send(FLUSH_RX) + self.cs.high() + + def flush_tx(self): + self.cs.low() + self.spi.send(FLUSH_TX) + self.cs.high() + + # power is one of POWER_x defines; speed is one of SPEED_x defines + def set_power_speed(self, power, speed): + setup = self.reg_read(RF_SETUP) & 0b11010001 + self.reg_write(RF_SETUP, setup | power | speed) + + # length in bytes: 0, 1 or 2 + def set_crc(self, length): + config = self.reg_read(CONFIG) & ~(CRCO | EN_CRC) + if length == 0: + pass + elif length == 1: + config |= EN_CRC + else: + config |= EN_CRC | CRCO + self.reg_write(CONFIG, config) + + def set_channel(self, channel): + self.reg_write(RF_CH, min(channel, 127)) + + # address should be a bytes object 5 bytes long + def open_tx_pipe(self, address): + assert len(address) == 5 + self.reg_write(RX_ADDR_P0, address) + self.reg_write(TX_ADDR, address) + self.reg_write(RX_PW_P0, self.payload_size) + + # address should be a bytes object 5 bytes long + # pipe 0 and 1 have 5 byte address + # pipes 2-5 use same 4 most-significant bytes as pipe 1, plus 1 extra byte + def open_rx_pipe(self, pipe_id, address): + assert len(address) == 5 + assert 0 <= pipe_id <= 5 + if pipe_id == 0: + self.pipe0_read_addr = address + if pipe_id < 2: + self.reg_write(RX_ADDR_P0 + pipe_id, address) + else: + self.reg_write(RX_ADDR_P0 + pipe_id, address[0]) + self.reg_write(RX_PW_P0 + pipe_id, self.payload_size) + self.reg_write(EN_RXADDR, self.reg_read(EN_RXADDR) | (1 << pipe_id)) + + def start_listening(self): + self.reg_write(CONFIG, self.reg_read(CONFIG) | PWR_UP | PRIM_RX) + self.reg_write(STATUS, RX_DR | TX_DS | MAX_RT) + + if self.pipe0_read_addr is not None: + self.reg_write(RX_ADDR_P0, self.pipe0_read_addr) + + self.flush_rx() + self.flush_tx() + self.ce.high() + pyb.udelay(130) + + def stop_listening(self): + self.ce.low() + self.flush_tx() + self.flush_rx() + + # returns True if any data available to recv + def any(self): + return not bool(self.reg_read(FIFO_STATUS) & RX_EMPTY) + + def recv(self): + # get the data + self.cs.low() + self.spi.send(R_RX_PAYLOAD) + buf = self.spi.recv(self.payload_size) + self.cs.high() + + # clear RX ready flag + self.reg_write(STATUS, RX_DR) + + return buf + + def send(self, buf, timeout=500): + # power up + self.reg_write(CONFIG, (self.reg_read(CONFIG) | PWR_UP) & ~PRIM_RX) + pyb.udelay(150) + + # send the data + self.cs.low() + self.spi.send(W_TX_PAYLOAD) + self.spi.send(buf) + if len(buf) < self.payload_size: + self.spi.send(b'\x00' * (self.payload_size - len(buf))) # pad out data + self.cs.high() + + # enable the chip so it can send the data + self.ce.high() + pyb.udelay(15) # needs to be >10us + self.ce.low() + + # blocking wait for tx complete + start = pyb.millis() + while pyb.millis() - start < timeout: + status = self.reg_read_ret_status(OBSERVE_TX) + if status & (TX_DS | MAX_RT): + break + + # get and clear all status flags + status = self.reg_write(STATUS, RX_DR | TX_DS | MAX_RT) + if not (status & TX_DS): + raise OSError("send failed") + + # power down + self.reg_write(CONFIG, self.reg_read(CONFIG) & ~PWR_UP) diff --git a/drivers/nrf24l01/nrf24l01test.py b/drivers/nrf24l01/nrf24l01test.py new file mode 100644 index 0000000000..264bc21c8c --- /dev/null +++ b/drivers/nrf24l01/nrf24l01test.py @@ -0,0 +1,100 @@ +"""Test for nrf24l01 module.""" + +import struct +import pyb +from pyb import Pin, SPI +from nrf24l01 import NRF24L01 + +pipes = (b'\xf0\xf0\xf0\xf0\xe1', b'\xf0\xf0\xf0\xf0\xd2') + +def master(): + nrf = NRF24L01(SPI(2), Pin('Y5'), Pin('Y4'), payload_size=8) + + nrf.open_tx_pipe(pipes[0]) + nrf.open_rx_pipe(1, pipes[1]) + nrf.start_listening() + + num_needed = 16 + num_successes = 0 + num_failures = 0 + led_state = 0 + + print('NRF24L01 master mode, sending %d packets...' % num_needed) + + while num_successes < num_needed and num_failures < num_needed: + # stop listening and send packet + nrf.stop_listening() + millis = pyb.millis() + led_state = max(1, (led_state << 1) & 0x0f) + print('sending:', millis, led_state) + try: + nrf.send(struct.pack('ii', millis, led_state)) + except OSError: + pass + + # start listening again + nrf.start_listening() + + # wait for response, with 250ms timeout + start_time = pyb.millis() + timeout = False + while not nrf.any() and not timeout: + if pyb.elapsed_millis(start_time) > 250: + timeout = True + + if timeout: + print('failed, respones timed out') + num_failures += 1 + + else: + # recv packet + got_millis, = struct.unpack('i', nrf.recv()) + + # print response and round-trip delay + print('got response:', got_millis, '(delay', pyb.millis() - got_millis, 'ms)') + num_successes += 1 + + # delay then loop + pyb.delay(250) + + print('master finished sending; succeses=%d, failures=%d' % (num_successes, num_failures)) + +def slave(): + nrf = NRF24L01(SPI(2), Pin('Y5'), Pin('Y4'), payload_size=8) + + nrf.open_tx_pipe(pipes[1]) + nrf.open_rx_pipe(1, pipes[0]) + nrf.start_listening() + + print('NRF24L01 slave mode, waiting for packets... (ctrl-C to stop)') + + while True: + pyb.wfi() + if nrf.any(): + while nrf.any(): + buf = nrf.recv() + millis, led_state = struct.unpack('ii', buf) + print('received:', millis, led_state) + for i in range(4): + if led_state & (1 << i): + pyb.LED(i + 1).on() + else: + pyb.LED(i + 1).off() + pyb.delay(15) + + nrf.stop_listening() + try: + nrf.send(struct.pack('i', millis)) + except OSError: + pass + print('sent response') + nrf.start_listening() + +print('NRF24L01 test module loaded') +print('NRF24L01 pinout for test:') +print(' CE on Y4') +print(' CSN on Y5') +print(' SCK on Y6') +print(' MISO on Y7') +print(' MOSI on Y8') +print('run nrf24l01test.slave() on slave, then nrf24l01test.master() on master')