circuitpython/examples/bluetooth/ble_uart_peripheral.py
Jim Mussared e6881f0829 extmod/modbluetooth: Make modbluetooth event not a bitfield.
There doesn't appear to be any use for only triggering on specific events,
so it's just easier to number them sequentially.  This makes them smaller
values so they take up only 1 byte in the ringbuf, only 1 byte for the
opcode in the bytecode, and makes room for more events.

Also add a couple of new event types that need to be implemented (to avoid
re-numbering later).

And rename _COMPLETE and _STATUS to _DONE for consistency.

In the future the "trigger" keyword argument can be reinstated by requiring
the user to compute the bitmask, eg:

    ble.irq(handler, 1 << _IRQ_SCAN_RESULT | 1 << _IRQ_SCAN_DONE)
2020-06-05 14:04:20 +10:00

117 lines
3.4 KiB
Python

# This example demonstrates a peripheral implementing the Nordic UART Service (NUS).
import bluetooth
from ble_advertising import advertising_payload
from micropython import const
_IRQ_CENTRAL_CONNECT = const(1)
_IRQ_CENTRAL_DISCONNECT = const(2)
_IRQ_GATTS_WRITE = const(3)
_UART_UUID = bluetooth.UUID("6E400001-B5A3-F393-E0A9-E50E24DCCA9E")
_UART_TX = (
bluetooth.UUID("6E400003-B5A3-F393-E0A9-E50E24DCCA9E"),
bluetooth.FLAG_NOTIFY,
)
_UART_RX = (
bluetooth.UUID("6E400002-B5A3-F393-E0A9-E50E24DCCA9E"),
bluetooth.FLAG_WRITE,
)
_UART_SERVICE = (
_UART_UUID,
(_UART_TX, _UART_RX,),
)
# org.bluetooth.characteristic.gap.appearance.xml
_ADV_APPEARANCE_GENERIC_COMPUTER = const(128)
class BLEUART:
def __init__(self, ble, name="mpy-uart", rxbuf=100):
self._ble = ble
self._ble.active(True)
self._ble.irq(handler=self._irq)
((self._tx_handle, self._rx_handle,),) = self._ble.gatts_register_services(
(_UART_SERVICE,)
)
# Increase the size of the rx buffer and enable append mode.
self._ble.gatts_set_buffer(self._rx_handle, rxbuf, True)
self._connections = set()
self._rx_buffer = bytearray()
self._handler = None
# Optionally add services=[_UART_UUID], but this is likely to make the payload too large.
self._payload = advertising_payload(name=name, appearance=_ADV_APPEARANCE_GENERIC_COMPUTER)
self._advertise()
def irq(self, handler):
self._handler = handler
def _irq(self, event, data):
# Track connections so we can send notifications.
if event == _IRQ_CENTRAL_CONNECT:
conn_handle, _, _, = data
self._connections.add(conn_handle)
elif event == _IRQ_CENTRAL_DISCONNECT:
conn_handle, _, _, = data
if conn_handle in self._connections:
self._connections.remove(conn_handle)
# Start advertising again to allow a new connection.
self._advertise()
elif event == _IRQ_GATTS_WRITE:
conn_handle, value_handle, = data
if conn_handle in self._connections and value_handle == self._rx_handle:
self._rx_buffer += self._ble.gatts_read(self._rx_handle)
if self._handler:
self._handler()
def any(self):
return len(self._rx_buffer)
def read(self, sz=None):
if not sz:
sz = len(self._rx_buffer)
result = self._rx_buffer[0:sz]
self._rx_buffer = self._rx_buffer[sz:]
return result
def write(self, data):
for conn_handle in self._connections:
self._ble.gatts_notify(conn_handle, self._tx_handle, data)
def close(self):
for conn_handle in self._connections:
self._ble.gap_disconnect(conn_handle)
self._connections.clear()
def _advertise(self, interval_us=500000):
self._ble.gap_advertise(interval_us, adv_data=self._payload)
def demo():
import time
ble = bluetooth.BLE()
uart = BLEUART(ble)
def on_rx():
print("rx: ", uart.read().decode().strip())
uart.irq(handler=on_rx)
nums = [4, 8, 15, 16, 23, 42]
i = 0
try:
while True:
uart.write(str(nums[i]) + "\n")
i = (i + 1) % len(nums)
time.sleep_ms(1000)
except KeyboardInterrupt:
pass
uart.close()
if __name__ == "__main__":
demo()