diff --git a/tools/cpboard.py b/tools/cpboard.py index 8c60077c2c..e5a29ab254 100644 --- a/tools/cpboard.py +++ b/tools/cpboard.py @@ -39,27 +39,34 @@ import serial.tools.list_ports import sh import shutil + class CPboardError(BaseException): pass + # supervisor/messages/default.h: MSG_NEWLINE = b"\r\n" MSG_SAFE_MODE_CRASH = b"Looks like our core CircuitPython code crashed hard. Whoops!" -MSG_SAFE_MODE_BROWN_OUT_LINE_1 = b"The microcontroller's power dipped. Please make sure your power supply provides" -MSG_SAFE_MODE_BROWN_OUT_LINE_2 = b"enough power for the whole circuit and press reset (after ejecting CIRCUITPY)." +MSG_SAFE_MODE_BROWN_OUT_LINE_1 = ( + b"The microcontroller's power dipped. Please make sure your power supply provides" +) +MSG_SAFE_MODE_BROWN_OUT_LINE_2 = ( + b"enough power for the whole circuit and press reset (after ejecting CIRCUITPY)." +) MSG_WAIT_BEFORE_REPL = b"Press any key to enter the REPL. Use CTRL-D to reload." + class REPL: - CHAR_CTRL_A = b'\x01' - CHAR_CTRL_B = b'\x02' - CHAR_CTRL_C = b'\x03' - CHAR_CTRL_D = b'\x04' + CHAR_CTRL_A = b"\x01" + CHAR_CTRL_B = b"\x02" + CHAR_CTRL_C = b"\x03" + CHAR_CTRL_D = b"\x04" def __init__(self, board): self.board = board self.write_chunk_size = 32 self.safe_mode = False - self.session = b'' + self.session = b"" def __enter__(self): self.reset() @@ -76,12 +83,12 @@ class REPL: if self.serial.in_waiting: data = self.serial.read(self.serial.in_waiting) else: - data = b'' + data = b"" self.session += data return data def read_until(self, ending, timeout=10): - data = b'' + data = b"" timeout_count = 0 while True: if data.endswith(ending): @@ -102,10 +109,10 @@ class REPL: if chunk_size is None: chunk_size = self.write_chunk_size if not isinstance(data, bytes): - data = bytes(data, encoding='utf8') + data = bytes(data, encoding="utf8") for i in range(0, len(data), chunk_size): - chunk = data[i:min(i + chunk_size, len(data))] + chunk = data[i : min(i + chunk_size, len(data))] self.session += chunk self.serial.write(chunk) time.sleep(0.01) @@ -113,28 +120,30 @@ class REPL: def reset(self): # Use read() since serial.reset_input_buffer() fails with termios.error now and then self.read() - self.session = b'' - self.write(b'\r' + REPL.CHAR_CTRL_C + REPL.CHAR_CTRL_C) # interrupt any running program - self.write(b'\r' + REPL.CHAR_CTRL_B) # enter or reset friendly repl - data = self.read_until(b'>>> ') + self.session = b"" + self.write( + b"\r" + REPL.CHAR_CTRL_C + REPL.CHAR_CTRL_C + ) # interrupt any running program + self.write(b"\r" + REPL.CHAR_CTRL_B) # enter or reset friendly repl + data = self.read_until(b">>> ") def execute(self, code, timeout=10, wait_for_response=False): - self.read() # Throw away + self.read() # Throw away self.write(REPL.CHAR_CTRL_A) - self.read_until(b'\r\n>') + self.read_until(b"\r\n>") self.write(code) self.write(REPL.CHAR_CTRL_D) if wait_for_response: - return b'', b'' - self.read_until(b'OK') + return b"", b"" + self.read_until(b"OK") - output = self.read_until(b'\x04', timeout=timeout) + output = self.read_until(b"\x04", timeout=timeout) output = output[:-1] - error = self.read_until(b'\x04') + error = self.read_until(b"\x04") error = error[:-1] return output, error @@ -146,8 +155,8 @@ class REPL: self.reset() self.write(REPL.CHAR_CTRL_D) - data = self.read_until(b' output:\r\n') - if b'Running in safe mode' in data: + data = self.read_until(b" output:\r\n") + if b"Running in safe mode" in data: self.safe_mode = True raise RuntimeError("Can't run in safe mode") @@ -159,7 +168,7 @@ class REPL: data = data.split(marker)[0] # Haven't found out why we have to strip off this... - if data.endswith(b'\r\n\r\n'): + if data.endswith(b"\r\n\r\n"): data = data[:-4] return data @@ -168,9 +177,13 @@ class Disk: def __init__(self, dev): self.dev = os.path.realpath(dev) self.mountpoint = None - with open('/etc/mtab', 'r') as f: + with open("/etc/mtab", "r") as f: mtab = f.read() - mount = [mount.split(' ') for mount in mtab.splitlines() if mount.startswith(self.dev)] + mount = [ + mount.split(" ") + for mount in mtab.splitlines() + if mount.startswith(self.dev) + ] if mount: self._path = mount[0][1] else: @@ -245,22 +258,22 @@ class Firmware: @property def info(self): with self.disk as disk: - fname = os.path.join(disk.path, 'INFO_UF2.TXT') - with open(fname, 'r') as f: + fname = os.path.join(disk.path, "INFO_UF2.TXT") + with open(fname, "r") as f: info = f.read() lines = info.splitlines() res = {} - res['header'] = lines[0] + res["header"] = lines[0] for line in lines[1:]: - k, _, v = line.partition(':') - res[k.replace(':', '')] = v.strip() + k, _, v = line.partition(":") + res[k.replace(":", "")] = v.strip() return res def upload(self, fw): - with open(fw, 'rb') as f: + with open(fw, "rb") as f: header = f.read(32) - if header[0:4] != b'UF2\n': - raise ValueError('Only UF2 files are supported') + if header[0:4] != b"UF2\n": + raise ValueError("Only UF2 files are supported") self.board.close() with self.disk as disk: disk.copy(fw, sync=False) @@ -274,35 +287,37 @@ class CPboard: except ValueError: pass - vendor, _, product = name.partition(':') + vendor, _, product = name.partition(":") if vendor and product: - return CPboard.from_usb(**kwargs, idVendor=int(vendor, 16), idProduct=int(product, 16)) + return CPboard.from_usb( + **kwargs, idVendor=int(vendor, 16), idProduct=int(product, 16) + ) return CPboard(name, **kwargs) @classmethod def from_build_name(cls, name, **kwargs): boards = { - #'arduino_zero' - 'circuitplayground_express' : (0x239a, 0x8019), - #'feather_m0_adalogger' : (0x239a, ), - #'feather_m0_basic' : (0x239a, ), - 'feather_m0_express' : (0x239a, 0x8023), - #'feather_m0_rfm69' : (0x239a, ), - #'feather_m0_rfm9x' : (0x239a, ), - #'feather_m0_supersized' : (0x239a, ), - #'feather_m4_express' : (0x239a, ), - #'gemma_m0' : (0x239a, ), - #'itsybitsy_m0_express' : (0x239a, ), - #'itsybitsy_m4_express' : (0x239a, ), - 'metro_m0_express' : (0x239a, 0x8014), - 'metro_m4_express' : (0x239a, 0x8021), - #'metro_m4_express_revb' : (0x239a, ), - #'pirkey_m0' : (0x239a, ), - #'trinket_m0' : (0x239a, ), - #'trinket_m0_haxpress' : (0x239a, ), - #'ugame10' - } + #'arduino_zero' + "circuitplayground_express": (0x239A, 0x8019), + #'feather_m0_adalogger' : (0x239a, ), + #'feather_m0_basic' : (0x239a, ), + "feather_m0_express": (0x239A, 0x8023), + #'feather_m0_rfm69' : (0x239a, ), + #'feather_m0_rfm9x' : (0x239a, ), + #'feather_m0_supersized' : (0x239a, ), + #'feather_m4_express' : (0x239a, ), + #'gemma_m0' : (0x239a, ), + #'itsybitsy_m0_express' : (0x239a, ), + "itsybitsy_m4_express": (0x239A, 0x802C), + "metro_m0_express": (0x239A, 0x8014), + "metro_m4_express": (0x239A, 0x8021), + #'metro_m4_express_revb' : (0x239a, ), + #'pirkey_m0' : (0x239a, ), + #'trinket_m0' : (0x239a, ), + #'trinket_m0_haxpress' : (0x239a, ), + #'ugame10' + } try: vendor, product = boards[name] @@ -314,26 +329,26 @@ class CPboard: @classmethod def from_build_name_bootloader(cls, name, **kwargs): boards = { - #'arduino_zero' - #'circuitplayground_express' : (0x239a, ), - #'feather_m0_adalogger' : (0x239a, ), - #'feather_m0_basic' : (0x239a, ), - 'feather_m0_express' : (0x239a, 0x001b), - #'feather_m0_rfm69' : (0x239a, ), - #'feather_m0_rfm9x' : (0x239a, ), - #'feather_m0_supersized' : (0x239a, ), - #'feather_m4_express' : (0x239a, ), - #'gemma_m0' : (0x239a, ), - #'itsybitsy_m0_express' : (0x239a, ), - #'itsybitsy_m4_express' : (0x239a, ), - #'metro_m0_express' : (0x239a, 0x8014), - 'metro_m4_express' : (0x239a, 0x0021), - #'metro_m4_express_revb' : (0x239a, ), - #'pirkey_m0' : (0x239a, ), - #'trinket_m0' : (0x239a, ), - #'trinket_m0_haxpress' : (0x239a, ), - #'ugame10' - } + #'arduino_zero' + #'circuitplayground_express' : (0x239a, ), + #'feather_m0_adalogger' : (0x239a, ), + #'feather_m0_basic' : (0x239a, ), + "feather_m0_express": (0x239A, 0x001B), + #'feather_m0_rfm69' : (0x239a, ), + #'feather_m0_rfm9x' : (0x239a, ), + #'feather_m0_supersized' : (0x239a, ), + #'feather_m4_express' : (0x239a, ), + #'gemma_m0' : (0x239a, ), + #'itsybitsy_m0_express' : (0x239a, ), + "itsybitsy_m4_express": (0x239A, 0x002B), + #'metro_m0_express' : (0x239a, 0x8014), + "metro_m4_express": (0x239A, 0x0021), + #'metro_m4_express_revb' : (0x239a, ), + #'pirkey_m0' : (0x239a, ), + #'trinket_m0' : (0x239a, ), + #'trinket_m0_haxpress' : (0x239a, ), + #'ugame10' + } try: vendor, product = boards[name] @@ -347,16 +362,17 @@ class CPboard: @classmethod def from_usb(cls, baudrate=115200, wait=0, timeout=10, **kwargs): import usb.core + dev = usb.core.find(**kwargs) if not dev: s = "Can't find USB device: " args = [] for x in kwargs.items(): try: - args.append('%s=0x%x' % x) + args.append("%s=0x%x" % x) except: - args.append('%s = %s' % x) - raise RuntimeError("Can't find USB device: " + ', '.join(args)) + args.append("%s = %s" % x) + raise RuntimeError("Can't find USB device: " + ", ".join(args)) return cls(dev, baudrate=baudrate, wait=wait, timeout=timeout) def __init__(self, device, baudrate=115200, wait=0, timeout=10): @@ -364,11 +380,15 @@ class CPboard: self.usb_dev = None try: # Is it a usb.core.Device? - portstr = ':' + '.'.join(map(str, device.port_numbers)) + ':' + portstr = ":" + ".".join(map(str, device.port_numbers)) + ":" except: pass else: - serials = [serial for serial in os.listdir("/dev/serial/by-path") if portstr in serial] + serials = [ + serial + for serial in os.listdir("/dev/serial/by-path") + if portstr in serial + ] if len(serials) != 1: raise RuntimeError("Can't find excatly one matching usb serial device") self.device = os.path.realpath("/dev/serial/by-path/" + serials[0]) @@ -401,26 +421,29 @@ class CPboard: delayed = False for attempt in range(wait + 1): try: - self.serial = serial.Serial(self.device, baudrate=self.baudrate, - timeout=self.timeout, - inter_byte_timeout=10, - write_timeout=self.timeout) + self.serial = serial.Serial( + self.device, + baudrate=self.baudrate, + timeout=self.timeout, + inter_byte_timeout=10, + write_timeout=self.timeout, + ) break - except (OSError, IOError): # Py2 and Py3 have different errors + except (OSError, IOError): # Py2 and Py3 have different errors if wait == 0: continue if attempt == 0: - sys.stdout.write('Waiting {} seconds for board '.format(wait)) + sys.stdout.write("Waiting {} seconds for board ".format(wait)) delayed = True time.sleep(1) - sys.stdout.write('.') + sys.stdout.write(".") sys.stdout.flush() else: if delayed: - print('') - raise CPboardError('failed to access ' + self.device) + print("") + raise CPboardError("failed to access " + self.device) if delayed: - print('') + print("") def close(self): if self.serial: @@ -430,38 +453,43 @@ class CPboard: def exec(self, command, timeout=10, wait_for_response=False): with self.repl as repl: try: - output, error = repl.execute(command, timeout=timeout, - wait_for_response=wait_for_response) + output, error = repl.execute( + command, timeout=timeout, wait_for_response=wait_for_response + ) except OSError as e: if self.debug: - print('exec: session: ', self.repl.session) - raise CPboardError('timeout', e) + print("exec: session: ", self.repl.session) + raise CPboardError("timeout", e) if error: - raise CPboardError('exception', output, error) + raise CPboardError("exception", output, error) return output def eval(self, expression, timeout=10): - command = 'print({})'.format(expression) + command = "print({})".format(expression) with self.repl as repl: output, error = repl.execute(command, timeout=timeout) if error: - raise CPboardError('exception', output, error) + raise CPboardError("exception", output, error) try: - res = eval(str(output, encoding='utf8')) + res = eval(str(output, encoding="utf8")) except: - raise CPboardError('failed to eval: %s' % output) + raise CPboardError("failed to eval: %s" % output) return res - def _reset(self, mode='NORMAL'): - self.exec("import microcontroller;microcontroller.on_next_reset(microcontroller.RunMode.%s)" % mode) + def _reset(self, mode="NORMAL"): + self.exec( + "import microcontroller;microcontroller.on_next_reset(microcontroller.RunMode.%s)" + % mode + ) try: - self.exec("import microcontroller;microcontroller.reset()", - wait_for_response=True) + self.exec( + "import microcontroller;microcontroller.reset()", wait_for_response=True + ) except OSError: pass def reset(self, safe_mode=False, delay=5, wait=10): - self._reset('SAFE_MODE' if safe_mode else 'NORMAL') + self._reset("SAFE_MODE" if safe_mode else "NORMAL") self.close() time.sleep(delay) self.open(wait) @@ -469,7 +497,7 @@ class CPboard: def reset_to_bootloader(self, repl=False): if repl: - self._reset('BOOTLOADER') + self._reset("BOOTLOADER") self.close() else: self.close() @@ -486,7 +514,7 @@ class CPboard: @property def serial_number(self): - try: # Permissions are needed to read the value + try: # Permissions are needed to read the value return self.usb_dev.serial_number except: pass @@ -495,18 +523,26 @@ class CPboard: def get_disks(self): if self.usb_dev: - portstr = ':' + '.'.join(map(str, self.usb_dev.port_numbers)) + ':' - return ["/dev/disk/by-path/" + disk for disk in os.listdir("/dev/disk/by-path") if portstr in disk] + portstr = ":" + ".".join(map(str, self.usb_dev.port_numbers)) + ":" + return [ + "/dev/disk/by-path/" + disk + for disk in os.listdir("/dev/disk/by-path") + if portstr in disk + ] serial = self.serial_number if not serial: raise RuntimeError("Serial number not found for: " + self.device) - return ["/dev/disk/by-id/" + disk for disk in os.listdir("/dev/disk/by-id") if serial in disk] + return [ + "/dev/disk/by-id/" + disk + for disk in os.listdir("/dev/disk/by-id") + if serial in disk + ] @property def disk(self): disks = self.get_disks() - part = [part for part in disks if 'part1' in part] + part = [part for part in disks if "part1" in part] if not part: raise RuntimeError("Disk not found for: " + self.device) @@ -518,13 +554,13 @@ class CPboard: def execfile_disk(self, filename): with self.disk as disk: - disk.copy(filename, 'code.py') + disk.copy(filename, "code.py") with self.repl as repl: try: output = repl.run() except OSError as e: - raise CPboardError('timeout', e) + raise CPboardError("timeout", e) except RuntimeError: if self.repl.safe_mode: raise PyboardError("Can't run in safe mode") @@ -534,10 +570,10 @@ class CPboard: return output def execfile(self, filename, timeout=10): - if os.environ.get('CPBOARD_EXEC_MODE') == 'disk': + if os.environ.get("CPBOARD_EXEC_MODE") == "disk": return self.execfile_disk(filename) else: - with open(filename, 'rb') as f: + with open(filename, "rb") as f: pyfile = f.read() return self.exec(pyfile, timeout=timeout) @@ -545,11 +581,14 @@ class CPboard: # Implement just enough to make tests/run-tests work PyboardError = CPboardError + class Pyboard: - def __init__(self, device, baudrate=115200, user='micro', password='python', wait=0): + def __init__( + self, device, baudrate=115200, user="micro", password="python", wait=0 + ): self.board = CPboard.from_try_all(device, baudrate=baudrate, wait=wait) with self.board.disk as disk: - disk.copy('skip_if.py') + disk.copy("skip_if.py") def close(self): self.board.close() @@ -563,12 +602,13 @@ class Pyboard: def eval_namedtuple(board, command): from collections import namedtuple + s = board.exec("print(%s)" % command) s = s.decode().strip() - items = [key.split('=') for key in s[1:-1].split(', ')] + items = [key.split("=") for key in s[1:-1].split(", ")] keys = [item[0] for item in items] vals = [item[1] for item in items] - nt = namedtuple('eval', keys) + nt = namedtuple("eval", keys) res = nt(*[eval(val) for val in vals]) return res @@ -576,14 +616,16 @@ def eval_namedtuple(board, command): def os_uname(board): return eval_namedtuple(board, "__import__('os').uname()") + def print_verbose(cargs, *args, **kwargs): if cargs.verbose: print(*args, flush=True, **kwargs) + def upload(args): try: board = CPboard.from_build_name_bootloader(args.board) - print_verbose(args, 'Board is already in the bootloader') + print_verbose(args, "Board is already in the bootloader") except (ValueError, RuntimeError): board = CPboard.from_try_all(args.board) @@ -591,29 +633,32 @@ def upload(args): if not (args.quiet or board.bootloader): board.open() - print('Current version:', os_uname(board).version, flush=True) + print("Current version:", os_uname(board).version, flush=True) if not board.bootloader: - print_verbose(args, 'Reset to bootloader...', end='') - board.reset_to_bootloader(repl=True) # Feather M0 Express doesn't respond to 1200 baud + print_verbose(args, "Reset to bootloader...", end="") + board.reset_to_bootloader( + repl=True + ) # Feather M0 Express doesn't respond to 1200 baud time.sleep(5) - print_verbose(args, 'done') + print_verbose(args, "done") - print_verbose(args, 'Bootloader:', board.firmware.info) + print_verbose(args, "Bootloader:", board.firmware.info) - print_verbose(args, 'Upload firmware...', end='') + print_verbose(args, "Upload firmware...", end="") board.firmware.upload(args.firmware) - print_verbose(args, 'done') + print_verbose(args, "done") - print_verbose(args, 'Wait for board...', end='') + print_verbose(args, "Wait for board...", end="") time.sleep(5) - print_verbose(args, 'done') + print_verbose(args, "done") if not args.quiet: if board.bootloader: board = CPboard.from_try_all(args.board) board.open(wait=10) - print('New version:', os_uname(board).version, flush=True) + print("New version:", os_uname(board).version, flush=True) + def print_error_exit(args, e): if args.debug: @@ -622,16 +667,20 @@ def print_error_exit(args, e): print(e, file=sys.stderr) sys.exit(1) + def main(): import argparse - cmd_parser = argparse.ArgumentParser(description='Circuit Python Board Tool') - cmd_parser.add_argument('board', help='build_name, vid:pid or /dev/tty') - cmd_parser.add_argument('-f', '--firmware', help='upload UF2 firmware file') - cmd_parser.add_argument('-c', '--command', help='program passed in as string') - cmd_parser.add_argument('--tty', action='store_true', help='print tty') - cmd_parser.add_argument('--verbose', '-v', action='count', default=0, help='be verbose') - cmd_parser.add_argument('-q', '--quiet', action='store_true', help='be quiet') - cmd_parser.add_argument('--debug', action='store_true', help='raise exceptions') + + cmd_parser = argparse.ArgumentParser(description="Circuit Python Board Tool") + cmd_parser.add_argument("board", help="build_name, vid:pid or /dev/tty") + cmd_parser.add_argument("-f", "--firmware", help="upload UF2 firmware file") + cmd_parser.add_argument("-c", "--command", help="program passed in as string") + cmd_parser.add_argument("--tty", action="store_true", help="print tty") + cmd_parser.add_argument( + "--verbose", "-v", action="count", default=0, help="be verbose" + ) + cmd_parser.add_argument("-q", "--quiet", action="store_true", help="be quiet") + cmd_parser.add_argument("--debug", action="store_true", help="raise exceptions") args = cmd_parser.parse_args() if args.quiet: @@ -653,9 +702,9 @@ def main(): raise if args.verbose: - exec_mode = os.environ.get('CPBOARD_EXEC_MODE') + exec_mode = os.environ.get("CPBOARD_EXEC_MODE") if exec_mode: - print('CPBOARD_EXEC_MODE =', exec_mode) + print("CPBOARD_EXEC_MODE =", exec_mode) # Make sure we can open serial try: @@ -672,18 +721,21 @@ def main(): print(b.eval(args.command)) else: with board as b: - print('Device: ', end='') + print("Device: ", end="") if b.usb_dev: - print('%04x:%04x on ' % (b.usb_dev.idVendor, b.usb_dev.idProduct), end='') + print( + "%04x:%04x on " % (b.usb_dev.idVendor, b.usb_dev.idProduct), end="" + ) print(b.device) - print('Serial number:', b.serial_number) + print("Serial number:", b.serial_number) uname = os_uname(b) - print('os.uname:') - print(' sysname:', uname.sysname) - print(' nodename:', uname.nodename) - print(' release:', uname.release) - print(' version:', uname.version) - print(' machine:', uname.machine) + print("os.uname:") + print(" sysname:", uname.sysname) + print(" nodename:", uname.nodename) + print(" release:", uname.release) + print(" version:", uname.version) + print(" machine:", uname.machine) + if __name__ == "__main__": main()