81 lines
1.9 KiB
Python
81 lines
1.9 KiB
Python
# Proof-of-concept of a REPL over BLE UART.
|
|
#
|
|
# Tested with the Adafruit Bluefruit app on Android.
|
|
# Set the EoL characters to \r\n.
|
|
|
|
import bluetooth
|
|
import io
|
|
import os
|
|
import micropython
|
|
import machine
|
|
|
|
from ble_uart_peripheral import BLEUART
|
|
|
|
# Stream-protocol values used by BLEUARTStream.ioctl: the poll request code
# it compares `op` against, and the flag it returns when data is readable.
_MP_STREAM_POLL_RD = const(0x0001)
_MP_STREAM_POLL = const(3)

# TODO: Remove this when STM32 gets machine.Timer.
# Shared timer used by schedule_in, or None on ports without machine.Timer.
_timer = machine.Timer(-1) if hasattr(machine, 'Timer') else None
|
|
|
|
# Deferred-call helper; the stream below uses it to batch writes into 50ms intervals.
|
|
def schedule_in(handler, delay_ms):
    """Arrange for ``handler()`` to be called later with no arguments.

    When the module-level ``_timer`` is set, it is armed as a one-shot timer
    with a period of ``delay_ms``. Otherwise the call is handed to
    ``micropython.schedule`` and ``delay_ms`` is not used.
    """

    def _invoke(_unused):
        # The callback receives one argument, which is ignored.
        handler()

    if not _timer:
        micropython.schedule(_invoke, None)
        return

    _timer.init(mode=machine.Timer.ONE_SHOT, period=delay_ms, callback=_invoke)
|
|
|
|
# Simple buffering stream to support the dupterm requirements.
|
|
class BLEUARTStream(io.IOBase):
    """Buffering stream adapter that lets a BLE UART be passed to os.dupterm.

    Reads go straight to the wrapped UART. Writes are appended to an internal
    buffer and sent in chunks of at most 100 bytes, one chunk per scheduled
    flush (flushes are requested 50 ms apart via ``schedule_in``).
    """

    def __init__(self, uart):
        """Wrap ``uart`` and register for its receive notifications.

        ``uart`` must provide ``irq(handler)``, ``read(n)``, ``any()`` and
        ``write(data)``, which are the methods this class calls on it.
        """
        self._uart = uart
        # Bytes accepted by write() that have not been sent yet.
        self._tx_buf = bytearray()
        self._uart.irq(self._on_rx)

    def _on_rx(self):
        """Receive handler registered with the UART; notifies dupterm."""
        # Needed for ESP32.
        if hasattr(os, 'dupterm_notify'):
            os.dupterm_notify(None)

    def read(self, sz=None):
        """Return the result of ``uart.read(sz)`` unchanged."""
        return self._uart.read(sz)

    def readinto(self, buf):
        """Copy up to ``len(buf)`` received bytes into ``buf``.

        Returns the number of bytes copied, or None when the UART returned
        nothing.
        """
        avail = self._uart.read(len(buf))
        if not avail:
            return None
        for i, byte in enumerate(avail):
            buf[i] = byte
        return len(avail)

    def ioctl(self, op, arg):
        """Answer a poll request with the read flag when the UART has data.

        Any other request, or a poll with no pending data, returns 0.
        """
        if op == _MP_STREAM_POLL and self._uart.any():
            return _MP_STREAM_POLL_RD
        return 0

    def _flush(self):
        """Send the next chunk of buffered output; reschedule if more remains."""
        data = self._tx_buf[0:100]
        self._tx_buf = self._tx_buf[100:]
        self._uart.write(data)
        if self._tx_buf:
            schedule_in(self._flush, 50)

    def write(self, buf):
        """Queue ``buf`` for transmission.

        Returns the number of bytes accepted (always ``len(buf)``), so that
        callers using the stream protocol do not see the write as a failure.
        """
        count = len(buf)
        if not count:
            # Nothing to queue; avoid scheduling a flush of empty data.
            return 0
        # Only start a flush cycle when none is pending: a non-empty buffer
        # means an earlier write (or _flush itself) has already scheduled one.
        was_empty = not self._tx_buf
        self._tx_buf += buf
        if was_empty:
            schedule_in(self._flush, 50)
        return count
|
|
|
|
|
|
def start():
    """Create the BLE UART named 'mpy-repl' and attach it via os.dupterm."""
    uart = BLEUART(bluetooth.BLE(), name='mpy-repl')
    os.dupterm(BLEUARTStream(uart))
|