2018-06-02 18:39:15 +02:00
|
|
|
#!/usr/bin/env python3
|
2020-06-03 23:40:05 +01:00
|
|
|
|
|
|
|
# SPDX-FileCopyrightText: Copyright (c) 2014-2016 Damien P. George
|
|
|
|
# SPDX-FileCopyrightText: Copyright (c) 2017 Paul Sokolovsky
|
|
|
|
# SPDX-FileCopyrightText: Copyright (c) 2017 Scott Shawcroft for Adafruit Industries
|
|
|
|
# SPDX-FileCopyrightText: Copyright (c) 2018 Noralf Trønnes
|
|
|
|
# SPDX-FileCopyrightText: 2014 MicroPython & CircuitPython contributors (https://github.com/adafruit/circuitpython/graphs/contributors)
|
2018-06-02 18:39:15 +02:00
|
|
|
#
|
2020-06-03 23:40:05 +01:00
|
|
|
# SPDX-License-Identifier: MIT
|
2018-06-02 18:39:15 +02:00
|
|
|
|
|
|
|
import os
|
2023-02-28 15:07:35 -08:00
|
|
|
import pathlib
|
2018-06-02 18:39:15 +02:00
|
|
|
import re
|
|
|
|
import serial
|
|
|
|
import sys
|
|
|
|
import time
|
|
|
|
|
|
|
|
import serial.tools.list_ports
|
|
|
|
import sh
|
|
|
|
import shutil
|
|
|
|
|
2020-06-03 18:01:12 -05:00
|
|
|
|
2018-06-02 18:39:15 +02:00
|
|
|
class CPboardError(BaseException):
|
|
|
|
pass
|
|
|
|
|
2020-06-03 18:01:12 -05:00
|
|
|
|
2018-06-02 18:39:15 +02:00
|
|
|
# supervisor/messages/default.h:
|
|
|
|
MSG_NEWLINE = b"\r\n"
|
|
|
|
MSG_SAFE_MODE_CRASH = b"Looks like our core CircuitPython code crashed hard. Whoops!"
|
2020-06-03 18:01:12 -05:00
|
|
|
MSG_SAFE_MODE_BROWN_OUT_LINE_1 = (
|
|
|
|
b"The microcontroller's power dipped. Please make sure your power supply provides"
|
|
|
|
)
|
|
|
|
MSG_SAFE_MODE_BROWN_OUT_LINE_2 = (
|
|
|
|
b"enough power for the whole circuit and press reset (after ejecting CIRCUITPY)."
|
|
|
|
)
|
2018-06-02 18:39:15 +02:00
|
|
|
MSG_WAIT_BEFORE_REPL = b"Press any key to enter the REPL. Use CTRL-D to reload."
|
|
|
|
|
2020-06-03 18:01:12 -05:00
|
|
|
|
2018-06-02 18:39:15 +02:00
|
|
|
class REPL:
|
2020-06-03 18:01:12 -05:00
|
|
|
CHAR_CTRL_A = b"\x01"
|
|
|
|
CHAR_CTRL_B = b"\x02"
|
|
|
|
CHAR_CTRL_C = b"\x03"
|
|
|
|
CHAR_CTRL_D = b"\x04"
|
2018-06-02 18:39:15 +02:00
|
|
|
|
|
|
|
def __init__(self, board):
|
|
|
|
self.board = board
|
|
|
|
self.write_chunk_size = 32
|
|
|
|
self.safe_mode = False
|
2020-06-03 18:01:12 -05:00
|
|
|
self.session = b""
|
2018-06-02 18:39:15 +02:00
|
|
|
|
|
|
|
def __enter__(self):
|
|
|
|
self.reset()
|
|
|
|
return self
|
|
|
|
|
|
|
|
def __exit__(self, exception_type, exception_value, traceback):
|
|
|
|
pass
|
|
|
|
|
|
|
|
@property
|
|
|
|
def serial(self):
|
|
|
|
return self.board.serial
|
|
|
|
|
|
|
|
def read(self):
|
2020-06-03 17:39:31 -05:00
|
|
|
if self.serial.in_waiting:
|
|
|
|
data = self.serial.read(self.serial.in_waiting)
|
2018-06-02 18:39:15 +02:00
|
|
|
else:
|
2020-06-03 18:01:12 -05:00
|
|
|
data = b""
|
2018-06-02 18:39:15 +02:00
|
|
|
self.session += data
|
|
|
|
return data
|
|
|
|
|
|
|
|
def read_until(self, ending, timeout=10):
|
2020-06-03 18:01:12 -05:00
|
|
|
data = b""
|
2018-06-02 18:39:15 +02:00
|
|
|
timeout_count = 0
|
|
|
|
while True:
|
|
|
|
if data.endswith(ending):
|
|
|
|
break
|
2020-06-03 17:39:31 -05:00
|
|
|
elif self.serial.in_waiting > 0:
|
2018-06-02 18:39:15 +02:00
|
|
|
new_data = self.serial.read(1)
|
|
|
|
data += new_data
|
|
|
|
self.session += new_data
|
|
|
|
timeout_count = 0
|
|
|
|
else:
|
|
|
|
timeout_count += 1
|
|
|
|
if timeout is not None and timeout_count >= 100 * timeout:
|
2023-03-17 08:49:47 -07:00
|
|
|
print("timeout")
|
2018-06-02 18:39:15 +02:00
|
|
|
raise TimeoutError(110, "timeout waiting for", ending)
|
|
|
|
time.sleep(0.01)
|
|
|
|
return data
|
|
|
|
|
|
|
|
def write(self, data, chunk_size=None):
|
|
|
|
if chunk_size is None:
|
|
|
|
chunk_size = self.write_chunk_size
|
|
|
|
if not isinstance(data, bytes):
|
2020-06-03 18:01:12 -05:00
|
|
|
data = bytes(data, encoding="utf8")
|
2018-06-02 18:39:15 +02:00
|
|
|
|
|
|
|
for i in range(0, len(data), chunk_size):
|
2020-06-03 18:01:12 -05:00
|
|
|
chunk = data[i : min(i + chunk_size, len(data))]
|
2018-06-02 18:39:15 +02:00
|
|
|
self.session += chunk
|
2023-02-28 15:07:35 -08:00
|
|
|
c = self.serial.write(chunk)
|
|
|
|
if c < len(chunk):
|
|
|
|
raise RuntimeError()
|
2018-06-02 18:39:15 +02:00
|
|
|
time.sleep(0.01)
|
|
|
|
|
|
|
|
def reset(self):
|
|
|
|
# Use read() since serial.reset_input_buffer() fails with termios.error now and then
|
|
|
|
self.read()
|
2020-06-03 18:01:12 -05:00
|
|
|
self.session = b""
|
2023-02-28 15:07:35 -08:00
|
|
|
self.write(REPL.CHAR_CTRL_C + REPL.CHAR_CTRL_C) # interrupt any running program
|
2020-06-03 18:01:12 -05:00
|
|
|
self.write(b"\r" + REPL.CHAR_CTRL_B) # enter or reset friendly repl
|
2023-02-28 15:07:35 -08:00
|
|
|
self.read_until(b">>> ", timeout=60)
|
2018-06-02 18:39:15 +02:00
|
|
|
|
2020-06-03 21:51:33 -05:00
|
|
|
def execute(self, code, timeout=10, wait_for_response=True):
|
2020-06-03 18:01:12 -05:00
|
|
|
self.read() # Throw away
|
2018-06-02 18:39:15 +02:00
|
|
|
|
|
|
|
self.write(REPL.CHAR_CTRL_A)
|
2020-06-03 18:01:12 -05:00
|
|
|
self.read_until(b"\r\n>")
|
2018-06-02 18:39:15 +02:00
|
|
|
|
|
|
|
self.write(code)
|
|
|
|
|
|
|
|
self.write(REPL.CHAR_CTRL_D)
|
2020-06-03 21:51:33 -05:00
|
|
|
if not wait_for_response:
|
2020-06-03 18:01:12 -05:00
|
|
|
return b"", b""
|
|
|
|
self.read_until(b"OK")
|
2018-06-02 18:39:15 +02:00
|
|
|
|
2020-06-03 18:01:12 -05:00
|
|
|
output = self.read_until(b"\x04", timeout=timeout)
|
2018-06-02 18:39:15 +02:00
|
|
|
output = output[:-1]
|
|
|
|
|
2020-06-03 18:01:12 -05:00
|
|
|
error = self.read_until(b"\x04")
|
2018-06-02 18:39:15 +02:00
|
|
|
error = error[:-1]
|
|
|
|
|
|
|
|
return output, error
|
|
|
|
|
|
|
|
def run(self):
|
|
|
|
if self.safe_mode:
|
|
|
|
raise RuntimeError("Can't run in safe mode")
|
|
|
|
|
|
|
|
self.reset()
|
|
|
|
|
|
|
|
self.write(REPL.CHAR_CTRL_D)
|
2020-06-03 18:01:12 -05:00
|
|
|
data = self.read_until(b" output:\r\n")
|
|
|
|
if b"Running in safe mode" in data:
|
2018-06-02 18:39:15 +02:00
|
|
|
self.safe_mode = True
|
|
|
|
raise RuntimeError("Can't run in safe mode")
|
|
|
|
|
|
|
|
# TODO: MSG_SAFE_MODE_CRASH
|
|
|
|
# TODO: BROWNOUT
|
|
|
|
|
|
|
|
marker = MSG_NEWLINE + MSG_WAIT_BEFORE_REPL + MSG_NEWLINE
|
|
|
|
data = self.read_until(marker)
|
|
|
|
data = data.split(marker)[0]
|
|
|
|
|
|
|
|
# Haven't found out why we have to strip off this...
|
2020-06-03 18:01:12 -05:00
|
|
|
if data.endswith(b"\r\n\r\n"):
|
2018-06-02 18:39:15 +02:00
|
|
|
data = data[:-4]
|
|
|
|
return data
|
|
|
|
|
|
|
|
|
|
|
|
class Disk:
|
|
|
|
def __init__(self, dev):
|
|
|
|
self.dev = os.path.realpath(dev)
|
|
|
|
self.mountpoint = None
|
2020-06-03 18:01:12 -05:00
|
|
|
with open("/etc/mtab", "r") as f:
|
2018-06-02 18:39:15 +02:00
|
|
|
mtab = f.read()
|
2021-03-15 19:27:36 +05:30
|
|
|
mount = [mount.split(" ") for mount in mtab.splitlines() if mount.startswith(self.dev)]
|
2018-06-02 18:39:15 +02:00
|
|
|
if mount:
|
|
|
|
self._path = mount[0][1]
|
|
|
|
else:
|
|
|
|
name = os.path.basename(dev)
|
2023-03-17 08:49:47 -07:00
|
|
|
try:
|
|
|
|
sh.pmount("-tvfat", dev, name, _timeout=10)
|
|
|
|
except sh.CommandNotFound:
|
|
|
|
raise ValueError()
|
2018-06-02 18:39:15 +02:00
|
|
|
self.mountpoint = "/media/" + name
|
|
|
|
self._path = self.mountpoint
|
|
|
|
|
|
|
|
def __enter__(self):
|
|
|
|
return self
|
|
|
|
|
|
|
|
def __exit__(self, exception_type, exception_value, traceback):
|
|
|
|
try:
|
|
|
|
self.close()
|
|
|
|
except:
|
|
|
|
pass
|
|
|
|
|
|
|
|
@property
|
|
|
|
def path(self):
|
|
|
|
return self._path
|
|
|
|
|
|
|
|
def close(self):
|
|
|
|
if not self.mountpoint:
|
|
|
|
return
|
|
|
|
mountpoint = self.mountpoint
|
|
|
|
self.mountpoint = None
|
|
|
|
|
|
|
|
start_time = time.monotonic()
|
|
|
|
unmounted = False
|
|
|
|
while not unmounted and start_time - time.monotonic() < 30:
|
|
|
|
try:
|
|
|
|
sh.pumount(mountpoint)
|
|
|
|
unmounted = True
|
|
|
|
except sh.ErrorReturnCode_5:
|
|
|
|
time.sleep(0.1)
|
|
|
|
|
|
|
|
def sync(self):
|
|
|
|
disk_device = os.path.basename(self.dev)[:-1]
|
|
|
|
os.sync()
|
|
|
|
# Monitor the block device so we know when the sync request is actually finished.
|
|
|
|
with open("/sys/block/" + disk_device + "/stat", "r") as f:
|
|
|
|
disk_inflight = 1
|
|
|
|
last_wait_time = 0
|
|
|
|
wait_time = 1
|
|
|
|
while disk_inflight > 0 or wait_time > last_wait_time:
|
|
|
|
f.seek(0)
|
|
|
|
stats = f.read()
|
|
|
|
block_stats = stats.split()
|
|
|
|
disk_inflight = int(block_stats[8])
|
|
|
|
last_wait_time = wait_time
|
|
|
|
wait_time = int(block_stats[9])
|
|
|
|
|
|
|
|
def copy(self, src, dst=None, sync=True):
|
|
|
|
if dst is None:
|
|
|
|
dst = os.path.basename(src)
|
|
|
|
shutil.copy(src, os.path.join(self.path, dst))
|
|
|
|
if sync:
|
|
|
|
self.sync()
|
|
|
|
|
|
|
|
|
|
|
|
class Firmware:
|
|
|
|
def __init__(self, board):
|
|
|
|
self.board = board
|
|
|
|
|
|
|
|
@property
|
|
|
|
def disk(self):
|
|
|
|
disks = self.board.get_disks()
|
|
|
|
if len(disks) != 1:
|
|
|
|
raise RuntimeError("Boot disk not found for: " + self.board.device)
|
|
|
|
return Disk(disks[0])
|
|
|
|
|
|
|
|
@property
|
|
|
|
def info(self):
|
|
|
|
with self.disk as disk:
|
2020-06-03 18:01:12 -05:00
|
|
|
fname = os.path.join(disk.path, "INFO_UF2.TXT")
|
|
|
|
with open(fname, "r") as f:
|
2018-06-02 18:39:15 +02:00
|
|
|
info = f.read()
|
|
|
|
lines = info.splitlines()
|
|
|
|
res = {}
|
2020-06-03 18:01:12 -05:00
|
|
|
res["header"] = lines[0]
|
2018-06-02 18:39:15 +02:00
|
|
|
for line in lines[1:]:
|
2020-06-03 18:01:12 -05:00
|
|
|
k, _, v = line.partition(":")
|
|
|
|
res[k.replace(":", "")] = v.strip()
|
2018-06-02 18:39:15 +02:00
|
|
|
return res
|
|
|
|
|
|
|
|
def upload(self, fw):
|
2020-06-03 18:01:12 -05:00
|
|
|
with open(fw, "rb") as f:
|
2018-06-02 18:39:15 +02:00
|
|
|
header = f.read(32)
|
2020-06-03 18:01:12 -05:00
|
|
|
if header[0:4] != b"UF2\n":
|
|
|
|
raise ValueError("Only UF2 files are supported")
|
2018-06-02 18:39:15 +02:00
|
|
|
self.board.close()
|
|
|
|
with self.disk as disk:
|
|
|
|
disk.copy(fw, sync=False)
|
|
|
|
|
|
|
|
|
|
|
|
class CPboard:
|
|
|
|
@classmethod
|
|
|
|
def from_try_all(cls, name, **kwargs):
|
|
|
|
try:
|
|
|
|
return CPboard.from_build_name(name, **kwargs)
|
|
|
|
except ValueError:
|
|
|
|
pass
|
|
|
|
|
2020-06-03 18:01:12 -05:00
|
|
|
vendor, _, product = name.partition(":")
|
2018-06-02 18:39:15 +02:00
|
|
|
if vendor and product:
|
2021-03-15 19:27:36 +05:30
|
|
|
return CPboard.from_usb(**kwargs, idVendor=int(vendor, 16), idProduct=int(product, 16))
|
2018-06-02 18:39:15 +02:00
|
|
|
|
|
|
|
return CPboard(name, **kwargs)
|
|
|
|
|
|
|
|
@classmethod
|
|
|
|
def from_build_name(cls, name, **kwargs):
|
|
|
|
boards = {
|
2020-06-03 18:01:12 -05:00
|
|
|
#'arduino_zero'
|
|
|
|
"circuitplayground_express": (0x239A, 0x8019),
|
|
|
|
#'feather_m0_adalogger' : (0x239a, ),
|
|
|
|
#'feather_m0_basic' : (0x239a, ),
|
|
|
|
"feather_m0_express": (0x239A, 0x8023),
|
|
|
|
#'feather_m0_rfm69' : (0x239a, ),
|
|
|
|
#'feather_m0_rfm9x' : (0x239a, ),
|
|
|
|
#'feather_m0_supersized' : (0x239a, ),
|
|
|
|
#'feather_m4_express' : (0x239a, ),
|
|
|
|
#'gemma_m0' : (0x239a, ),
|
|
|
|
#'itsybitsy_m0_express' : (0x239a, ),
|
|
|
|
"itsybitsy_m4_express": (0x239A, 0x802C),
|
|
|
|
"metro_m0_express": (0x239A, 0x8014),
|
|
|
|
"metro_m4_express": (0x239A, 0x8021),
|
|
|
|
#'metro_m4_express_revb' : (0x239a, ),
|
|
|
|
#'pirkey_m0' : (0x239a, ),
|
|
|
|
#'trinket_m0' : (0x239a, ),
|
|
|
|
#'trinket_m0_haxpress' : (0x239a, ),
|
|
|
|
#'ugame10'
|
|
|
|
}
|
2018-06-02 18:39:15 +02:00
|
|
|
|
|
|
|
try:
|
|
|
|
vendor, product = boards[name]
|
|
|
|
except KeyError:
|
|
|
|
raise ValueError("Unknown build name: " + name)
|
|
|
|
|
|
|
|
return CPboard.from_usb(**kwargs, idVendor=vendor, idProduct=product)
|
|
|
|
|
|
|
|
@classmethod
|
|
|
|
def from_build_name_bootloader(cls, name, **kwargs):
|
|
|
|
boards = {
|
2020-06-03 18:01:12 -05:00
|
|
|
#'arduino_zero'
|
|
|
|
#'circuitplayground_express' : (0x239a, ),
|
|
|
|
#'feather_m0_adalogger' : (0x239a, ),
|
|
|
|
#'feather_m0_basic' : (0x239a, ),
|
|
|
|
"feather_m0_express": (0x239A, 0x001B),
|
|
|
|
#'feather_m0_rfm69' : (0x239a, ),
|
|
|
|
#'feather_m0_rfm9x' : (0x239a, ),
|
|
|
|
#'feather_m0_supersized' : (0x239a, ),
|
|
|
|
#'feather_m4_express' : (0x239a, ),
|
|
|
|
#'gemma_m0' : (0x239a, ),
|
|
|
|
#'itsybitsy_m0_express' : (0x239a, ),
|
|
|
|
"itsybitsy_m4_express": (0x239A, 0x002B),
|
|
|
|
#'metro_m0_express' : (0x239a, 0x8014),
|
|
|
|
"metro_m4_express": (0x239A, 0x0021),
|
|
|
|
#'metro_m4_express_revb' : (0x239a, ),
|
|
|
|
#'pirkey_m0' : (0x239a, ),
|
|
|
|
#'trinket_m0' : (0x239a, ),
|
|
|
|
#'trinket_m0_haxpress' : (0x239a, ),
|
|
|
|
#'ugame10'
|
|
|
|
}
|
2018-06-02 18:39:15 +02:00
|
|
|
|
|
|
|
try:
|
|
|
|
vendor, product = boards[name]
|
|
|
|
except KeyError:
|
|
|
|
raise ValueError("Unknown build name: " + name)
|
|
|
|
|
|
|
|
board = CPboard.from_usb(**kwargs, idVendor=vendor, idProduct=product)
|
|
|
|
board.bootloader = True
|
|
|
|
return board
|
|
|
|
|
|
|
|
@classmethod
|
|
|
|
def from_usb(cls, baudrate=115200, wait=0, timeout=10, **kwargs):
|
|
|
|
import usb.core
|
2020-06-03 18:01:12 -05:00
|
|
|
|
2018-06-02 18:39:15 +02:00
|
|
|
dev = usb.core.find(**kwargs)
|
|
|
|
if not dev:
|
|
|
|
s = "Can't find USB device: "
|
|
|
|
args = []
|
|
|
|
for x in kwargs.items():
|
|
|
|
try:
|
2020-06-03 18:01:12 -05:00
|
|
|
args.append("%s=0x%x" % x)
|
2018-06-02 18:39:15 +02:00
|
|
|
except:
|
2020-06-03 18:01:12 -05:00
|
|
|
args.append("%s = %s" % x)
|
|
|
|
raise RuntimeError("Can't find USB device: " + ", ".join(args))
|
2018-06-02 18:39:15 +02:00
|
|
|
return cls(dev, baudrate=baudrate, wait=wait, timeout=timeout)
|
|
|
|
|
|
|
|
def __init__(self, device, baudrate=115200, wait=0, timeout=10):
|
2023-02-28 15:07:35 -08:00
|
|
|
self.device = str(pathlib.Path(device).resolve())
|
2018-06-02 18:39:15 +02:00
|
|
|
self.usb_dev = None
|
|
|
|
try:
|
|
|
|
# Is it a usb.core.Device?
|
2020-06-03 18:01:12 -05:00
|
|
|
portstr = ":" + ".".join(map(str, device.port_numbers)) + ":"
|
2018-06-02 18:39:15 +02:00
|
|
|
except:
|
|
|
|
pass
|
|
|
|
else:
|
2021-03-15 19:27:36 +05:30
|
|
|
serials = [serial for serial in os.listdir("/dev/serial/by-path") if portstr in serial]
|
2018-06-02 18:39:15 +02:00
|
|
|
if len(serials) != 1:
|
2023-02-28 15:07:35 -08:00
|
|
|
raise RuntimeError("Can't find exactly one matching usb serial device")
|
2018-06-02 18:39:15 +02:00
|
|
|
self.device = os.path.realpath("/dev/serial/by-path/" + serials[0])
|
|
|
|
self.usb_dev = device
|
|
|
|
|
|
|
|
self.baudrate = baudrate
|
|
|
|
self.wait = wait
|
|
|
|
self.timeout = timeout
|
|
|
|
self.debug = True
|
|
|
|
self.mount = None
|
|
|
|
self.serial = None
|
|
|
|
self.bootloader = False
|
|
|
|
self.repl = REPL(self)
|
|
|
|
|
2023-02-28 15:07:35 -08:00
|
|
|
# Disable autoreload so that file copies won't mess us up.
|
|
|
|
with self:
|
|
|
|
self.exec("import supervisor;supervisor.runtime.autoreload = False")
|
|
|
|
|
2018-06-02 18:39:15 +02:00
|
|
|
def __enter__(self):
|
|
|
|
self.open()
|
|
|
|
return self
|
|
|
|
|
|
|
|
def __exit__(self, exception_type, exception_value, traceback):
|
|
|
|
self.close()
|
|
|
|
|
|
|
|
def open(self, baudrate=None, wait=None):
|
|
|
|
if self.serial:
|
|
|
|
return
|
|
|
|
if baudrate is None:
|
|
|
|
baudrate = self.baudrate
|
|
|
|
if wait is None:
|
|
|
|
wait = self.wait
|
|
|
|
|
|
|
|
delayed = False
|
|
|
|
for attempt in range(wait + 1):
|
|
|
|
try:
|
2020-06-03 18:01:12 -05:00
|
|
|
self.serial = serial.Serial(
|
|
|
|
self.device,
|
|
|
|
baudrate=self.baudrate,
|
|
|
|
timeout=self.timeout,
|
|
|
|
inter_byte_timeout=10,
|
|
|
|
write_timeout=self.timeout,
|
|
|
|
)
|
2018-06-02 18:39:15 +02:00
|
|
|
break
|
2020-06-03 18:01:12 -05:00
|
|
|
except (OSError, IOError): # Py2 and Py3 have different errors
|
2018-06-02 18:39:15 +02:00
|
|
|
if wait == 0:
|
|
|
|
continue
|
|
|
|
if attempt == 0:
|
2020-06-03 18:01:12 -05:00
|
|
|
sys.stdout.write("Waiting {} seconds for board ".format(wait))
|
2018-06-02 18:39:15 +02:00
|
|
|
delayed = True
|
|
|
|
time.sleep(1)
|
2020-06-03 18:01:12 -05:00
|
|
|
sys.stdout.write(".")
|
2018-06-02 18:39:15 +02:00
|
|
|
sys.stdout.flush()
|
|
|
|
else:
|
|
|
|
if delayed:
|
2020-06-03 18:01:12 -05:00
|
|
|
print("")
|
|
|
|
raise CPboardError("failed to access " + self.device)
|
2018-06-02 18:39:15 +02:00
|
|
|
if delayed:
|
2020-06-03 18:01:12 -05:00
|
|
|
print("")
|
2018-06-02 18:39:15 +02:00
|
|
|
|
|
|
|
def close(self):
|
|
|
|
if self.serial:
|
|
|
|
self.serial.close()
|
|
|
|
self.serial = None
|
|
|
|
|
2020-06-03 21:51:33 -05:00
|
|
|
def exec(self, command, timeout=10, wait_for_response=True):
|
2018-06-02 18:39:15 +02:00
|
|
|
with self.repl as repl:
|
|
|
|
try:
|
2020-06-03 18:01:12 -05:00
|
|
|
output, error = repl.execute(
|
|
|
|
command, timeout=timeout, wait_for_response=wait_for_response
|
|
|
|
)
|
2018-06-02 18:39:15 +02:00
|
|
|
except OSError as e:
|
|
|
|
if self.debug:
|
2020-06-03 18:01:12 -05:00
|
|
|
print("exec: session: ", self.repl.session)
|
|
|
|
raise CPboardError("timeout", e)
|
2018-06-02 18:39:15 +02:00
|
|
|
if error:
|
2020-06-03 18:01:12 -05:00
|
|
|
raise CPboardError("exception", output, error)
|
2018-06-02 18:39:15 +02:00
|
|
|
return output
|
|
|
|
|
|
|
|
def eval(self, expression, timeout=10):
|
2020-06-03 18:01:12 -05:00
|
|
|
command = "print({})".format(expression)
|
2018-06-02 18:39:15 +02:00
|
|
|
with self.repl as repl:
|
|
|
|
output, error = repl.execute(command, timeout=timeout)
|
|
|
|
if error:
|
2020-06-03 18:01:12 -05:00
|
|
|
raise CPboardError("exception", output, error)
|
2018-06-02 18:39:15 +02:00
|
|
|
try:
|
2020-06-03 18:01:12 -05:00
|
|
|
res = eval(str(output, encoding="utf8"))
|
2018-06-02 18:39:15 +02:00
|
|
|
except:
|
2020-06-03 18:01:12 -05:00
|
|
|
raise CPboardError("failed to eval: %s" % output)
|
2018-06-02 18:39:15 +02:00
|
|
|
return res
|
|
|
|
|
2020-06-03 18:01:12 -05:00
|
|
|
def _reset(self, mode="NORMAL"):
|
|
|
|
self.exec(
|
|
|
|
"import microcontroller;microcontroller.on_next_reset(microcontroller.RunMode.%s)"
|
|
|
|
% mode
|
|
|
|
)
|
2018-06-02 18:39:15 +02:00
|
|
|
try:
|
2021-03-15 19:27:36 +05:30
|
|
|
self.exec("import microcontroller;microcontroller.reset()", wait_for_response=False)
|
2018-06-02 18:39:15 +02:00
|
|
|
except OSError:
|
|
|
|
pass
|
|
|
|
|
|
|
|
def reset(self, safe_mode=False, delay=5, wait=10):
|
2020-06-03 18:01:12 -05:00
|
|
|
self._reset("SAFE_MODE" if safe_mode else "NORMAL")
|
2018-06-02 18:39:15 +02:00
|
|
|
self.close()
|
|
|
|
time.sleep(delay)
|
|
|
|
self.open(wait)
|
|
|
|
time.sleep(delay)
|
|
|
|
|
|
|
|
def reset_to_bootloader(self, repl=False):
|
|
|
|
if repl:
|
2020-06-03 18:01:12 -05:00
|
|
|
self._reset("BOOTLOADER")
|
2018-06-02 18:39:15 +02:00
|
|
|
self.close()
|
|
|
|
else:
|
|
|
|
self.close()
|
|
|
|
s = serial.Serial(self.device, 1200, write_timeout=4, timeout=4)
|
|
|
|
s.close()
|
|
|
|
|
|
|
|
def get_port_info(self):
|
|
|
|
portinfo = None
|
|
|
|
for port_iter in serial.tools.list_ports.comports():
|
|
|
|
if port_iter.device == self.device:
|
|
|
|
portinfo = port_iter
|
|
|
|
break
|
|
|
|
return portinfo
|
|
|
|
|
|
|
|
@property
|
|
|
|
def serial_number(self):
|
2020-06-03 18:01:12 -05:00
|
|
|
try: # Permissions are needed to read the value
|
2018-06-02 18:39:15 +02:00
|
|
|
return self.usb_dev.serial_number
|
|
|
|
except:
|
|
|
|
pass
|
|
|
|
p = self.get_port_info()
|
|
|
|
return p.serial_number if p else None
|
|
|
|
|
|
|
|
def get_disks(self):
|
|
|
|
if self.usb_dev:
|
2020-06-03 18:01:12 -05:00
|
|
|
portstr = ":" + ".".join(map(str, self.usb_dev.port_numbers)) + ":"
|
|
|
|
return [
|
|
|
|
"/dev/disk/by-path/" + disk
|
|
|
|
for disk in os.listdir("/dev/disk/by-path")
|
|
|
|
if portstr in disk
|
|
|
|
]
|
2018-06-02 18:39:15 +02:00
|
|
|
serial = self.serial_number
|
|
|
|
if not serial:
|
|
|
|
raise RuntimeError("Serial number not found for: " + self.device)
|
2020-06-03 18:01:12 -05:00
|
|
|
return [
|
2021-03-15 19:27:36 +05:30
|
|
|
"/dev/disk/by-id/" + disk for disk in os.listdir("/dev/disk/by-id") if serial in disk
|
2020-06-03 18:01:12 -05:00
|
|
|
]
|
2018-06-02 18:39:15 +02:00
|
|
|
|
|
|
|
@property
|
|
|
|
def disk(self):
|
|
|
|
disks = self.get_disks()
|
|
|
|
|
2020-06-03 18:01:12 -05:00
|
|
|
part = [part for part in disks if "part1" in part]
|
2018-06-02 18:39:15 +02:00
|
|
|
if not part:
|
2023-02-28 15:07:35 -08:00
|
|
|
return None
|
2018-06-02 18:39:15 +02:00
|
|
|
|
2023-03-17 08:49:47 -07:00
|
|
|
try:
|
|
|
|
return Disk(part[0])
|
|
|
|
except ValueError:
|
|
|
|
return None
|
2018-06-02 18:39:15 +02:00
|
|
|
|
|
|
|
@property
|
|
|
|
def firmware(self):
|
|
|
|
return Firmware(self)
|
|
|
|
|
|
|
|
def execfile_disk(self, filename):
|
|
|
|
with self.disk as disk:
|
2020-06-03 18:01:12 -05:00
|
|
|
disk.copy(filename, "code.py")
|
2018-06-02 18:39:15 +02:00
|
|
|
|
|
|
|
with self.repl as repl:
|
|
|
|
try:
|
|
|
|
output = repl.run()
|
|
|
|
except OSError as e:
|
2020-06-03 18:01:12 -05:00
|
|
|
raise CPboardError("timeout", e)
|
2018-06-02 18:39:15 +02:00
|
|
|
except RuntimeError:
|
|
|
|
if self.repl.safe_mode:
|
|
|
|
raise PyboardError("Can't run in safe mode")
|
|
|
|
else:
|
|
|
|
raise
|
|
|
|
|
|
|
|
return output
|
|
|
|
|
|
|
|
def execfile(self, filename, timeout=10):
|
2020-06-03 18:01:12 -05:00
|
|
|
if os.environ.get("CPBOARD_EXEC_MODE") == "disk":
|
2018-06-02 18:39:15 +02:00
|
|
|
return self.execfile_disk(filename)
|
|
|
|
else:
|
2020-06-03 18:01:12 -05:00
|
|
|
with open(filename, "rb") as f:
|
2018-06-02 18:39:15 +02:00
|
|
|
pyfile = f.read()
|
|
|
|
return self.exec(pyfile, timeout=timeout)
|
|
|
|
|
|
|
|
|
|
|
|
# Implement just enough to make tests/run-tests work
|
|
|
|
PyboardError = CPboardError
|
|
|
|
|
2020-06-03 18:01:12 -05:00
|
|
|
|
2018-06-02 18:39:15 +02:00
|
|
|
class Pyboard:
|
2021-03-15 19:27:36 +05:30
|
|
|
def __init__(self, device, baudrate=115200, user="micro", password="python", wait=0):
|
2018-06-02 18:39:15 +02:00
|
|
|
self.board = CPboard.from_try_all(device, baudrate=baudrate, wait=wait)
|
2023-03-17 08:49:47 -07:00
|
|
|
disk = self.board.disk
|
|
|
|
if disk:
|
|
|
|
with disk as open_disk:
|
|
|
|
open_disk.copy("skip_if.py")
|
2018-06-02 18:39:15 +02:00
|
|
|
|
|
|
|
def close(self):
|
|
|
|
self.board.close()
|
|
|
|
|
|
|
|
def enter_raw_repl(self):
|
|
|
|
self.board.open()
|
2023-03-17 08:49:47 -07:00
|
|
|
self.board.repl.reset()
|
2018-06-02 18:39:15 +02:00
|
|
|
|
2023-02-28 15:07:35 -08:00
|
|
|
def exit_raw_repl(self):
|
|
|
|
self.close()
|
|
|
|
|
2018-06-02 18:39:15 +02:00
|
|
|
def execfile(self, filename):
|
|
|
|
return self.board.execfile(filename)
|
|
|
|
|
2023-02-28 15:07:35 -08:00
|
|
|
def exec_(self, command, data_consumer=None):
|
2023-03-17 08:49:47 -07:00
|
|
|
try:
|
|
|
|
output, error = self.board.repl.execute(command, timeout=20000, wait_for_response=True)
|
|
|
|
except OSError as e:
|
|
|
|
raise CPboardError("timeout", e)
|
|
|
|
if error:
|
|
|
|
raise CPboardError("exception", output, error)
|
2023-02-28 15:07:35 -08:00
|
|
|
return output
|
|
|
|
|
2018-06-02 18:39:15 +02:00
|
|
|
|
|
|
|
def eval_namedtuple(board, command):
|
|
|
|
from collections import namedtuple
|
2020-06-03 18:01:12 -05:00
|
|
|
|
2018-06-02 18:39:15 +02:00
|
|
|
s = board.exec("print(%s)" % command)
|
|
|
|
s = s.decode().strip()
|
2020-06-03 18:01:12 -05:00
|
|
|
items = [key.split("=") for key in s[1:-1].split(", ")]
|
2018-06-02 18:39:15 +02:00
|
|
|
keys = [item[0] for item in items]
|
|
|
|
vals = [item[1] for item in items]
|
2020-06-03 18:01:12 -05:00
|
|
|
nt = namedtuple("eval", keys)
|
2018-06-02 18:39:15 +02:00
|
|
|
res = nt(*[eval(val) for val in vals])
|
|
|
|
return res
|
|
|
|
|
|
|
|
|
|
|
|
def os_uname(board):
|
|
|
|
return eval_namedtuple(board, "__import__('os').uname()")
|
|
|
|
|
2020-06-03 18:01:12 -05:00
|
|
|
|
2018-06-02 18:39:15 +02:00
|
|
|
def print_verbose(cargs, *args, **kwargs):
|
|
|
|
if cargs.verbose:
|
|
|
|
print(*args, flush=True, **kwargs)
|
|
|
|
|
2020-06-03 18:01:12 -05:00
|
|
|
|
2018-06-02 18:39:15 +02:00
|
|
|
def upload(args):
|
|
|
|
try:
|
|
|
|
board = CPboard.from_build_name_bootloader(args.board)
|
2020-06-03 18:01:12 -05:00
|
|
|
print_verbose(args, "Board is already in the bootloader")
|
2018-06-02 18:39:15 +02:00
|
|
|
except (ValueError, RuntimeError):
|
|
|
|
board = CPboard.from_try_all(args.board)
|
|
|
|
|
|
|
|
print_verbose(args, "Serial number :", board.serial_number)
|
|
|
|
|
|
|
|
if not (args.quiet or board.bootloader):
|
|
|
|
board.open()
|
2020-06-03 18:01:12 -05:00
|
|
|
print("Current version:", os_uname(board).version, flush=True)
|
2018-06-02 18:39:15 +02:00
|
|
|
|
|
|
|
if not board.bootloader:
|
2020-06-03 18:01:12 -05:00
|
|
|
print_verbose(args, "Reset to bootloader...", end="")
|
2021-03-15 19:27:36 +05:30
|
|
|
board.reset_to_bootloader(repl=True) # Feather M0 Express doesn't respond to 1200 baud
|
2018-06-02 18:39:15 +02:00
|
|
|
time.sleep(5)
|
2020-06-03 18:01:12 -05:00
|
|
|
print_verbose(args, "done")
|
2018-06-02 18:39:15 +02:00
|
|
|
|
2020-06-03 18:01:12 -05:00
|
|
|
print_verbose(args, "Bootloader:", board.firmware.info)
|
2018-06-02 18:39:15 +02:00
|
|
|
|
2020-06-03 18:01:12 -05:00
|
|
|
print_verbose(args, "Upload firmware...", end="")
|
2018-06-02 18:39:15 +02:00
|
|
|
board.firmware.upload(args.firmware)
|
2020-06-03 18:01:12 -05:00
|
|
|
print_verbose(args, "done")
|
2018-06-02 18:39:15 +02:00
|
|
|
|
2020-06-03 18:01:12 -05:00
|
|
|
print_verbose(args, "Wait for board...", end="")
|
2018-06-02 18:39:15 +02:00
|
|
|
time.sleep(5)
|
2020-06-03 18:01:12 -05:00
|
|
|
print_verbose(args, "done")
|
2018-06-02 18:39:15 +02:00
|
|
|
|
|
|
|
if not args.quiet:
|
|
|
|
if board.bootloader:
|
|
|
|
board = CPboard.from_try_all(args.board)
|
|
|
|
board.open(wait=10)
|
2020-06-03 18:01:12 -05:00
|
|
|
print("New version:", os_uname(board).version, flush=True)
|
|
|
|
|
2018-06-02 18:39:15 +02:00
|
|
|
|
|
|
|
def print_error_exit(args, e):
|
|
|
|
if args.debug:
|
|
|
|
return False
|
|
|
|
if not args.quiet:
|
|
|
|
print(e, file=sys.stderr)
|
|
|
|
sys.exit(1)
|
|
|
|
|
2020-06-03 18:01:12 -05:00
|
|
|
|
2018-06-02 18:39:15 +02:00
|
|
|
def main():
|
|
|
|
import argparse
|
2020-06-03 18:01:12 -05:00
|
|
|
|
2021-05-06 16:22:35 +03:00
|
|
|
cmd_parser = argparse.ArgumentParser(description="CircuitPython Board Tool")
|
2020-06-03 18:01:12 -05:00
|
|
|
cmd_parser.add_argument("board", help="build_name, vid:pid or /dev/tty")
|
|
|
|
cmd_parser.add_argument("-f", "--firmware", help="upload UF2 firmware file")
|
|
|
|
cmd_parser.add_argument("-c", "--command", help="program passed in as string")
|
|
|
|
cmd_parser.add_argument("--tty", action="store_true", help="print tty")
|
2021-03-15 19:27:36 +05:30
|
|
|
cmd_parser.add_argument("--verbose", "-v", action="count", default=0, help="be verbose")
|
2020-06-03 18:01:12 -05:00
|
|
|
cmd_parser.add_argument("-q", "--quiet", action="store_true", help="be quiet")
|
|
|
|
cmd_parser.add_argument("--debug", action="store_true", help="raise exceptions")
|
2018-06-02 18:39:15 +02:00
|
|
|
args = cmd_parser.parse_args()
|
|
|
|
|
|
|
|
if args.quiet:
|
|
|
|
args.verbose = 0
|
|
|
|
args.debug = False
|
|
|
|
|
|
|
|
if args.firmware:
|
|
|
|
try:
|
|
|
|
upload(args)
|
|
|
|
except BaseException as e:
|
|
|
|
if not print_error_exit(args, e):
|
|
|
|
raise
|
|
|
|
sys.exit(0)
|
|
|
|
|
|
|
|
try:
|
|
|
|
board = CPboard.from_try_all(args.board)
|
|
|
|
except BaseException as e:
|
|
|
|
if not print_error_exit(args, e):
|
|
|
|
raise
|
|
|
|
|
|
|
|
if args.verbose:
|
2020-06-03 18:01:12 -05:00
|
|
|
exec_mode = os.environ.get("CPBOARD_EXEC_MODE")
|
2018-06-02 18:39:15 +02:00
|
|
|
if exec_mode:
|
2020-06-03 18:01:12 -05:00
|
|
|
print("CPBOARD_EXEC_MODE =", exec_mode)
|
2018-06-02 18:39:15 +02:00
|
|
|
|
|
|
|
# Make sure we can open serial
|
|
|
|
try:
|
|
|
|
with board:
|
|
|
|
pass
|
|
|
|
except BaseException as e:
|
|
|
|
if not print_error_exit(args, e):
|
|
|
|
raise
|
|
|
|
|
|
|
|
if args.tty:
|
|
|
|
print(board.device)
|
|
|
|
elif args.command:
|
|
|
|
with board as b:
|
|
|
|
print(b.eval(args.command))
|
|
|
|
else:
|
|
|
|
with board as b:
|
2020-06-03 18:01:12 -05:00
|
|
|
print("Device: ", end="")
|
2018-06-02 18:39:15 +02:00
|
|
|
if b.usb_dev:
|
2021-03-15 19:27:36 +05:30
|
|
|
print("%04x:%04x on " % (b.usb_dev.idVendor, b.usb_dev.idProduct), end="")
|
2018-06-02 18:39:15 +02:00
|
|
|
print(b.device)
|
2020-06-03 18:01:12 -05:00
|
|
|
print("Serial number:", b.serial_number)
|
2018-06-02 18:39:15 +02:00
|
|
|
uname = os_uname(b)
|
2020-06-03 18:01:12 -05:00
|
|
|
print("os.uname:")
|
|
|
|
print(" sysname:", uname.sysname)
|
|
|
|
print(" nodename:", uname.nodename)
|
|
|
|
print(" release:", uname.release)
|
|
|
|
print(" version:", uname.version)
|
|
|
|
print(" machine:", uname.machine)
|
|
|
|
|
2018-06-02 18:39:15 +02:00
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
|
main()
|