From 89a95b7c85cd90967020b80a0a4649b2f745bea8 Mon Sep 17 00:00:00 2001 From: Jim Mussared Date: Wed, 17 Jun 2020 14:57:52 +1000 Subject: [PATCH] examples/bluetooth: Add simple UART demo with central and peripheral. --- examples/bluetooth/ble_simple_central.py | 245 ++++++++++++++++++++ examples/bluetooth/ble_simple_peripheral.py | 98 ++++++++ 2 files changed, 343 insertions(+) create mode 100644 examples/bluetooth/ble_simple_central.py create mode 100644 examples/bluetooth/ble_simple_peripheral.py diff --git a/examples/bluetooth/ble_simple_central.py b/examples/bluetooth/ble_simple_central.py new file mode 100644 index 0000000000..f6996366e3 --- /dev/null +++ b/examples/bluetooth/ble_simple_central.py @@ -0,0 +1,245 @@ +# This example finds and connects to a peripheral running the +# UART service (e.g. ble_simple_peripheral.py). + +import bluetooth +import random +import struct +import time +import micropython + +from ble_advertising import decode_services, decode_name + +from micropython import const + +_IRQ_CENTRAL_CONNECT = const(1) +_IRQ_CENTRAL_DISCONNECT = const(2) +_IRQ_GATTS_WRITE = const(3) +_IRQ_GATTS_READ_REQUEST = const(4) +_IRQ_SCAN_RESULT = const(5) +_IRQ_SCAN_DONE = const(6) +_IRQ_PERIPHERAL_CONNECT = const(7) +_IRQ_PERIPHERAL_DISCONNECT = const(8) +_IRQ_GATTC_SERVICE_RESULT = const(9) +_IRQ_GATTC_SERVICE_DONE = const(10) +_IRQ_GATTC_CHARACTERISTIC_RESULT = const(11) +_IRQ_GATTC_CHARACTERISTIC_DONE = const(12) +_IRQ_GATTC_DESCRIPTOR_RESULT = const(13) +_IRQ_GATTC_DESCRIPTOR_DONE = const(14) +_IRQ_GATTC_READ_RESULT = const(15) +_IRQ_GATTC_READ_DONE = const(16) +_IRQ_GATTC_WRITE_DONE = const(17) +_IRQ_GATTC_NOTIFY = const(18) +_IRQ_GATTC_INDICATE = const(19) + +_ADV_IND = const(0x00) +_ADV_DIRECT_IND = const(0x01) +_ADV_SCAN_IND = const(0x02) +_ADV_NONCONN_IND = const(0x03) + +_UART_SERVICE_UUID = bluetooth.UUID("6E400001-B5A3-F393-E0A9-E50E24DCCA9E") +_UART_RX_CHAR_UUID = bluetooth.UUID("6E400002-B5A3-F393-E0A9-E50E24DCCA9E") +_UART_TX_CHAR_UUID = bluetooth.UUID("6E400003-B5A3-F393-E0A9-E50E24DCCA9E") + + +class BLESimpleCentral: + def __init__(self, ble): + self._ble = ble + self._ble.active(True) + self._ble.irq(handler=self._irq) + + self._reset() + + def _reset(self): + # Cached name and address from a successful scan. + self._name = None + self._addr_type = None + self._addr = None + + # Callbacks for completion of various operations. + # These reset back to None after being invoked. + self._scan_callback = None + self._conn_callback = None + self._read_callback = None + + # Persistent callback for when new data is notified from the device. + self._notify_callback = None + + # Connected device. + self._conn_handle = None + self._start_handle = None + self._end_handle = None + self._tx_handle = None + self._rx_handle = None + + def _irq(self, event, data): + if event == _IRQ_SCAN_RESULT: + addr_type, addr, adv_type, rssi, adv_data = data + if adv_type in (_ADV_IND, _ADV_DIRECT_IND,) and _UART_SERVICE_UUID in decode_services( + adv_data + ): + # Found a potential device, remember it and stop scanning. + self._addr_type = addr_type + self._addr = bytes( + addr + ) # Note: addr buffer is owned by caller so need to copy it. + self._name = decode_name(adv_data) or "?" + self._ble.gap_scan(None) + + elif event == _IRQ_SCAN_DONE: + if self._scan_callback: + if self._addr: + # Found a device during the scan (and the scan was explicitly stopped). + self._scan_callback(self._addr_type, self._addr, self._name) + self._scan_callback = None + else: + # Scan timed out. + self._scan_callback(None, None, None) + + elif event == _IRQ_PERIPHERAL_CONNECT: + # Connect successful. + conn_handle, addr_type, addr, = data + if addr_type == self._addr_type and addr == self._addr: + self._conn_handle = conn_handle + self._ble.gattc_discover_services(self._conn_handle) + + elif event == _IRQ_PERIPHERAL_DISCONNECT: + # Disconnect (either initiated by us or the remote end). + conn_handle, _, _, = data + if conn_handle == self._conn_handle: + # If it was initiated by us, it'll already be reset. + self._reset() + + elif event == _IRQ_GATTC_SERVICE_RESULT: + # Connected device returned a service. + conn_handle, start_handle, end_handle, uuid = data + print("service", data) + if conn_handle == self._conn_handle and uuid == _UART_SERVICE_UUID: + self._start_handle, self._end_handle = start_handle, end_handle + + elif event == _IRQ_GATTC_SERVICE_DONE: + # Service query complete. + if self._start_handle and self._end_handle: + self._ble.gattc_discover_characteristics( + self._conn_handle, self._start_handle, self._end_handle + ) + else: + print("Failed to find uart service.") + + elif event == _IRQ_GATTC_CHARACTERISTIC_RESULT: + # Connected device returned a characteristic. + conn_handle, def_handle, value_handle, properties, uuid = data + if conn_handle == self._conn_handle and uuid == _UART_RX_CHAR_UUID: + self._rx_handle = value_handle + if conn_handle == self._conn_handle and uuid == _UART_TX_CHAR_UUID: + self._tx_handle = value_handle + + elif event == _IRQ_GATTC_CHARACTERISTIC_DONE: + # Characteristic query complete. + if self._tx_handle is not None and self._rx_handle is not None: + # We've finished connecting and discovering device, fire the connect callback. + if self._conn_callback: + self._conn_callback() + else: + print("Failed to find uart rx characteristic.") + + elif event == _IRQ_GATTC_WRITE_DONE: + conn_handle, value_handle, status = data + print("TX complete") + + elif event == _IRQ_GATTC_NOTIFY: + conn_handle, value_handle, notify_data = data + if conn_handle == self._conn_handle and value_handle == self._tx_handle: + if self._notify_callback: + self._notify_callback(notify_data) + + # Returns true if we've successfully connected and discovered characteristics. + def is_connected(self): + return ( + self._conn_handle is not None + and self._tx_handle is not None + and self._rx_handle is not None + ) + + # Find a device advertising the environmental sensor service. + def scan(self, callback=None): + self._addr_type = None + self._addr = None + self._scan_callback = callback + self._ble.gap_scan(2000, 30000, 30000) + + # Connect to the specified device (otherwise use cached address from a scan). + def connect(self, addr_type=None, addr=None, callback=None): + self._addr_type = addr_type or self._addr_type + self._addr = addr or self._addr + self._conn_callback = callback + if self._addr_type is None or self._addr is None: + return False + self._ble.gap_connect(self._addr_type, self._addr) + return True + + # Disconnect from current device. + def disconnect(self): + if not self._conn_handle: + return + self._ble.gap_disconnect(self._conn_handle) + self._reset() + + # Send data over the UART + def write(self, v, response=False): + if not self.is_connected(): + return + self._ble.gattc_write(self._conn_handle, self._rx_handle, v, 1 if response else 0) + + # Set handler for when data is received over the UART. + def on_notify(self, callback): + self._notify_callback = callback + + +def demo(): + ble = bluetooth.BLE() + central = BLESimpleCentral(ble) + + not_found = False + + def on_scan(addr_type, addr, name): + if addr_type is not None: + print("Found peripheral:", addr_type, addr, name) + central.connect() + else: + nonlocal not_found + not_found = True + print("No peripheral found.") + + central.scan(callback=on_scan) + + # Wait for connection... + while not central.is_connected(): + time.sleep_ms(100) + if not_found: + return + + print("Connected") + + def on_rx(v): + print("RX", v) + + central.on_notify(on_rx) + + with_response = False + + i = 0 + while central.is_connected(): + try: + v = str(i) + "_" + print("TX", v) + central.write(v, with_response) + except: + print("TX failed") + i += 1 + time.sleep_ms(400 if with_response else 30) + + print("Disconnected") + + +if __name__ == "__main__": + demo() diff --git a/examples/bluetooth/ble_simple_peripheral.py b/examples/bluetooth/ble_simple_peripheral.py new file mode 100644 index 0000000000..08cd7fa98d --- /dev/null +++ b/examples/bluetooth/ble_simple_peripheral.py @@ -0,0 +1,98 @@ +# This example demonstrates a UART periperhal. + +import bluetooth +import random +import struct +import time +from ble_advertising import advertising_payload + +from micropython import const + +_IRQ_CENTRAL_CONNECT = const(1) +_IRQ_CENTRAL_DISCONNECT = const(2) +_IRQ_GATTS_WRITE = const(3) + +_UART_UUID = bluetooth.UUID("6E400001-B5A3-F393-E0A9-E50E24DCCA9E") +_UART_TX = ( + bluetooth.UUID("6E400003-B5A3-F393-E0A9-E50E24DCCA9E"), + bluetooth.FLAG_READ | bluetooth.FLAG_NOTIFY, +) +_UART_RX = ( + bluetooth.UUID("6E400002-B5A3-F393-E0A9-E50E24DCCA9E"), + bluetooth.FLAG_WRITE | bluetooth.FLAG_WRITE_NO_RESPONSE, +) +_UART_SERVICE = ( + _UART_UUID, + (_UART_TX, _UART_RX,), +) + + +class BLESimplePeripheral: + def __init__(self, ble, name="mpy-uart"): + self._ble = ble + self._ble.active(True) + self._ble.irq(handler=self._irq) + ((self._handle_tx, self._handle_rx,),) = self._ble.gatts_register_services( + (_UART_SERVICE,) + ) + self._connections = set() + self._write_callback = None + self._payload = advertising_payload(name=name, services=[_UART_UUID],) + self._advertise() + + def _irq(self, event, data): + # Track connections so we can send notifications. + if event == _IRQ_CENTRAL_CONNECT: + conn_handle, _, _, = data + print("New connection", conn_handle) + self._connections.add(conn_handle) + elif event == _IRQ_CENTRAL_DISCONNECT: + conn_handle, _, _, = data + print("Disconnected", conn_handle) + self._connections.remove(conn_handle) + # Start advertising again to allow a new connection. + self._advertise() + elif event == _IRQ_GATTS_WRITE: + conn_handle, value_handle = data + value = self._ble.gatts_read(value_handle) + if value_handle == self._handle_rx and self._write_callback: + self._write_callback(value) + + def send(self, data): + for conn_handle in self._connections: + self._ble.gatts_notify(conn_handle, self._handle_tx, data) + + def is_connected(self): + return len(self._connections) > 0 + + def _advertise(self, interval_us=500000): + print("Starting advertising") + self._ble.gap_advertise(interval_us, adv_data=self._payload) + + def on_write(self, callback): + self._write_callback = callback + + +def demo(): + ble = bluetooth.BLE() + p = BLESimplePeripheral(ble) + + def on_rx(v): + print("RX", v) + + p.on_write(on_rx) + + i = 0 + while True: + if p.is_connected(): + # Short burst of queued notifications. + for _ in range(3): + data = str(i) + "_" + print("TX", data) + p.send(data) + i += 1 + time.sleep_ms(100) + + +if __name__ == "__main__": + demo()